Goroutines
Goroutine 是 Go 实现并发的核心机制。本章将深入探讨 Goroutine 的使用和最佳实践。
Goroutine 基础
什么是 Goroutine
Goroutine 是轻量级线程,由 Go 运行时管理。
特点:
- 内存占用小(初始 2KB)
- 创建和销毁成本低
- 调度由运行时自动完成
创建 Goroutine
go
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("Hello from goroutine")
}
func main() {
go hello()
fmt.Println("Main function")
time.Sleep(time.Second)
}
Goroutine 生命周期
go
func longRunningTask() {
for i := 0; i < 5; i++ {
fmt.Println("任务执行中...")
time.Sleep(500 * time.Millisecond)
}
}
func main() {
go longRunningTask()
time.Sleep(3 * time.Second)
fmt.Println("Main 结束")
}
Goroutine 调度
M:N 调度模型
Go 使用 M:N 调度器:
- M:操作系统线程
- N:Goroutines
- P:处理器(调度器上下文)
GOMAXPROCS
go
import "runtime"
func main() {
// 设置使用的 CPU 核心数
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Println("CPU 核心数:", runtime.NumCPU())
}
Goroutine 通信
使用 Channel
go
func worker(id int, ch chan string) {
ch <- fmt.Sprintf("Worker %d 完成", id)
}
func main() {
ch := make(chan string)
for i := 1; i <= 3; i++ {
go worker(i, ch)
}
for i := 1; i <= 3; i++ {
fmt.Println(<-ch)
}
}
使用 WaitGroup
go
import "sync"
func task(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Task %d 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go task(i, &wg)
}
wg.Wait()
fmt.Println("所有任务完成")
}
Goroutine 泄漏
泄漏示例
go
// 错误示例:goroutine 泄漏
func leaky() {
ch := make(chan int)
go func() {
<-ch // 永远阻塞
}()
// ch 永远不会被关闭
}
防止泄漏
go
func nonLeaky() {
ch := make(chan int)
go func() {
select {
case <-ch:
case <-time.After(time.Second):
fmt.Println("超时")
}
}()
close(ch)
}
Goroutine 池
简单的 Goroutine 池
go
type WorkerPool struct {
tasks chan func()
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
pool := &WorkerPool{
tasks: make(chan func(), 100),
workers: workers,
}
pool.wg.Add(workers)
for i := 0; i < workers; i++ {
go pool.worker()
}
return pool
}
func (p *WorkerPool) worker() {
defer p.wg.Done()
for task := range p.tasks {
task()
}
}
func (p *WorkerPool) Submit(task func()) {
p.tasks <- task
}
func (p *WorkerPool) Close() {
close(p.tasks)
p.wg.Wait()
}
func main() {
pool := NewWorkerPool(5)
for i := 0; i < 20; i++ {
i := i
pool.Submit(func() {
fmt.Printf("处理任务 %d\n", i)
time.Sleep(100 * time.Millisecond)
})
}
pool.Close()
}
Goroutine 模式
Fan-out
go
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func consumer(id int, ch <-chan int) {
for value := range ch {
fmt.Printf("Consumer %d: %d\n", id, value)
}
}
func main() {
ch := make(chan int)
go producer(ch)
for i := 1; i <= 3; i++ {
go consumer(i, ch)
}
time.Sleep(time.Second)
}
Fan-in
go
func producer(id int, ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- id*10 + i
}
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
for _, ch := range channels {
go func(c <-chan int) {
for value := range c {
out <- value
}
}(ch)
}
return out
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go producer(1, ch1)
go producer(2, ch2)
out := fanIn(ch1, ch2)
for i := 0; i < 10; i++ {
fmt.Println(<-out)
}
}
Pipeline
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, num := range nums {
out <- num
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
out <- num * num
}
close(out)
}()
return out
}
func main() {
numbers := generate(1, 2, 3, 4, 5)
squares := square(numbers)
for result := range squares {
fmt.Println(result)
}
}
Goroutine 调试
使用 runtime.Stack
go
func printGoroutines() {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, true)
fmt.Printf("Goroutines:\n%s", buf[:n])
}
func main() {
for i := 0; i < 5; i++ {
go func() {
time.Sleep(time.Second)
}()
}
printGoroutines()
}
最佳实践
- 限制 Goroutine 数量:使用池模式
- 避免阻塞:使用 select 和超时
- 正确关闭 Channel:防止泄漏
- 使用 Context:管理生命周期
- 处理 Panic:使用 recover
Panic 处理
go
func safeGoroutine(f func()) {
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("恢复: %v\n", r)
}
}()
f()
}()
}
func main() {
safeGoroutine(func() {
panic("出错了")
})
time.Sleep(time.Second)
fmt.Println("程序继续")
}
练习
练习 1:并发下载
实现并发下载多个 URL。
答案
go
package main
import (
"fmt"
"io"
"net/http"
"sync"
"time"
)
func downloadURL(url string, wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
resp, err := http.Get(url)
if err != nil {
fmt.Printf("下载 %s 失败: %v\n", url, err)
return
}
defer resp.Body.Close()
_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
fmt.Printf("读取 %s 失败: %v\n", url, err)
return
}
fmt.Printf("下载 %s 完成,耗时: %v\n", url, time.Since(start))
}
func main() {
urls := []string{
"https://example.com",
"https://google.com",
"https://github.com",
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go downloadURL(url, &wg)
}
wg.Wait()
fmt.Println("所有下载完成")
}
总结
- Goroutine 是轻量级线程
- 使用 Channel 通信
- 使用 WaitGroup 同步
- 避免泄漏和阻塞
- 使用池模式管理数量
- 正确处理 Panic
下一章:Channels