runhao 的灵光一闪

简单批处理实现

2025-07-02
效率 Golang分发者可复用
14分钟
2795字

[TOC]

概述

平时经常需要写工具处理给定的一堆数据,最简单的实现是一个 for 循环串行执行,为了提高效率实现了一个通用的分发者,生产任务给多个消费者并行处理。 该模式下,有一个任务生产者,同时有多个任务消费者。 在本地,当前代码实现的是通过 Golang 的 channel 进行任务传递。 同样可以实现通过网络调用,将任务分发到不同机器上进行任务处理,真正实现负载的均衡。

直接上代码~

dispatcher 实现

1
package dispatcher
2
3
import (
4
"context"
5
"log"
6
"sync"
7
)
8
9
// Dispatcher 任务分发器
10
// 使用了 Golang 的泛型,需要提前定义好需要处理的任务类型,可以是基础类型,也可以是自定义结构体等
11
type Dispatcher[T any] struct {
12
handler func(ctx context.Context, ch <-chan T) error
13
}
14
15
// NewDispatcher 新建任务分发器
42 collapsed lines
16
// 在新建任务分发器时,需要传入对应的处理方法,通过 channel 上的 T 的类型将 Dispatcher 实例化
17
func NewDispatcher[T any](handler func(ctx context.Context, ch <-chan T) error) *Dispatcher[T] {
18
return &Dispatcher[T]{
19
handler: handler,
20
}
21
}
22
23
// Exec 执行
24
// items 为需要被处理的一批数据
25
// parallelNum 为并行启动的 handler 协程数目
26
func (d *Dispatcher[T]) Exec(ctx context.Context, items []T, parallelNum int) error {
27
if parallelNum == 0 {
28
parallelNum = 1
29
}
30
31
ch := make(chan T)
32
// 对于要处理的任务进行分发,在分发完毕后,会关闭 channel
33
go func(ctx context.Context) {
34
defer close(ch)
35
for idx, item := range items {
36
ch <- item
37
if idx%1000 == 0 {
38
log.Printf("task idx[%d] dispatched", idx)
39
}
40
}
41
}(ctx)
42
43
// 基于传入的并发数,创建对应数目协程执行处理函数
44
// 在 handler 中,需要持续从 channel 取出单一任务进行处理,当 channel 关闭时,会对应退出
45
var wg sync.WaitGroup
46
for i := 0; i < parallelNum; i++ {
47
wg.Add(1)
48
go func() {
49
defer wg.Done()
50
if err := d.handler(ctx, ch); err != nil {
51
return
52
}
53
}()
54
}
55
wg.Wait()
56
return nil
57
}

dispatcher 使用

1
func handler(ctx context.Context, ch <-chan string) error {
2
for s := range ch {
3
fmt.Printf("get string[%s]\n", s)
4
}
5
return nil
6
}
7
8
func main() {
9
ctx := context.TODO()
10
// 传入处理函数,也可以传入结构体的方法
11
distpatch := dispatcher.NewDispatcher(handler)
12
err := distpatch.Exec(ctx, []string{"1", "2", "3"}, 3)
13
if err != nil {
14
log.Fatalf("exec err[%+v]", err)
15
}
2 collapsed lines
16
log.Println("exec succeed")
17
}

进一步优化

上述的实现还有很多可以优化的地方,比如:

  1. 直接暴露了通过 channel 进行任务传递,未来想变更传递方式不方便
  2. 需要在 Exec 调用时就传入整批数据,不能边生产边消费
  3. 返回 error,但实际没有进行错误处理,Exec 始终返回 nil
  4. 没有超时控制,如果任务执行时间过长,将会持续阻塞

不暴露 channel

上述实现中,handler 对外暴露了入参是 channel,这也导致了使用者需要在 handler 中 for range 来获取数据。 暴露了底层细节,同时导致基本所有的使用者均需要生产这一行代码。

虽然这样的一点好处是,使用者可以知道在什么时候停止,以及一次处理多少个。 然而,这并不构成强有力的理由。 控制停止,主要有两个因素:

  1. 因失败停止 - 通过返回 error 可以实现
  2. 因超时停止 - 在后续小节将会介绍

而一次处理多个,实际上可以将 T 简单地变为数组实现,在生产时确定好需要在一次 handle 中处理的对象即可。 因此在 handler 的入参中将 channel 去掉,是顺理成章的。

1
// Dispatcher 任务分发器
2
type Dispatcher[T any] struct {
3
handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素
4
}
5
6
// NewDispatcher 新建任务分发器
7
func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {
8
return &Dispatcher[T]{
9
handler: handler,
10
}
11
}
12
13
// Exec 执行
14
func (d *Dispatcher[T]) Exec(ctx context.Context, items []T, parallelNum int) error {
15
// ....相关实现同上
16 collapsed lines
16
17
var wg sync.WaitGroup
18
for i := 0; i < parallelNum; i++ {
19
wg.Add(1)
20
go func() {
21
defer wg.Done()
22
for item := range ch { // 让 dispatcher 内部闭环写入和读取
23
if err := d.handler(ctx, item); err != nil {
24
return
25
}
26
}
27
}()
28
}
29
wg.Wait()
30
return nil
31
}

边生产边消费

当前 Exec 的实现中,将生产部分放入了协程中,而消费部分通过 WaitGroup 阻塞住,直到生产完所有的任务调用 close(ch) 使得消费者协程能够退出死循环。 因为 WaitGroup 的阻塞状态,只有在生产完成前才能维持住,所以导致需要一次传入所有的任务。 当然,实际上可以每次新任务到来时都调用 Exec,但是每次进行相关资源的创建和销毁,有一定成本。

为了实现边生产边消费,核心就是要使得在生产过程中,始终不会关闭 channel,以维持消费者协程的驻留。 因此,可以如下实现:

1
type Dispatcher[T any] struct {
2
ch chan T // 支持写入和读取
3
end chan struct{} // 用于同步生产者和消费者的退出,避免生产结束,还没消费完就结束
4
handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素
5
}
6
7
// NewDispatcher 新建任务分发器
8
func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {
9
return &Dispatcher[T]{
10
ch: make(chan T),
11
end: make(chan struct{}),
12
handler: handler,
13
}
14
}
15
40 collapsed lines
16
// Serve 启用服务
17
func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {
18
if parallelNum == 0 {
19
parallelNum = 1
20
}
21
22
go func() {
23
defer close(d.end) // 说明消费者消费完毕,Close 无需阻塞
24
var wg sync.WaitGroup
25
for i := 0; i < parallelNum; i++ {
26
wg.Add(1)
27
go func() {
28
defer wg.Done()
29
for item := range d.ch {
30
if err := d.handler(ctx, item); err != nil {
31
return
32
}
33
}
34
}()
35
}
36
wg.Wait()
37
}()
38
return nil
39
}
40
41
// Handle 处理
42
func (d *Dispatcher[T]) Handle(item T) {
43
if d.ch == nil {
44
return
45
}
46
d.ch <- item
47
}
48
49
func (d *Dispatcher[T]) Close() {
50
if d.ch == nil {
51
return
52
}
53
close(d.ch)
54
<-d.end // 阻塞住,等待消费者消费完已有任务
55
}

使用时,先调用 Serve,让消费者等待任务到来。再调用 defer Close(),保证资源在结束时会被回收。 最后,在每次新任务到来时调用 Handle 传递任务。

1
func main() {
2
ctx := context.TODO()
3
// 传入处理函数,也可以传入结构体的方法
4
distpatch := dispatcher.NewDispatcher(handler)
5
err := distpatch.Serve(ctx, 3)
6
if err != nil {
7
log.Fatalf("serve err[%+v]", err)
8
}
9
defer distpatch.Close()
10
// 此处可以通过读文件等方式,持续读取需要处理的任务,并逐一执行
11
for _, item := range []string{"1","2","3"} {
12
distpatch.Handle(item)
13
}
14
log.Println("exec succeed")
15
}

【注意】上述的实现仅仅考虑了如何可以持续生产和消费,但实际使用上 Handle 感知不到每次执行的成功与失败。 如果持续失败,实际上 Server 部分的消费者都会退出,Handle 调用最终将阻塞。

因此,在所有消费者都退出时,也需要关闭 channel,此时为了避免多次关闭,我们依赖 sync.Once 实现只关闭一次。 同时,由于消费者现在也会执行关闭 channel 操作,生产者写入 channel 时可能 panic,这里通过 recover 实现对异常的恢复。

1
type Dispatcher[T any] struct {
2
ch chan T
3
handler func(ctx context.Context, item T) error
4
once sync.Once // 让某操作只执行一次
5
}
6
7
// NewDispatcher 新建任务分发器
8
func NewDispatcher[T any](handler func(ctx context.Context, item T) error) *Dispatcher[T] {
9
return &Dispatcher[T]{
10
ch: make(chan T),
11
handler: handler,
12
once: sync.Once{},
13
}
14
}
15
49 collapsed lines
16
// Serve 启用服务
17
func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {
18
if parallelNum == 0 {
19
parallelNum = 1
20
}
21
22
go func() {
23
defer func() {
24
close(d.end) // 说明消费者消费完毕,Close 无需阻塞
25
d.Close() // 所有消费者退出时,关闭 channel
26
}
27
var wg sync.WaitGroup
28
for i := 0; i < parallelNum; i++ {
29
wg.Add(1)
30
go func() {
31
defer wg.Done()
32
for item := range d.ch {
33
if err := d.handler(ctx, item); err != nil {
34
return
35
}
36
}
37
}()
38
}
39
wg.Wait()
40
}()
41
return nil
42
}
43
44
// Handle 处理
45
func (d *Dispatcher[T]) Handle(item T) {
46
defer func() {
47
recover() // 捕获向已关闭 channel 写入的 panic
48
}()
49
if d.ch == nil {
50
return
51
}
52
d.ch <- item
53
}
54
55
// Close 进行资源关闭
56
func (d *Dispatcher[T]) Close() {
57
if d.ch == nil {
58
return
59
}
60
d.once.Do(func() { // 统一只执行一次关闭
61
close(d.ch)
62
})
63
<-d.end
64
}

感知每次操作异常

仅从感知的角度来说,有两种可能的要求:一种是生产者感知异常,另一种是消费者感知异常。 上述代码中,因为通过 channel 把生产和消费隔离开,导致消费者只能单向接收任务,而缺少返回任务运行结果的手段。 因此,如果生产者想感知异常,需要补全通信的链路。同样可以使用 channel,构造出任务ID + 异常处理结果的结构体,对这个 channel 消费以处理异常。

更常见的诉求,实际上是发生异常时需要进行异常处理,即消费者在执行主链路的任务失败后,感知异常,并进行相关补偿操作。 最常见的是重试,并且如果多次重试失败,对问题进行记录。 这种情况,主要考虑是在定义 handler 时在内部处理。 另一种方法是,允许另外传入异常处理 handler,在 dispatcher 内自动进行调度。

1
type Dispatcher[T any] struct {
2
ch chan T // 支持写入和读取
3
handler func(ctx context.Context, item T) error // 让处理函数专注于当前的元素
4
errHandler func(ctx context.Context, item T, err error)
5
}
6
7
// NewDispatcher 新建任务分发器
8
func NewDispatcher[T any](handler func(ctx context.Context, item T) error,
9
errHandler func(ctx context.Context, item T, err error)) *Dispatcher[T] {
10
return &Dispatcher[T]{
11
ch: make(chan T),
12
handler: handler,
13
errHandler: errHandler,
14
}
15
}
28 collapsed lines
16
17
// Serve 启用服务
18
func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int) error {
19
if parallelNum == 0 {
20
parallelNum = 1
21
}
22
23
go func() {
24
defer func() {
25
close(d.end) // 说明消费者消费完毕,Close 无需阻塞
26
d.Close() // 所有消费者退出时,关闭 channel
27
}
28
var wg sync.WaitGroup
29
for i := 0; i < parallelNum; i++ {
30
wg.Add(1)
31
go func() {
32
defer wg.Done()
33
for item := range d.ch {
34
if err := d.handler(ctx, item); err != nil {
35
d.errHandler(ctx, item, err)
36
}
37
}
38
}()
39
}
40
wg.Wait()
41
}()
42
return nil
43
}

这里原始实现需要修正的点在于,如果消费者返回的异常不被理解,则 handler 的定义中,需要移除返回值 error,避免误解。

超时控制

一般来说,对于批处理任务,我们总是希望它能完整地执行完,所以没有设置超时时间。但是对于实时性要求强的处理任务来说,超过一定时间未执行完的任务,可能不再重要,可以放弃,此时就体现出了超时时间的价值。

超时时间控制可以在消费者控制,也可以在生产者控制,以下实现在消费者控制的代码。生产者控制需使用方自行控制。

1
// Serve 启用服务
2
func (d *Dispatcher[T]) Serve(ctx context.Context, parallelNum int, lifeTime time.Duration) error {
3
if parallelNum == 0 {
4
parallelNum = 1
5
}
6
7
go func() {
8
defer func() {
9
close(d.end) // 说明消费者消费完毕,Close 无需阻塞
10
d.Close() // 所有消费者退出时,关闭 channel
11
}
12
var wg sync.WaitGroup
13
for i := 0; i < parallelNum; i++ {
14
wg.Add(1)
15
25 collapsed lines
16
// 设置超时时间
17
ctxWithTimeout, cancel := context.WithTimeout(ctx, lifeTime)
18
go func() {
19
defer func() {
20
wg.Done()
21
cancel() // 关联的上下文都需要被取消
22
}()
23
24
for item := range d.ch {
25
select {
26
case <-ctxWithTimeout.Done(): // 出现超时信号就退出
27
log.Println("ctx done")
28
return
29
default:
30
}
31
if err := d.handler(ctxWithTimeout, item); err != nil {
32
return
33
}
34
}
35
}()
36
}
37
wg.Wait()
38
}()
39
return nil
40
}

以上,我们自己实现了一个简单的支持自定义消费者的任务分发器,隐藏了通信的方式,支持持续任务生产,支持超时时间控制。

本文标题:简单批处理实现
文章作者:Runhao
发布时间:2025-07-02
Copyright 2025
站点地图