作者:dcguo,腾讯 CSIG 电子签开放平台中心
go 原生/扩展库提倡的原则 不要通过共享内存进行通信;相反,通过通信来共享内存。 Goroutine goroutine 并发模型 调度器主要结构 主要调度器结构是 M,P,G
P 的数量由环境变量中的 GOMAXPROCS 决定,通常来说和核心数对应。 映射关系用户空间线程和内核空间线程映射关系有如下三种:
调度图关系如图,灰色的 G 则是暂时还未运行的,处于就绪态,等待被调度,这个队列被 P 维护 注: 简单调度图如上,有关于 P 再多个 M 中切换,公共 goroutine 队列,M 从线程缓存中创建等步骤没有体现,复杂过程可以参考文章简单了解 goroutine 如何实现。 goroutine 使用
go list.Sort()
channelchannel 特性 创建 // 创建 channela := make(chan int)b := make(chan int, 10)// 单向 channelc := make(chan<- int)d := make(<-chan int) 存入/读取/关闭tip:
channel 使用/基础
ci := make(chan int)cj := make(chan int, 0)cs := make(chan *os.File, 100)
func Server(queue chan *Request) {for req := range queue {sem <- 1go func() {process(req)<- sem}()}}
func Serve(queue chan *Request) {for req := range queue {req := reqsem <- 1go func() {process(req)<-sem}()}} channel 使用/技巧等待一个事件,也可以通过 close 一个 channel 就足够了。
阻塞程序开源项目【是一个支持集群的 im 及实时推送服务】里面的基准测试的案例 取最快结果func main() { ret := make(chan string, 3) for i := 0; i < cap(ret); i++ { go call(ret) } fmt.Println(<-ret)}func call(ret chan<- string) { // do something // ... ret <- 'result'} 协同多个 goroutines注: 协同多个 goroutines 方案很多,这里只展示 channel 的一种。
搭配 select 操作for { select { case a := <- testChanA: // todo a case b, ok := testChanB: // todo b, 通过 ok 判断 tesChanB 的关闭情况 default: // 默认分支 }} main go routinue 确认 worker goroutinue 真正退出的方式
关闭的 channel 不会被阻塞testChan := make(chan bool)close(testChan)zeroValue := <- testChanfmt.Println(zeroValue) // falsetestChan <- true // panic: send on closed channel 注: 如果是 buffered channel, 即使被 close, 也可以读到之前存入的值,读取完毕后开始读零值,写入则会触发 panic nil channel 读取和存入都不会阻塞,close 会 panic略 range 遍历 channel
例: 唯一 idfunc newUniqueIdService() <-chan string { id := make(chan string) go func() { var counter int64 = 0 for { id <- fmt.Sprintf('%x', counter) counter += 1 } }() return id}func newUniqueIdServerMain() { id := newUniqueIdService() for i := 0; i < 10; i++ { fmt.Println(<- id) }} 带缓冲队列构造略 超时 timeout 和心跳 heart beat
demo 开源 im/goim 项目中的应用 2.心跳 done := make(chan bool)defer func() {close(done)}()ticker := time.NewTicker(10 * time.Second)go func() {for {select {case <-done:ticker.Stop()returncase <-ticker.C:message.Touch()}}}()} 多个 goroutine 同步响应
利用 channel 阻塞的特性和带缓冲的 channel 来实现控制并发数量func channel() { count := 10 // 最大并发 sum := 100 // 总数 c := make(chan struct{}, count) sc := make(chan struct{}, sum) defer close(c) defer close(sc) for i:=0; i<sum; i++ { c <- struct{} go func(j int) { fmt.Println(j) <- c // 执行完毕,释放资源 sc <- struct {}{} // 记录到执行总数 } } for i:=sum; i>0; i++ { <- sc }} go 并发编程(基础库)
Mutex/RWMutex
Mutex demo
结果: 可以看到整个执行持续了 3 s 多,内部多个协程已经被 “锁” 住了。 RWMutex demo 注意: 这东西可以并发读,不可以并发读写/并发写写,不过现在即便场景是读多写少也很少用到这,一般集群环境都得分布式锁了。 package mainimport ( 'fmt' 'sync' 'time')var m *sync.RWMutexfunc init() { m = new(sync.RWMutex)}func main() { go read() go read() go write() time.Sleep(time.Second * 3)}func read() { m.RLock() fmt.Println('startR') time.Sleep(time.Second) fmt.Println('endR') m.RUnlock()}func write() { m.Lock() fmt.Println('startW') time.Sleep(time.Second) fmt.Println('endW') m.Unlock()} 输出: Atomic
demo:增
结果: WaitGroup/ErrGroup
注意
demo: errGroup package mainimport ( 'golang.org/x/sync/errgroup' 'log' 'net/http')func main() { var g errgroup.Group var urls = []string{ 'https://github.com/', 'errUrl', } for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err == nil { _ = resp.Body.Close() } return err }) } err := g.Wait() if err != nil { log.Fatal('getErr', err) return }} 结果: once
demo:
结果: Context
对他的说明文章太多了,详细可以跳转看这篇 一文理解 golang context 这边列一个遇到得问题:
并行
我们可以再每个 CPU 上进行循环无关的迭代计算,我们仅需要创建完所有的 goroutine 后,从 channel 中读取结束信号进行计数即可。 并发编程/工作流方案扩展
singlelFlight(go 官方扩展同步包)
demo package mainimport ( 'golang.org/x/sync/singleflight' 'log' 'math/rand' 'sync' 'time')var ( g singleflight.Group)const ( funcKey = 'key' times = 5 randomNum = 100)func init() { rand.Seed(time.Now().UnixNano())}func main() { var wg sync.WaitGroup wg.Add(times) for i := 0; i < times; i++ { go func() { defer wg.Done() num, err := run(funcKey) if err != nil { log.Fatal(err) return } log.Println(num) }() } wg.Wait()}func run(key string) (num int, err error) { v, err, isShare := g.Do(key, func() (interface{}, error) { time.Sleep(time.Second * 5) num = rand.Intn(randomNum) //[0,100) return num, nil }) if err != nil { log.Fatal(err) return 0, err } data := v.(int) log.Println(isShare) return data, nil} 连续执行 3 次,返回结果如下,全部取了共享得结果: 但是注释掉 time.Sleep(time.Second * 5) 再尝试一次看看。 ![]() 这次全部取得真实值 实践: 伙伴部门高峰期可以减少 20% 的 Redis 调用, 大大减少了 Redis 的负载 实践开发案例
批量校验
批量校验接口限频单账户最高 100qps/s,整个系统多个校验场景公用一个账户限频需要限制批量校验最高为 50~80 qps/s(需要预留令牌供其他场景使用,否则频繁调用批量接口时候其他场景均会失败限频)。
1.使用 go routine 来并发进行三要素校验,因为 go routinue,所以每次开启 50 ~ 80 go routine 同时进行单次三要素校验; 2.每轮校验耗时 1s,如果所有 routinue 校验后与校验开始时间间隔不满一秒,则需要主动程序睡眠至 1s,然后开始下轮校验; 3.因为只是校验场景,如果某次校验失败,最容易的原因其实是校验方异常,或者被其他校验场景再当前 1s 内消耗过多令牌;那么整个批量接口返回 err,运营同学重新发起就好。
代码需要进行的优化点:
1.sleep 1s 这个操作可以从调用前开始计时,调用完成后不满 1s 补充至 1s,而不是每次最长调用时间 elapsedTime + 1s; 2.通道中获取的三要素校验结果顺序和入参数据数组顺序不对应,这里通过两种方案: 3.分组调用 ![]()
历史数据批量标签
func (s ServiceOnceJob) CompensatingHistoricalLabel(ctx context.Context,request *api.CompensatingHistoricalLabelRequest) (response *api.CompensatingHistoricalLabelResponse, err error) {if request.Key != interfaceKey {return nil, transform.Simple('err')}ctx, cancelFunc := context.WithCancel(ctx)var (wg = new(sync.WaitGroup)userRegisterDb = new(datareportdb.DataReportUserRegisteredRecords)startNum = int64(0))wg.Add(1)countHistory, err := userRegisterDb.GetUserRegisteredCountForHistory(ctx, historyStartTime, historyEndTime)if err != nil {return nil, err}div := decimal.NewFromFloat(float64(countHistory)).Div(decimal.NewFromFloat(float64(theNumberOfConcurrent)))f, _ := div.Float64()num := int64(math.Ceil(f))for i := 0; i < theNumberOfConcurrent; i++ {go func(startNum int64) {defer wg.Done()for {select {case <- ctx.Done():returndefault:userDataArr, err := userRegisterDb.GetUserRegisteredDataHistory(ctx, startNum, num)if err != nil {cancelFunc()}for _, userData := range userDataArr {if err := analyseUserAction(userData); err != nil {cancelFunc()}}}}}(startNum)startNum = startNum + num}wg.Wait()return response, nil} 批量发起/批量签署实现思路和上面其实差不多,都是需要支持批量的特性,基本上现在业务中统一使用多协程处理。 思考golang 协程很牛 x,协程的数目最大到底多大合适,有什么衡量指标么?
基本上可以这样理解这件事
使用锁时候正确释放锁的方式
goroutine 泄露预防与排查
goroutine 的退出其实只有以下几种方式可以做到
大多数引起 goroutine 泄露的原因基本上都是如下情况
杜绝:
排查:
案例:
输出: ![]() pprof:
![]() 复杂情况也可以用其他的可视化工具:
![]() 父协程捕获子协程 panic
父协程捕获子协程 panic 有锁的地方就去用 channel 优化
分享一个很不错的优化 demo: 场景:
分析:
问题:
解决
增加锁机制,解决针对链接池的并发问题发送消息也需要去加锁因为要防止出现 panic: concurrent write to websocket connection
假设网络延时,用户新增时候还有消息再发送中,新加入的用户就无法获得锁了,后面其他的相关操作都会被阻塞导致问题。 使用 channel 优化:
2.使用通道
3.通道消息方法,代码来自于开源项目 简单聊天架构演变: // 处理所有管道任务func (room *Room) ProcessTask() {log := zap.S()log.Info('启动处理任务')for {select {case c := <-room.register:log.Info('当前有客户端进行注册')room.clientsPool[c] = truecase c := <-room.unregister:log.Info('当前有客户端离开')if room.clientsPool[c] {close(c.send)delete(room.clientsPool, c)}case m := <-room.send:for c := range room.clientsPool {select {case c.send <- m:default:break}}}}} 结果: 成功使用 channel 替换了锁。 参考
|
|
来自: 菌心说 > 《编程+、计算机、信息技术》