Skip to content
On this page

Goroutines

Goroutine 是 Go 实现并发的核心机制。本章将深入探讨 Goroutine 的使用和最佳实践。

Goroutine 基础

什么是 Goroutine

Goroutine 是轻量级线程,由 Go 运行时管理。

特点:

  • 内存占用小(初始 2KB)
  • 创建和销毁成本低
  • 调度由运行时自动完成

创建 Goroutine

go
package main

import (
    "fmt"
    "time"
)

func hello() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go hello()
    
    fmt.Println("Main function")
    time.Sleep(time.Second)
}

Goroutine 生命周期

go
func longRunningTask() {
    for i := 0; i < 5; i++ {
        fmt.Println("任务执行中...")
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    go longRunningTask()
    
    time.Sleep(3 * time.Second)
    fmt.Println("Main 结束")
}

Goroutine 调度

M:N 调度模型

Go 使用 M:N 调度器:

  • M:操作系统线程
  • N:Goroutines
  • P:处理器(调度器上下文)

GOMAXPROCS

go
import "runtime"

func main() {
    // 设置使用的 CPU 核心数
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    fmt.Println("CPU 核心数:", runtime.NumCPU())
}

Goroutine 通信

使用 Channel

go
func worker(id int, ch chan string) {
    ch <- fmt.Sprintf("Worker %d 完成", id)
}

func main() {
    ch := make(chan string)
    
    for i := 1; i <= 3; i++ {
        go worker(i, ch)
    }
    
    for i := 1; i <= 3; i++ {
        fmt.Println(<-ch)
    }
}

使用 WaitGroup

go
import "sync"

func task(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Task %d 开始\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Task %d 完成\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go task(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成")
}

Goroutine 泄漏

泄漏示例

go
// 错误示例:goroutine 泄漏
func leaky() {
    ch := make(chan int)
    
    go func() {
        <-ch  // 永远阻塞
    }()
    
    // ch 永远不会被关闭
}

防止泄漏

go
func nonLeaky() {
    ch := make(chan int)
    
    go func() {
        select {
        case <-ch:
        case <-time.After(time.Second):
            fmt.Println("超时")
        }
    }()
    
    close(ch)
}

Goroutine 池

简单的 Goroutine 池

go
type WorkerPool struct {
    tasks   chan func()
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        tasks:   make(chan func(), 100),
        workers: workers,
    }
    
    pool.wg.Add(workers)
    for i := 0; i < workers; i++ {
        go pool.worker()
    }
    
    return pool
}

func (p *WorkerPool) worker() {
    defer p.wg.Done()
    for task := range p.tasks {
        task()
    }
}

func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

func main() {
    pool := NewWorkerPool(5)
    
    for i := 0; i < 20; i++ {
        i := i
        pool.Submit(func() {
            fmt.Printf("处理任务 %d\n", i)
            time.Sleep(100 * time.Millisecond)
        })
    }
    
    pool.Close()
}

Goroutine 模式

Fan-out

go
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(id int, ch <-chan int) {
    for value := range ch {
        fmt.Printf("Consumer %d: %d\n", id, value)
    }
}

func main() {
    ch := make(chan int)
    
    go producer(ch)
    
    for i := 1; i <= 3; i++ {
        go consumer(i, ch)
    }
    
    time.Sleep(time.Second)
}

Fan-in

go
func producer(id int, ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- id*10 + i
    }
}

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    
    for _, ch := range channels {
        go func(c <-chan int) {
            for value := range c {
                out <- value
            }
        }(ch)
    }
    
    return out
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    
    go producer(1, ch1)
    go producer(2, ch2)
    
    out := fanIn(ch1, ch2)
    
    for i := 0; i < 10; i++ {
        fmt.Println(<-out)
    }
}

Pipeline

go
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, num := range nums {
            out <- num
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            out <- num * num
        }
        close(out)
    }()
    return out
}

func main() {
    numbers := generate(1, 2, 3, 4, 5)
    squares := square(numbers)
    
    for result := range squares {
        fmt.Println(result)
    }
}

Goroutine 调试

使用 runtime.Stack

go
func printGoroutines() {
    buf := make([]byte, 1<<16)
    n := runtime.Stack(buf, true)
    fmt.Printf("Goroutines:\n%s", buf[:n])
}

func main() {
    for i := 0; i < 5; i++ {
        go func() {
            time.Sleep(time.Second)
        }()
    }
    
    printGoroutines()
}

最佳实践

  1. 限制 Goroutine 数量:使用池模式
  2. 避免阻塞:使用 select 和超时
  3. 正确关闭 Channel:防止泄漏
  4. 使用 Context:管理生命周期
  5. 处理 Panic:使用 recover

Panic 处理

go
func safeGoroutine(f func()) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("恢复: %v\n", r)
            }
        }()
        f()
    }()
}

func main() {
    safeGoroutine(func() {
        panic("出错了")
    })
    
    time.Sleep(time.Second)
    fmt.Println("程序继续")
}

练习

练习 1:并发下载

实现并发下载多个 URL。

答案
go
package main

import (
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

func downloadURL(url string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    start := time.Now()
    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("下载 %s 失败: %v\n", url, err)
        return
    }
    defer resp.Body.Close()
    
    _, err = io.Copy(io.Discard, resp.Body)
    if err != nil {
        fmt.Printf("读取 %s 失败: %v\n", url, err)
        return
    }
    
    fmt.Printf("下载 %s 完成,耗时: %v\n", url, time.Since(start))
}

func main() {
    urls := []string{
        "https://example.com",
        "https://google.com",
        "https://github.com",
    }
    
    var wg sync.WaitGroup
    
    for _, url := range urls {
        wg.Add(1)
        go downloadURL(url, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有下载完成")
}

总结

  • Goroutine 是轻量级线程
  • 使用 Channel 通信
  • 使用 WaitGroup 同步
  • 避免泄漏和阻塞
  • 使用池模式管理数量
  • 正确处理 Panic

下一章:Channels

基于 MIT 许可发布