Go协程加管道实现异步批量消费调度任务
周末了,这周遇到个问题当时没想明白,周末整理下
题目有点绕口 在现实的项目中这么搞的也不常见,里面牵涉多个知识点,整理下就当学习了。
程序需求:
1:接收任务,从异步消息队列里面监控接收最新的任务消息
2:处理任务,每个任务耗时可能不定
我们常规的做法就是开启一个长监听串行化来一个执行一个,实在不行就多开几个,这种呢人工干预比较重,有时候还盯着不是太好。
程序方案:
1:异步接受消息 ,开启一个协程接受消息 这个不用多开接收消息不会成为瓶颈
2:异步处理消息,开启协程异步处理对应的消息,这里有点要注意是一个消息就开一个处理协程 还是多个消息开启,是值得思考的。
3:批量处理,多个消息开启一个处理协程,防止开启过多的协程,
4:超时处理,如果长时间没有达到批量处理的数量限制,那么也要及时处理
5:限制过多的协程,这个其实没有什么必要,因为go所谓百万协程性能但是既然搞了这个例子 那就完善下吧
代码实现:
package main import ( "errors" "fmt" "math/rand" "runtime" "strconv" "time" ) var msgChanLimit = 20 //消息管道大小限制 var handleId int //协程任务自定义id 用来区分 查看对应的任务批次 每处理一次 ++ /* periodType 消息产生时间 1 1秒一个信息会走到超时处理模块; 2 1毫秒一个信息 会一直走正常处理模块; 3 随机0秒到 1秒 模拟都会触发的情况 */ var periodType = 3 var batchNum = 6 // 最多堆积多少未处理信息 进行一批次处理 var batchTime = 3 // 在未满消息 最长多久 进行一批次处理 var maxGoroutines = 10 //最大的协程数量 这个其实感觉意义不大 但是还是搞一个 边界预防吧 //接受消息 func getMsg(msgChan chan string) { msgId := 0 for { //循环接收消息 msgId++ msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05") msgChan <- msg switch periodType { case 1: time.Sleep(1 * time.Second) // 秒 case 2: time.Sleep(1 * time.Millisecond) // 毫秒 case 3: time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // 随机 } } } // 处理消息 func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) { for i, v := range msgSet { fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v) time.Sleep(1500 * time.Millisecond) //模拟具体处理消息是有耗时的 间隔越大 那么多个后台会有越多的挂起的处理 } <-guard //释放一个限量 } //无阻塞去接受消息 func unBlockRead(ch chan string) (msg string, err error) { select { case msg = <-ch: return msg, nil case <-time.After(time.Microsecond): return "", errors.New("nil") } } func main() { guard := make(chan struct{}, maxGoroutines) //守护协程数量限制 msgChan := make(chan string, msgChanLimit) //接受消息channel大小 go getMsg(msgChan) // 开始接收消息 msgSet := make([]string, 0) // 临时存放接收到的消息集合 step := 0 //秒计数器 对应秒 for { //主逻辑处理 开始处理 if msg, err := unBlockRead(msgChan); err == nil { //接收到消息 msgSet = append(msgSet, msg) if len(msgSet) == batchNum { //达到处理数量 handleId++ guard <- struct{}{} go handleMsg(1, handleId, msgSet, guard) // 处理当前的msgSet msgSet = nil //重置 step = 0 } } else { if step > batchTime && len(msgSet) > 0 { // 超时并且不为空 handleId++ guard <- struct{}{} go handleMsg(2, handleId, msgSet, guard) msgSet = nil //重置 step = 0 } else { step++ time.Sleep(1 * time.Second) //休息一秒 step++ } } } // 挂起主进程 防止退出 for { runtime.GC() } }
整体的代码就是这样,具体的可以看注释了(癖好这东西 the more the more)
运行效果
就这样吧,里面牵涉多个小点,比较有意思