Skip to content
On this page

并发模式

本章介绍 Go 中常见的并发编程模式,帮助你构建高效的并发应用。

工作池模式

基本工作池

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Worker struct {
    ID       int
    JobQueue chan Job
    Quit     chan bool
}

func NewWorker(id int, jobQueue chan Job) *Worker {
    return &Worker{
        ID:       id,
        JobQueue: jobQueue,
        Quit:     make(chan bool),
    }
}

func (w *Worker) Start() {
    go func() {
        for {
            select {
            case job := <-w.JobQueue:
                fmt.Printf("Worker %d 处理任务 %d\n", w.ID, job.ID)
                time.Sleep(100 * time.Millisecond)
            case <-w.Quit:
                fmt.Printf("Worker %d 停止\n", w.ID)
                return
            }
        }
    }()
}

func (w *Worker) Stop() {
    w.Quit <- true
}

type Dispatcher struct {
    WorkerPool chan chan Job
    JobQueue   chan Job
    Workers    []*Worker
    wg         sync.WaitGroup
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{
        WorkerPool: pool,
        JobQueue:   make(chan Job, 100),
        Workers:    make([]*Worker, maxWorkers),
    }
}

func (d *Dispatcher) Run() {
    for i := 0; i < len(d.Workers); i++ {
        worker := NewWorker(i+1, d.WorkerPool)
        d.Workers[i] = worker
        worker.Start()
    }
    
    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for job := range d.JobQueue {
        go func(job Job) {
            jobQueue := <-d.WorkerPool
            jobQueue <- job
        }(job)
    }
}

func (d *Dispatcher) AddJob(job Job) {
    d.JobQueue <- job
}

func main() {
    dispatcher := NewDispatcher(5)
    dispatcher.Run()
    
    for i := 1; i <= 20; i++ {
        dispatcher.AddJob(Job{ID: i, Data: fmt.Sprintf("任务 %d", i)})
    }
    
    time.Sleep(2 * time.Second)
}

Pipeline 模式

基本管道

go
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, num := range nums {
            out <- num
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            out <- num * num
        }
    }()
    return out
}

func addOne(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            out <- num + 1
        }
    }()
    return out
}

func main() {
    numbers := generate(1, 2, 3, 4, 5)
    squares := square(numbers)
    results := addOne(squares)
    
    for result := range results {
        fmt.Println(result)
    }
}

扇出-扇入

go
func producer(ch chan<- int) {
    defer close(ch)
    for i := 0; i < 10; i++ {
        ch <- i
    }
}

func worker(id int, in <-chan int, out chan<- int) {
    defer close(out)
    for num := range in {
        fmt.Printf("Worker %d 处理 %d\n", id, num)
        out <- num * num
    }
}

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for num := range c {
                out <- num
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    jobs := make(chan int, 10)
    
    go producer(jobs)
    
    var workers []<-chan int
    for i := 1; i <= 3; i++ {
        results := make(chan int, 10)
        go worker(i, jobs, results)
        workers = append(workers, results)
    }
    
    out := fanIn(workers...)
    
    for result := range out {
        fmt.Println("结果:", result)
    }
}

发布-订阅模式

go
type Event struct {
    Type    string
    Payload interface{}
}

type Subscriber struct {
    ID   string
    Ch   chan Event
    Quit chan bool
}

type EventBus struct {
    subscribers map[string][]*Subscriber
    register    chan *Subscriber
    unregister  chan *Subscriber
    publish     chan Event
    mu          sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        subscribers: make(map[string][]*Subscriber),
        register:    make(chan *Subscriber),
        unregister:  make(chan *Subscriber),
        publish:     make(chan Event, 100),
    }
}

func (eb *EventBus) Run() {
    for {
        select {
        case sub := <-eb.register:
            eb.mu.Lock()
            eb.subscribers[sub.ID] = append(eb.subscribers[sub.ID], sub)
            eb.mu.Unlock()
            
        case sub := <-eb.unregister:
            eb.mu.Lock()
            if subs, ok := eb.subscribers[sub.ID]; ok {
                for i, s := range subs {
                    if s == sub {
                        eb.subscribers[sub.ID] = append(subs[:i], subs[i+1:]...)
                        break
                    }
                }
            }
            eb.mu.Unlock()
            
        case event := <-eb.publish:
            eb.mu.RLock()
            for _, subs := range eb.subscribers {
                for _, sub := range subs {
                    select {
                    case sub.Ch <- event:
                    default:
                    }
                }
            }
            eb.mu.RUnlock()
        }
    }
}

func (eb *EventBus) Subscribe(id string) *Subscriber {
    sub := &Subscriber{
        ID:   id,
        Ch:   make(chan Event, 10),
        Quit: make(chan bool),
    }
    eb.register <- sub
    return sub
}

func (eb *EventBus) Unsubscribe(sub *Subscriber) {
    close(sub.Ch)
    eb.unregister <- sub
}

func (eb *EventBus) Publish(event Event) {
    eb.publish <- event
}

func main() {
    bus := NewEventBus()
    go bus.Run()
    
    sub1 := bus.Subscribe("sub1")
    sub2 := bus.Subscribe("sub2")
    
    go func() {
        for event := range sub1.Ch {
            fmt.Printf("Sub1 收到: %s\n", event.Type)
        }
    }()
    
    go func() {
        for event := range sub2.Ch {
            fmt.Printf("Sub2 收到: %s\n", event.Type)
        }
    }()
    
    bus.Publish(Event{Type: "test1"})
    bus.Publish(Event{Type: "test2"})
    
    time.Sleep(time.Second)
    bus.Unsubscribe(sub1)
    bus.Unsubscribe(sub2)
}

限流模式

令牌桶

go
type TokenBucket struct {
    capacity int
    tokens   int
    rate     time.Duration
    ticker   *time.Ticker
    mu       sync.Mutex
}

func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket {
    tb := &TokenBucket{
        capacity: capacity,
        tokens:   capacity,
        rate:     rate,
        ticker:   time.NewTicker(rate),
    }
    
    go tb.refill()
    return tb
}

func (tb *TokenBucket) refill() {
    for range tb.ticker.C {
        tb.mu.Lock()
        if tb.tokens < tb.capacity {
            tb.tokens++
        }
        tb.mu.Unlock()
    }
}

func (tb *TokenBucket) Take() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    return false
}

func main() {
    bucket := NewTokenBucket(5, 100*time.Millisecond)
    
    for i := 0; i < 10; i++ {
        if bucket.Take() {
            fmt.Printf("请求 %d: 成功\n", i+1)
        } else {
            fmt.Printf("请求 %d: 限流\n", i+1)
        }
        time.Sleep(50 * time.Millisecond)
    }
}

漏桶

go
type LeakyBucket struct {
    capacity int
    queue    chan struct{}
    rate     time.Duration
    ticker   *time.Ticker
}

func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
    lb := &LeakyBucket{
        capacity: capacity,
        queue:    make(chan struct{}, capacity),
        rate:     rate,
        ticker:   time.NewTicker(rate),
    }
    
    go lb.drain()
    return lb
}

func (lb *LeakyBucket) drain() {
    for range lb.ticker.C {
        select {
        case <-lb.queue:
        default:
        }
    }
}

func (lb *LeakyBucket) Allow() bool {
    select {
    case lb.queue <- struct{}{}:
        return true
    default:
        return false
    }
}

func main() {
    bucket := NewLeakyBucket(5, 200*time.Millisecond)
    
    for i := 0; i < 10; i++ {
        if bucket.Allow() {
            fmt.Printf("请求 %d: 允许\n", i+1)
        } else {
            fmt.Printf("请求 %d: 拒绝\n", i+1)
        }
    }
}

超时模式

带超时的操作

go
func withTimeout(fn func() (interface{}, error), timeout time.Duration) (interface{}, error) {
    result := make(chan interface{}, 1)
    errChan := make(chan error, 1)
    
    go func() {
        data, err := fn()
        if err != nil {
            errChan <- err
            return
        }
        result <- data
    }()
    
    select {
    case data := <-result:
        return data, nil
    case err := <-errChan:
        return nil, err
    case <-time.After(timeout):
        return nil, fmt.Errorf("操作超时")
    }
}

func main() {
    result, err := withTimeout(func() (interface{}, error) {
        time.Sleep(2 * time.Second)
        return "完成", nil
    }, 1*time.Second)
    
    if err != nil {
        fmt.Println("错误:", err)
    } else {
        fmt.Println("结果:", result)
    }
}

重试模式

指数退避重试

go
func Retry(fn func() error, maxAttempts int, initialDelay time.Duration) error {
    var err error
    delay := initialDelay
    
    for attempt := 0; attempt < maxAttempts; attempt++ {
        err = fn()
        if err == nil {
            return nil
        }
        
        if attempt < maxAttempts-1 {
            fmt.Printf("尝试 %d 失败,%v 后重试\n", attempt+1, delay)
            time.Sleep(delay)
            delay *= 2
        }
    }
    
    return fmt.Errorf("重试 %d 次后失败: %v", maxAttempts, err)
}

func main() {
    attempt := 0
    err := Retry(func() error {
        attempt++
        fmt.Printf("执行尝试 %d\n", attempt)
        if attempt < 3 {
            return fmt.Errorf("失败")
        }
        return nil
    }, 5, time.Second)
    
    if err != nil {
        fmt.Println("错误:", err)
    } else {
        fmt.Println("成功")
    }
}

最佳实践

  1. 避免共享状态:使用 Channel 通信
  2. 控制并发数:使用工作池
  3. 处理超时:使用 select 和 time.After
  4. 优雅关闭:使用 context 和 quit channel
  5. 处理 Panic:在 goroutine 中使用 recover

总结

  • 工作池:管理并发任务
  • Pipeline:数据处理流水线
  • 发布-订阅:事件驱动架构
  • 限流:控制请求速率
  • 超时:防止阻塞
  • 重试:处理临时故障

下一章:反射

基于 MIT 许可发布