并发模式
本章介绍 Go 中常见的并发编程模式,帮助你构建高效的并发应用。
工作池模式
基本工作池
go
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Worker struct {
ID int
JobQueue chan Job
Quit chan bool
}
func NewWorker(id int, jobQueue chan Job) *Worker {
return &Worker{
ID: id,
JobQueue: jobQueue,
Quit: make(chan bool),
}
}
func (w *Worker) Start() {
go func() {
for {
select {
case job := <-w.JobQueue:
fmt.Printf("Worker %d 处理任务 %d\n", w.ID, job.ID)
time.Sleep(100 * time.Millisecond)
case <-w.Quit:
fmt.Printf("Worker %d 停止\n", w.ID)
return
}
}
}()
}
func (w *Worker) Stop() {
w.Quit <- true
}
type Dispatcher struct {
WorkerPool chan chan Job
JobQueue chan Job
Workers []*Worker
wg sync.WaitGroup
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool: pool,
JobQueue: make(chan Job, 100),
Workers: make([]*Worker, maxWorkers),
}
}
func (d *Dispatcher) Run() {
for i := 0; i < len(d.Workers); i++ {
worker := NewWorker(i+1, d.WorkerPool)
d.Workers[i] = worker
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for job := range d.JobQueue {
go func(job Job) {
jobQueue := <-d.WorkerPool
jobQueue <- job
}(job)
}
}
func (d *Dispatcher) AddJob(job Job) {
d.JobQueue <- job
}
func main() {
dispatcher := NewDispatcher(5)
dispatcher.Run()
for i := 1; i <= 20; i++ {
dispatcher.AddJob(Job{ID: i, Data: fmt.Sprintf("任务 %d", i)})
}
time.Sleep(2 * time.Second)
}
Pipeline 模式
基本管道
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, num := range nums {
out <- num
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for num := range in {
out <- num * num
}
}()
return out
}
func addOne(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for num := range in {
out <- num + 1
}
}()
return out
}
func main() {
numbers := generate(1, 2, 3, 4, 5)
squares := square(numbers)
results := addOne(squares)
for result := range results {
fmt.Println(result)
}
}
扇出-扇入
go
func producer(ch chan<- int) {
defer close(ch)
for i := 0; i < 10; i++ {
ch <- i
}
}
func worker(id int, in <-chan int, out chan<- int) {
defer close(out)
for num := range in {
fmt.Printf("Worker %d 处理 %d\n", id, num)
out <- num * num
}
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for num := range c {
out <- num
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
jobs := make(chan int, 10)
go producer(jobs)
var workers []<-chan int
for i := 1; i <= 3; i++ {
results := make(chan int, 10)
go worker(i, jobs, results)
workers = append(workers, results)
}
out := fanIn(workers...)
for result := range out {
fmt.Println("结果:", result)
}
}
发布-订阅模式
go
type Event struct {
Type string
Payload interface{}
}
type Subscriber struct {
ID string
Ch chan Event
Quit chan bool
}
type EventBus struct {
subscribers map[string][]*Subscriber
register chan *Subscriber
unregister chan *Subscriber
publish chan Event
mu sync.RWMutex
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]*Subscriber),
register: make(chan *Subscriber),
unregister: make(chan *Subscriber),
publish: make(chan Event, 100),
}
}
func (eb *EventBus) Run() {
for {
select {
case sub := <-eb.register:
eb.mu.Lock()
eb.subscribers[sub.ID] = append(eb.subscribers[sub.ID], sub)
eb.mu.Unlock()
case sub := <-eb.unregister:
eb.mu.Lock()
if subs, ok := eb.subscribers[sub.ID]; ok {
for i, s := range subs {
if s == sub {
eb.subscribers[sub.ID] = append(subs[:i], subs[i+1:]...)
break
}
}
}
eb.mu.Unlock()
case event := <-eb.publish:
eb.mu.RLock()
for _, subs := range eb.subscribers {
for _, sub := range subs {
select {
case sub.Ch <- event:
default:
}
}
}
eb.mu.RUnlock()
}
}
}
func (eb *EventBus) Subscribe(id string) *Subscriber {
sub := &Subscriber{
ID: id,
Ch: make(chan Event, 10),
Quit: make(chan bool),
}
eb.register <- sub
return sub
}
func (eb *EventBus) Unsubscribe(sub *Subscriber) {
close(sub.Ch)
eb.unregister <- sub
}
func (eb *EventBus) Publish(event Event) {
eb.publish <- event
}
func main() {
bus := NewEventBus()
go bus.Run()
sub1 := bus.Subscribe("sub1")
sub2 := bus.Subscribe("sub2")
go func() {
for event := range sub1.Ch {
fmt.Printf("Sub1 收到: %s\n", event.Type)
}
}()
go func() {
for event := range sub2.Ch {
fmt.Printf("Sub2 收到: %s\n", event.Type)
}
}()
bus.Publish(Event{Type: "test1"})
bus.Publish(Event{Type: "test2"})
time.Sleep(time.Second)
bus.Unsubscribe(sub1)
bus.Unsubscribe(sub2)
}
限流模式
令牌桶
go
type TokenBucket struct {
capacity int
tokens int
rate time.Duration
ticker *time.Ticker
mu sync.Mutex
}
func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket {
tb := &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
ticker: time.NewTicker(rate),
}
go tb.refill()
return tb
}
func (tb *TokenBucket) refill() {
for range tb.ticker.C {
tb.mu.Lock()
if tb.tokens < tb.capacity {
tb.tokens++
}
tb.mu.Unlock()
}
}
func (tb *TokenBucket) Take() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
func main() {
bucket := NewTokenBucket(5, 100*time.Millisecond)
for i := 0; i < 10; i++ {
if bucket.Take() {
fmt.Printf("请求 %d: 成功\n", i+1)
} else {
fmt.Printf("请求 %d: 限流\n", i+1)
}
time.Sleep(50 * time.Millisecond)
}
}
漏桶
go
type LeakyBucket struct {
capacity int
queue chan struct{}
rate time.Duration
ticker *time.Ticker
}
func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
lb := &LeakyBucket{
capacity: capacity,
queue: make(chan struct{}, capacity),
rate: rate,
ticker: time.NewTicker(rate),
}
go lb.drain()
return lb
}
func (lb *LeakyBucket) drain() {
for range lb.ticker.C {
select {
case <-lb.queue:
default:
}
}
}
func (lb *LeakyBucket) Allow() bool {
select {
case lb.queue <- struct{}{}:
return true
default:
return false
}
}
func main() {
bucket := NewLeakyBucket(5, 200*time.Millisecond)
for i := 0; i < 10; i++ {
if bucket.Allow() {
fmt.Printf("请求 %d: 允许\n", i+1)
} else {
fmt.Printf("请求 %d: 拒绝\n", i+1)
}
}
}
超时模式
带超时的操作
go
func withTimeout(fn func() (interface{}, error), timeout time.Duration) (interface{}, error) {
result := make(chan interface{}, 1)
errChan := make(chan error, 1)
go func() {
data, err := fn()
if err != nil {
errChan <- err
return
}
result <- data
}()
select {
case data := <-result:
return data, nil
case err := <-errChan:
return nil, err
case <-time.After(timeout):
return nil, fmt.Errorf("操作超时")
}
}
func main() {
result, err := withTimeout(func() (interface{}, error) {
time.Sleep(2 * time.Second)
return "完成", nil
}, 1*time.Second)
if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("结果:", result)
}
}
重试模式
指数退避重试
go
func Retry(fn func() error, maxAttempts int, initialDelay time.Duration) error {
var err error
delay := initialDelay
for attempt := 0; attempt < maxAttempts; attempt++ {
err = fn()
if err == nil {
return nil
}
if attempt < maxAttempts-1 {
fmt.Printf("尝试 %d 失败,%v 后重试\n", attempt+1, delay)
time.Sleep(delay)
delay *= 2
}
}
return fmt.Errorf("重试 %d 次后失败: %v", maxAttempts, err)
}
func main() {
attempt := 0
err := Retry(func() error {
attempt++
fmt.Printf("执行尝试 %d\n", attempt)
if attempt < 3 {
return fmt.Errorf("失败")
}
return nil
}, 5, time.Second)
if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("成功")
}
}
最佳实践
- 避免共享状态:使用 Channel 通信
- 控制并发数:使用工作池
- 处理超时:使用 select 和 time.After
- 优雅关闭:使用 context 和 quit channel
- 处理 Panic:在 goroutine 中使用 recover
总结
- 工作池:管理并发任务
- Pipeline:数据处理流水线
- 发布-订阅:事件驱动架构
- 限流:控制请求速率
- 超时:防止阻塞
- 重试:处理临时故障
下一章:反射