Go 并发编程常用模式模板

一份可直接套用的 Go 并发参考。每个模式给出「何时用 / 模板 / 关键陷阱」。

贯穿全文的两条判断准则:

  • 能不共享就用通讯(channel / 消息传递),必须共享就用最简单的同步包起来(锁 / 原子)。
  • 每启动一个 goroutine,先想清楚它在什么条件下、怎么退出——否则就是 goroutine 泄漏。

版本说明:示例按 Go 1.22+ 编写。errgroup / singleflight 来自 golang.org/x/sync

共享变量加锁

何时用:多个 goroutine 读写同一块状态,且操作不是单一原子动作(需要把「复合操作」整体保护)。

import "sync"

type Counter struct {
	mu sync.Mutex
	n  int
}

func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

能用原子就别用锁(单个计数器、标志位、指针整体替换):

import "sync/atomic"

var n atomic.Int64
n.Add(1)        // 原子自增
_ = n.Load()    // 原子读

读多写少用 RWMutex(多个读者可并发持读锁):

type Cache struct {
	mu sync.RWMutex
	m  map[string]string
}

func (c *Cache) Get(k string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.m[k]
	return v, ok
}

func (c *Cache) Set(k, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = v
}

「几乎只读、偶尔整体替换」用原子指针(COW,读者无锁)

var config atomic.Pointer[Config]

// 读(无锁,极快)
cfg := config.Load()

// 写(构造新副本,原子换上)
config.Store(buildNewConfig())

关键陷阱

  • 一把锁要么保护一组相关字段,要么粒度过大成瓶颈——按「一起被读写的数据」划分锁的边界。
  • 多把锁务必全局固定获取顺序,否则死锁。
  • defer Unlock() 是默认习惯,避免 return / panic 路径忘记解锁。

单例 / 惰性初始化(Once)

何时用:某资源全局只该初始化一次且线程安全——数据库连接、连接池、全局配置。

import "sync"

var (
	once sync.Once
	db   *DB
)

func GetDB() *DB {
	once.Do(func() {
		db = mustConnect() // 只会执行一次,并发调用会阻塞等它完成
	})
	return db
}

Go 1.21+ 更简洁的写法

// OnceValue:把「只算一次的值」封装好,调用 GetDB() 即得
var GetDB = sync.OnceValue(func() *DB {
	return mustConnect()
})

关键陷阱

  • 不要自己手写「双重检查锁」——内存序极易写错,Once 已正确处理。
  • once.Do 里 panic 会被视为「已执行」,后续调用不会重试,初始化逻辑要保证不 panic 或自行兜底。

通讯:用 channel 传递数据(生产者-消费者)

何时用:数据 / 所有权要在 goroutine 之间流转,而不是被多方共享读写。

// 无缓冲 = 同步交接(发送方阻塞到接收方就绪)
// 有缓冲 = 解耦速率(缓冲未满即可发送)
ch := make(chan Item, 100)

// 生产者:发送方负责 close
go func() {
	defer close(ch)
	for _, it := range items {
		ch <- it
	}
}()

// 消费者:range 会一直取到 channel「已关闭且取空」
for it := range ch {
	handle(it)
}

关键陷阱

  • 永远由发送方 close,且在所有发送完成之后。向已关闭的 channel 发送会 panic。
  • 忘记 close 会让消费者的 range 永远阻塞 → 泄漏。
  • 多个发送方时,需用 WaitGroup 等全部发送方结束后再由一处 close(见模式 6)。

发布订阅(Pub/Sub)

何时用:一条消息要广播给所有订阅者(每人一份副本)。注意与 fan-out 区别:fan-out 是「一条消息只给某一个 worker」。

import "sync"

type Broker[T any] struct {
	mu   sync.RWMutex
	subs map[chan T]struct{}
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[chan T]struct{})}
}

func (b *Broker[T]) Subscribe() chan T {
	ch := make(chan T, 16)
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

func (b *Broker[T]) Unsubscribe(ch chan T) {
	b.mu.Lock()
	delete(b.subs, ch)
	close(ch)
	b.mu.Unlock()
}

func (b *Broker[T]) Publish(msg T) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for ch := range b.subs {
		select {
		case ch <- msg:
		default: // 某订阅者缓冲已满则丢弃,避免拖垮整个 Broker
		}
	}
}

关键陷阱

  • 慢订阅者会拖累广播——用带缓冲 channel + select/default 丢弃,或为每个订阅者配独立投递 goroutine。
  • 订阅者退出时必须 Unsubscribe,否则 Publish 会一直往无人读的 channel 发。

select 多路等待 + Actor 状态串行化

select:超时 / 取消 / 多路复用的通用积木

select {
case v := <-ch:
	handle(v)
case <-ctx.Done(): // 取消信号(最常用)
	return ctx.Err()
case <-time.After(2 * time.Second): // 超时
	return errTimeout
}

非阻塞收发(加 default):

select {
case ch <- v:
	// 发出去了
default:
	// 发不出去(缓冲满/无接收者),不阻塞
}

Actor:用独占 goroutine 替代锁

何时用:让状态只属于一个 goroutine,别人想读写就发命令进去——全程无锁,因为没有共享。

type cmd struct {
	op    string
	key   string
	value int
	reply chan int // 一次性回信通道
}

func store(ctx context.Context, cmds <-chan cmd) {
	state := map[string]int{} // 私有状态,无需任何锁
	for {
		select {
		case <-ctx.Done():
			return
		case c := <-cmds:
			switch c.op {
			case "set":
				state[c.key] = c.value
			case "get":
				c.reply <- state[c.key]
			}
		}
	}
}

// 调用方
func get(cmds chan<- cmd, key string) int {
	reply := make(chan int, 1)
	cmds <- cmd{op: "get", key: key, reply: reply}
	return <-reply
}

关键陷阱

  • reply channel 建议带缓冲(容量 1),避免 actor 因调用方没及时收而阻塞。
  • actor goroutine 必须能被 ctx 取消退出,否则泄漏。

流水线 / Fan-out / Worker Pool

三者递进:pipeline 是骨架,fan-out 给慢阶段加并行,worker pool 是 fan-out 的「有界」版。

Pipeline(各阶段并发运转,数据依次穿过)

func gen(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-ctx.Done():
				return // 支持取消,防止下游不收时泄漏
			}
		}
	}()
	return out
}

func sq(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// 使用:for r := range sq(ctx, gen(ctx, 1, 2, 3, 4)) { ... }

Worker Pool(固定 N 个 worker 消费任务队列)

func workerPool[J any, R any](
	ctx context.Context,
	jobs <-chan J,
	n int,
	work func(J) R,
) <-chan R {
	results := make(chan R)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				select {
				case results <- work(j):
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// 全部 worker 退出后才关 results(关键:必须等 wg)
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

Fan-in(合并多个 channel 到一个)

func merge[T any](cs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

关键陷阱

  • 关闭 out 的 goroutine 必须 wg.Wait() 之后再 close,否则会向已关闭 channel 发送 panic。
  • fan-out 多 worker 共读一个输入 channel,Go 保证每个值只被一个 worker 取到,无需额外加锁
  • 每个阶段都加 select { case <-ctx.Done(): return },否则下游提前退出时上游会永久阻塞。

context:取消与超时传播

何时用:几乎所有跨 goroutine / 跨 API 边界的并发函数——给整棵任务树装总开关,防泄漏、控超时。

import (
	"context"
	"time"
)

// 超时
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
defer cancel() // 一定要调用,否则计时器泄漏

// 或手动取消
ctx, cancel := context.WithCancel(parent)
defer cancel()

// goroutine 内监听取消
func worker(ctx context.Context, in <-chan Task) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // 被取消或超时
		case t, ok := <-in:
			if !ok {
				return nil
			}
			handle(t)
		}
	}
}

关键陷阱

  • ctx 按约定永远是函数第一个参数func F(ctx context.Context, ...)
  • WithCancel/WithTimeout 返回的 cancel 必须调用(通常 defer cancel()),否则资源泄漏。
  • 不要把 ctx 存进结构体长期持有;它应随调用链传递。
  • 别用 context 传业务参数,只传请求作用域的取消 / 截止时间 / 少量元数据。

结构化并发(errgroup)

何时用:并发发起一批独立任务,等它们全部完成,且任一失败就取消其余

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([][]byte, len(urls))

	for i, url := range urls { // Go 1.22+ 每次迭代是独立变量,无需再 i, url := i, url
		g.Go(func() error {
			data, err := fetch(ctx, url)
			if err != nil {
				return err // 第一个非 nil error 会取消 ctx,联动其余任务
			}
			results[i] = data
			return nil
		})
	}

	if err := g.Wait(); err != nil { // 汇合点:等全部,返回首个错误
		return nil, err
	}
	return results, nil
}

限制并发度(errgroup 自带,等价于内建 worker pool):

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(8) // 最多 8 个任务同时跑
for _, job := range jobs {
	g.Go(func() error { return process(ctx, job) })
}
err := g.Wait()

关键陷阱

  • g.Go 里返回 error 才会触发联动取消;任务内部要监听 ctx.Done() 才能真正提前退出。
  • 多个任务写同一切片不同下标是安全的(无重叠);写同一 map / 累加同一变量仍需加锁或原子。
  • 别裸 go 出去不管——凡是「发起一批、要等齐」的场景都用 errgroup 给一个明确汇合点。

附录:两个高频工具(用到再查)

Singleflight——并发请求同一 key 时只真正执行一次,其余共享结果(防缓存击穿):

import "golang.org/x/sync/singleflight"

var g singleflight.Group

func getUser(id string) (*User, error) {
	v, err, _ := g.Do(id, func() (any, error) {
		return queryUserFromDB(id) // 同一 id 的并发调用只查一次库
	})
	if err != nil {
		return nil, err
	}
	return v.(*User), nil
}

限流(令牌桶)——控制每秒最多处理多少:

import "golang.org/x/time/rate"

limiter := rate.NewLimiter(rate.Limit(10), 1) // 10 次/秒,桶容量 1
for req := range requests {
	if err := limiter.Wait(ctx); err != nil { // 取不到令牌就等(可被 ctx 取消)
		return
	}
	go handle(req)
}

速查:怎么选

需求用什么
多方读写同一状态锁(Mutex);只读多用 RWMutex;单值用 atomic
只初始化一次sync.Once / sync.OnceValue
数据在 goroutine 间流转channel(生产者-消费者)
一条消息广播给所有人Pub/Sub Broker
一条任务只给一个 workerfan-out / worker pool
状态独占、想免锁Actor(独占 goroutine + 命令 channel)
等多个事件(取消/超时/多路)select
分阶段处理数据流pipeline
给慢阶段加可控并行worker pool / errgroup.SetLimit
取消、超时、防泄漏context
发起一批任务、等齐、失败联动errgroup
并发请求同一资源只算一次singleflight
控制处理速率rate.Limiter

一句话收口:前半部分(锁 / Once / channel / Pub-Sub)解决「状态怎么安全共享或传递」,后半部分(select / pipeline / pool / context / errgroup)解决「并发任务怎么组织、控制和收尾」。能不共享就用通讯,必须共享就用最简单的同步包起来,且每个 goroutine 都要有明确的退出路径。