[TOC]
概述
平时经常需要写工具处理给定的一堆数据,最简单的实现是一个 for 循环串行执行,为了提高效率实现了一个通用的分发者,生产任务给多个消费者并行处理。 该模式下,有一个任务生产者,同时有多个任务消费者。 在本地,当前代码实现的是通过 Golang 的 channel 进行任务传递。 同样可以实现通过网络调用,将任务分发到不同机器上进行任务处理,真正实现负载的均衡。
直接上代码~
dispatcher 实现
1package dispatcher2
3import (4 "context"5 "log"6 "sync"7)8
9// Dispatcher 任务分发器10// 使用了 Golang 的泛型,需要提前定义好需要处理的任务类型,可以是基础类型,也可以是自定义结构体等11type Dispatcher[T any] struct {12 handler func(ctx context.Context, ch <-chan T) error13}14
15// NewDispatcher 新建任务分发器42 collapsed lines
16// 在新建任务分发器时,需要传入对应的处理方法,通过 channel 上的 T 的类型将 Dispatcher 实例化17func NewDispatcher[T any](handler func(ctx context.Context, ch <-chan T) error) *Dispatcher[T] {18 return &Dispatcher[T]{19 handler: handler,20 }21}22
23// Exec 执行24// items 为需要被处理的一批数据25// parallelNum 为并行启动的 handler 协程数目26func (d *Dispatcher[T]) Exec(ctx context.Context, items []T, parallelNum int) error {27 if parallelNum == 0 {28 parallelNum = 129 }30
31 ch := make(chan T)32 // 对于要处理的任务进行分发,在分发完毕后,会关闭 channel33 go func(ctx context.Context) {34 defer close(ch)35 for idx, item := range items {36 ch <- item37 if idx%1000 == 0 {38 log.Printf("task idx[%d] dispatched", idx)39 }40 }41 }(ctx)42
43 // 基于传入的并发数,创建对应数目协程执行处理函数44 // 在 handler 中,需要持续从 channel 取出单一任务进行处理,当 channel 关闭时,会对应退出45 var wg sync.WaitGroup46 for i := 0; i < parallelNum; i++ {47 wg.Add(1)48 go func() {49 defer wg.Done()50 if err := d.handler(ctx, ch); err != nil {51 return52 }53 }()54 }55 wg.Wait()56 return nil57}dispatcher 使用
1func handler(ctx context.Context, ch <-chan string) error {2 for s := range ch {3 fmt.Printf("get string[%s]\n", s)4 }5 return nil6}7
8func main() {9 ctx := context.TODO()10 // 传入处理函数,也可以传入结构体的方法11 distpatch := dispatcher.NewDispatcher(handler)12 err := distpatch.Exec(ctx, []string{"1", "2", "3"}, 3)13 if err != nil {14 log.Fatalf("exec err[%+v]", err)15 }2 collapsed lines
16 log.Println("exec succeed")17}进一步优化
上述的实现还有很多可以优化的地方,比如:
- 直接暴露了通过 channel 进行任务传递,未来想变更传递方式不方便
- 需要在 Exec 调用时就传入整批数据,不能边生产边消费
- 返回 error,但实际没有进行错误处理,Exec 始终返回 nil
- 没有超时控制,如果任务执行时间过长,将会持续阻塞
不暴露 channel
上述实现中,handler 对外暴露了入参是 channel,这也导致了使用者需要在 handler 中 for range 来获取数据。 暴露了底层细节,同时导致基本所有的使用者均需要生产这一行代码。
虽然这样的一点好处是,使用者可以知道在什么时候停止,以及一次处理多少个。 然而,这并不构成强有力的理由。 控制停止,主要有两个因素:
- 因失败停止 - 通过返回 error 可以实现
- 因超时停止 - 在后续小节将会介绍
而一次处理多个,实际上可以将 T 简单地变为数组实现,在生产时确定好需要在一次 handle 中处理的对象即可。 因此在 handler 的入参中将 channel 去掉,是顺理成章的。
1// Dispatcher 任务分发器2type Dispatcher[T any] struct {3 handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素4}5
6// NewDispatcher 新建任务分发器7func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {8 return &Dispatcher[T]{9 handler: handler,10 }11}12
13// Exec 执行14func (d *Dispatcher[T]) Exec(ctx context.Context, items []T, parallelNum int) error {15 // ....相关实现同上16 collapsed lines
16
17 var wg sync.WaitGroup18 for i := 0; i < parallelNum; i++ {19 wg.Add(1)20 go func() {21 defer wg.Done()22 for item := range ch { // 让 dispatcher 内部闭环写入和读取23 if err := d.handler(ctx, item); err != nil {24 return25 }26 }27 }()28 }29 wg.Wait()30 return nil31}边生产边消费
当前 Exec 的实现中,将生产部分放入了协程中,而消费部分通过 WaitGroup 阻塞住,直到生产完所有的任务调用 close(ch) 使得消费者协程能够退出死循环。 因为 WaitGroup 的阻塞状态,只有在生产完成前才能维持住,所以导致需要一次传入所有的任务。 当然,实际上可以每次新任务到来时都调用 Exec,但是每次进行相关资源的创建和销毁,有一定成本。
为了实现边生产边消费,核心就是要使得在生产过程中,始终不会关闭 channel,以维持消费者协程的驻留。 因此,可以如下实现:
1type Dispatcher[T any] struct {2 ch chan T // 支持写入和读取3 end chan struct{} // 用于同步生产者和消费者的退出,避免生产结束,还没消费完就结束4 handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素5}6
7// NewDispatcher 新建任务分发器8func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {9 return &Dispatcher[T]{10 ch: make(chan T),11 end: make(chan struct{}),12 handler: handler,13 }14}15
40 collapsed lines
16// Serve 启用服务17func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {18 if parallelNum == 0 {19 parallelNum = 120 }21
22 go func() {23 defer close(d.end) // 说明消费者消费完毕,Close 无需阻塞24 var wg sync.WaitGroup25 for i := 0; i < parallelNum; i++ {26 wg.Add(1)27 go func() {28 defer wg.Done()29 for item := range d.ch {30 if err := d.handler(ctx, item); err != nil {31 return32 }33 }34 }()35 }36 wg.Wait()37 }()38 return nil39}40
41// Handle 处理42func (d *Dispatcher[T]) Handle(item T) {43 if d.ch == nil {44 return45 }46 d.ch <- item47}48
49func (d *Dispatcher[T]) Close() {50 if d.ch == nil {51 return52 }53 close(d.ch)54 <-d.end // 阻塞住,等待消费者消费完已有任务55}使用时,先调用 Serve,让消费者等待任务到来。再调用 defer Close(),保证资源在结束时会被回收。 最后,在每次新任务到来时调用 Handle 传递任务。
1func main() {2 ctx := context.TODO()3 // 传入处理函数,也可以传入结构体的方法4 distpatch := dispatcher.NewDispatcher(handler)5 err := distpatch.Serve(ctx, 3)6 if err != nil {7 log.Fatalf("serve err[%+v]", err)8 }9 defer distpatch.Close()10 // 此处可以通过读文件等方式,持续读取需要处理的任务,并逐一执行11 for _, item := range []string{"1","2","3"} {12 distpatch.Handle(item)13 }14 log.Println("exec succeed")15}【注意】上述的实现仅仅考虑了如何可以持续生产和消费,但实际使用上 Handle 感知不到每次执行的成功与失败。 如果持续失败,实际上 Server 部分的消费者都会退出,Handle 调用最终将阻塞。
因此,在所有消费者都退出时,也需要关闭 channel,此时为了避免多次关闭,我们依赖 sync.Once 实现只关闭一次。 同时,由于消费者现在也会执行关闭 channel 操作,生产者写入 channel 时可能 panic,这里通过 recover 实现对异常的恢复。
1type Dispatcher[T any] struct {2 ch chan T3 handler func(ctx context.Context, item T) error4 once sync.Once // 让某操作只执行一次5}6
7// NewDispatcher 新建任务分发器8func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {9 return &Dispatcher[T]{10 ch: make(chan T),11 handler: handler,12 once: sync.Once{},13 }14}15
49 collapsed lines
16// Serve 启用服务17func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {18 if parallelNum == 0 {19 parallelNum = 120 }21
22 go func() {23 defer func() {24 close(d.end) // 说明消费者消费完毕,Close 无需阻塞25 d.Close() // 所有消费者退出时,关闭 channel26 }27 var wg sync.WaitGroup28 for i := 0; i < parallelNum; i++ {29 wg.Add(1)30 go func() {31 defer wg.Done()32 for item := range d.ch {33 if err := d.handler(ctx, item); err != nil {34 return35 }36 }37 }()38 }39 wg.Wait()40 }()41 return nil42}43
44// Handle 处理45func (d *Dispatcher[T]) Handle(item T) {46 defer func() {47 recover() // 捕获向已关闭 channel 写入的 panic48 }()49 if d.ch == nil {50 return51 }52 d.ch <- item53}54
55// Close 进行资源关闭56func (d *Dispatcher[T]) Close() {57 if d.ch == nil {58 return59 }60 d.once.Do(func() { // 统一只执行一次关闭61 close(d.ch)62 })63 <-d.end64}感知每次操作异常
仅从感知的角度来说,有两种可能的要求:一种是生产者感知异常,另一种是消费者感知异常。 上述代码中,因为通过 channel 把生产和消费隔离开,导致消费者只能单向接收任务,而缺少返回任务运行结果的手段。 因此,如果生产者想感知异常,需要补全通信的链路。同样可以使用 channel,构造出任务ID + 异常处理结果的结构体,对这个 channel 消费以处理异常。
更常见的诉求,实际上是发生异常时需要进行异常处理,即消费者在执行主链路的任务失败后,感知异常,并进行相关补偿操作。 最常见的是重试,并且如果多次重试失败,对问题进行记录。 这种情况,主要考虑是在定义 handler 时在内部处理。 另一种方法是,允许另外传入异常处理 handler,在 dispatcher 内自动进行调度。
1type Dispatcher[T any] struct {2 ch chan T // 支持写入和读取3 handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素4 errHandler func(ctx context.Context, item T, err error)5}6
7// NewDispatcher 新建任务分发器8func NewDispatcher[T any](handler func(ctx context.Context, item T) error,9 errHandler func(ctx context.Context, item T, err error)) *Dispatcher[T] {10 return &Dispatcher[T]{11 ch: make(chan T),12 handler: handler,13 errHandler: errHandler,14 }15}28 collapsed lines
16
17// Serve 启用服务18func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {19 if parallelNum == 0 {20 parallelNum = 121 }22
23 go func() {24 defer func() {25 close(d.end) // 说明消费者消费完毕,Close 无需阻塞26 d.Close() // 所有消费者退出时,关闭 channel27 }28 var wg sync.WaitGroup29 for i := 0; i < parallelNum; i++ {30 wg.Add(1)31 go func() {32 defer wg.Done()33 for item := range d.ch {34 if err := d.handler(ctx, item); err != nil {35 d.errHandler(ctx, item, err)36 }37 }38 }()39 }40 wg.Wait()41 }()42 return nil43}这里原始实现需要修正的点在于,如果消费者返回的异常不被理解,则 handler 的定义中,需要移除返回值 error,避免误解。
超时控制
一般来说,对于批处理任务,我们总是希望它能完整地执行完,所以没有设置超时时间。但是对于实时性要求强的处理任务来说,超过一定时间未执行完的任务,可能不再重要,可以放弃,此时就体现出了超时时间的价值。
超时时间控制可以在消费者控制,也可以在生产者控制,以下实现在消费者控制的代码。生产者控制需使用方自行控制。
1// Serve 启用服务2func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int, lifeTime time.Duration) error {3 if parallelNum == 0 {4 parallelNum = 15 }6
7 go func() {8 defer func() {9 close(d.end) // 说明消费者消费完毕,Close 无需阻塞10 d.Close() // 所有消费者退出时,关闭 channel11 }12 var wg sync.WaitGroup13 for i := 0; i < parallelNum; i++ {14 wg.Add(1)15
25 collapsed lines
16 // 设置超时时间17 ctxWithTimeout, cancel := context.WithTimeout(ctx, lifeTime)18 go func() {19 defer func() {20 wg.Done()21 cancel() // 关联的上下文都需要被取消22 }()23
24 for item := range d.ch {25 select {26 case <-ctxWithTimeout.Done(): // 出现超时信号就退出27 log.Println("ctx done")28 return29 default:30 }31 if err := d.handler(ctxWithTimeout, item); err != nil {32 return33 }34 }35 }()36 }37 wg.Wait()38 }()39 return nil40}以上,我们自己实现了一个简单的支持自定义消费者的任务分发器,隐藏了通信的方式,支持持续任务生产,支持超时时间控制。