Channels
Channel 是 Go 中 Goroutine 之间通信的主要方式,遵循"不要通过共享内存来通信,而要通过通信来共享内存"的理念。
Channel 基础
创建 Channel
go
package main
import "fmt"
func main() {
// 无缓冲 channel
ch1 := make(chan int)
// 有缓冲 channel
ch2 := make(chan string, 10)
fmt.Printf("ch1: %v\n", ch1)
fmt.Printf("ch2: %v\n", ch2)
}
发送和接收
go
func main() {
ch := make(chan string)
// 启动 goroutine 发送数据
go func() {
ch <- "Hello"
}()
// 接收数据
msg := <-ch
fmt.Println(msg)
}
双向 Channel
go
func main() {
ch := make(chan int)
go func() {
ch <- 42
}()
value := <-ch
fmt.Println(value)
}
单向 Channel
只发送 Channel
go
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
func main() {
ch := make(chan int)
go producer(ch)
for value := range ch {
fmt.Println(value)
}
}
只接收 Channel
go
func consumer(ch <-chan int) {
for value := range ch {
fmt.Println("消费:", value)
}
}
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()
consumer(ch)
}
缓冲 Channel
基本用法
go
func main() {
ch := make(chan int, 3)
// 发送不会阻塞(缓冲未满)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(len(ch)) // 3
fmt.Println(cap(ch)) // 3
// 接收
fmt.Println(<-ch) // 1
fmt.Println(len(ch)) // 2
}
缓冲 Channel 模式
go
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
// 启动 3 个 worker
for i := 1; i <= 3; i++ {
go worker(i, jobs, results)
}
// 发送任务
for i := 1; i <= 5; i++ {
jobs <- i
}
close(jobs)
// 收集结果
for i := 1; i <= 5; i++ {
<-results
}
}
关闭 Channel
检测 Channel 关闭
go
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
for {
value, ok := <-ch
if !ok {
fmt.Println("Channel 已关闭")
break
}
fmt.Println(value)
}
}
使用 range
go
func main() {
ch := make(chan int, 3)
go func() {
for i := 0; i < 3; i++ {
ch <- i
}
close(ch)
}()
for value := range ch {
fmt.Println(value)
}
}
Channel 选择
Select 语句
go
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
非阻塞操作
go
func main() {
ch := make(chan int)
select {
case value := <-ch:
fmt.Println(value)
default:
fmt.Println("没有数据")
}
}
超时处理
go
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "结果"
}()
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(1 * time.Second):
fmt.Println("超时")
}
}
Channel 模式
生产者-消费者
go
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
fmt.Printf("生产: %d\n", i)
}
close(ch)
}
func consumer(ch <-chan int) {
for value := range ch {
fmt.Printf("消费: %d\n", value)
}
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
扇出模式
go
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 处理 %d\n", id, job)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动多个 worker
for i := 1; i <= 3; i++ {
go worker(i, jobs, results)
}
// 发送任务
go func() {
for i := 0; i < 10; i++ {
jobs <- i
}
close(jobs)
}()
// 收集结果
for i := 0; i < 10; i++ {
<-results
}
}
扇入模式
go
func producer(id int, out chan<- int) {
for i := 0; i < 5; i++ {
out <- id*10 + i
}
close(out)
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
for _, ch := range channels {
go func(c <-chan int) {
for value := range c {
out <- value
}
}(ch)
}
return out
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
go producer(1, ch1)
go producer(2, ch2)
go producer(3, ch3)
out := fanIn(ch1, ch2, ch3)
for i := 0; i < 15; i++ {
fmt.Println(<-out)
}
}
Pipeline 模式
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, num := range nums {
out <- num
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
out <- num * num
}
close(out)
}()
return out
}
func main() {
numbers := generate(1, 2, 3, 4, 5)
squares := square(numbers)
for result := range squares {
fmt.Println(result)
}
}
Channel 最佳实践
1. 优先使用缓冲 Channel
go
// 推荐
ch := make(chan int, 10)
// 不推荐(容易死锁)
ch := make(chan int)
2. 总是关闭发送者
go
func producer(ch chan<- int) {
defer close(ch)
for i := 0; i < 10; i++ {
ch <- i
}
}
3. 使用 range 接收
go
// 推荐
for value := range ch {
// 处理 value
}
// 不推荐
for {
value, ok := <-ch
if !ok {
break
}
// 处理 value
}
4. 避免在接收端关闭
go
// 错误:接收端关闭
func consumer(ch <-chan int) {
// close(ch) // 错误!
}
// 正确:发送端关闭
func producer(ch chan<- int) {
defer close(ch)
}
Channel 性能
缓冲大小选择
go
// 小缓冲:低延迟
ch := make(chan int, 1)
// 中等缓冲:平衡
ch := make(chan int, 100)
// 大缓冲:高吞吐
ch := make(chan int, 1000)
避免阻塞
go
func safeSend(ch chan<- int, value int) {
select {
case ch <- value:
default:
// 缓冲满,丢弃或处理
}
}
练习
练习 1:超时控制
实现带超时的操作。
答案
go
package main
import "time"
func withTimeout(ch <-chan int, timeout time.Duration) (int, bool) {
select {
case value := <-ch:
return value, true
case <-time.After(timeout):
return 0, false
}
}
func main() {
ch := make(chan int)
go func() {
time.Sleep(2 * time.Second)
ch <- 42
}()
if value, ok := withTimeout(ch, 1*time.Second); ok {
fmt.Println("收到:", value)
} else {
fmt.Println("超时")
}
}
总结
- Channel 是 Goroutine 通信的主要方式
- 支持缓冲和非缓冲模式
- 使用 select 处理多个 channel
- 遵循发送者关闭原则
- 使用 range 简化接收逻辑
- 选择合适的缓冲大小
下一章:Select 语句