Go 语言在设计之初就将并发作为其核心特性之一。它推崇 CSP (Communicating Sequential Processes, 通信顺序进程) 模型,其哲学是:
不要通过共享内存来通信,而要通过通信来共享内存。 (Do not communicate by sharing memory; instead, share memory by communicating.)
这使得 Go 的并发代码通常比传统基于锁和共享内存的模型更简单、更安全、也更易于推理。
协程(goroutine)和管道(channel)
协程(goroutine)
Goroutine 是 Go 并发设计的核心。它是由 Go 运行时(runtime)管理的、非常轻量级的执行线程。
核心特性:
- 轻量级:创建一个 goroutine 的成本非常低。它们的初始栈空间只有几 KB(通常是 2KB),并且可以根据需要动态伸缩,因此可以轻松创建成千上万个 goroutine。
- 由 Go 运行时管理:Goroutine 的调度由 Go 的调度器在内核线程之上完成,而不是直接由操作系统管理。这使得上下文切换的成本远低于操作系统的线程。
- 启动简单:使用
go
关键字后跟一个函数调用,即可启动一个新的 goroutine。
代码示例:
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 3; i++ {
fmt.Println(s)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// 启动一个新的 goroutine 来执行 say("World")
go say("World")
// main goroutine 继续执行 say("Hello")
say("Hello")
}
输出(可能是交错的):
Hello
World
Hello
World
Hello
World
注意:main
函数本身也在一个 goroutine 中运行。当 main
函数返回时,程序会立即退出,不会等待其他 goroutine 完成。为了等待,我们需要使用其他同步机制,如 channel
或 sync.WaitGroup
。
Goroutine 中的 Panic 处理
一个 goroutine 中发生的 panic
,如果没有被 recover
,将会导致整个程序的崩溃。因此,在关键的 goroutine 中建立一个“防崩溃”机制非常重要。
最佳实践:在 goroutine 的入口处使用 defer
和 recover
。
func safeGo(f func()) {
go func() {
defer func() {
if err := recover(); err != nil {
log.Printf("goroutine panicked: %v", err)
}
}()
f()
}()
}
func myRiskyTask() {
// ... 可能发生 panic 的代码 ...
panic("something went wrong")
}
func main() {
safeGo(myRiskyTask) // 使用我们封装的函数来安全启动
// ... 程序会继续执行,不会因为 myRiskyTask 的 panic 而崩溃 ...
time.Sleep(time.Second)
fmt.Println("Main finished gracefully.")
}
管道(channel)
Channel 是 Go 中用于 goroutine 之间通信的管道。它可以被看作是一个类型化的、线程安全的消息队列,遵循**先进先出(FIFO)**的原则。
核心特性:
- 类型安全:每个 channel 都有一个特定的类型,只能传递该类型的数据。
- 通信与同步:向 channel 发送或从 channel 接收数据本身就是一种同步行为。默认情况下,发送和接收都会阻塞,直到另一方准备好。
Channel 的创建与使用:
// 创建一个可以传递 int 类型的无缓冲 channel
ch := make(chan int)
// 创建一个容量为 10 的、可以传递 string 类型的有缓冲 channel
bufCh := make(chan string, 10)
操作:
- 发送数据:
ch <- value
- 接收数据:
value := <-ch
- 关闭管道:
close(ch)
无缓冲 Channel
无缓冲 channel 的容量为零。发送操作会一直阻塞,直到另一个 goroutine 对该 channel 进行接收操作。同样,接收操作也会阻塞,直到另一个 goroutine 进行发送。这是一种强同步机制。
package main
import "fmt"
func main() {
ch := make(chan string)
go func() {
fmt.Println("Goroutine is ready to send data...")
ch <- "Hello from goroutine!" // 发送操作,会阻塞在这里
fmt.Println("Goroutine has sent data.")
}()
fmt.Println("Main is waiting to receive data...")
message := <-ch // 接收操作,会阻塞在这里直到接收到数据
fmt.Println("Main received:", message)
}
有缓冲 Channel
有缓冲 channel 拥有一个固定容量的队列。发送操作只有在缓冲区满时才会阻塞。接收操作只有在缓冲区空时才会阻塞。这提供了一种异步通信的能力。
package main
import "fmt"
func main() {
ch := make(chan int, 2) // 容量为 2 的缓冲 channel
ch <- 1 // 不会阻塞
ch <- 2 // 不会阻塞
// ch <- 3 // 再次发送会阻塞,因为缓冲区已满
fmt.Println(<-ch) // 输出 1
fmt.Println(<-ch) // 输出 2
}
遍历和关闭通道
关闭 Channel 与 for...range
close(ch)
:关闭一个 channel。关闭后不能再发送数据(会导致 panic),但仍可以接收已在缓冲区中的数据。v, ok := <-ch
:可以通过第二个返回值ok
判断 channel 是否已关闭且无数据可读。如果ok
为false
,表示 channel 已关闭。for v := range ch
:可以方便地持续从 channel接收数据,直到 channel 被关闭。
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs { // 会自动处理 channel 关闭的情况
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j * 2
}
}
sync
包:更精细的并发控制
当“通过通信共享内存”的模式不适用,或需要处理更传统的并发问题(如保护共享资源)时,sync
包提供了必要的工具。
等待组 WaitGroup
用途:等待一组 goroutine 全部执行完成。 核心方法:
Add(delta int)
:计数器加delta
。通常在启动 goroutine 前调用。Done()
:计数器减一。通常在 goroutine 的末尾使用defer
调用。Wait()
:阻塞,直到计数器归零。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 确保在函数退出时通知 WaitGroup
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // 启动一个 goroutine 前,增加计数器
go worker(i, &wg)
}
wg.Wait() // 等待所有 worker 完成
fmt.Println("All workers completed.")
}
互斥锁 Mutex
用途:保护临界区,保证同一时间只有一个 goroutine 可以访问共享资源。 核心方法:
Lock()
:获取锁,如果锁已被占用,则阻塞。Unlock()
:释放锁。
最佳实践:使用 defer mu.Unlock()
来确保锁一定会被释放。
var counter int
var mu sync.Mutex
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
读写锁 RWMutex
用途:在读多写少的场景下提升性能。它允许多个“读者”同时访问资源,但“写者”必须独占访问。
核心方法:
RLock()
/RUnlock()
:用于读操作的加锁和解锁。Lock()
/Unlock()
:用于写操作的加锁和解锁。
var config map[string]string
var mu sync.RWMutex
func readConfig(key string) string {
mu.RLock()
defer mu.RUnlock()
return config[key]
}
func writeConfig(key, value string) {
mu.Lock()
defer mu.Unlock()
config[key] = value
}
单次执行 Once
用途:确保某个函数在程序生命周期内只被执行一次,无论它被多少个 goroutine 调用。常用于单例模式的初始化。
核心方法: Do(f func())
:传入需要执行一次的函数。
var once sync.Once
var instance *MySingleton
func GetInstance() *MySingleton {
once.Do(func() {
fmt.Println("Initializing singleton...")
instance = &MySingleton{}
})
return instance
}
并发安全 Map
用途:提供一个原生支持并发读写的 map。
注意:sync.Map
并非旨在完全替代 map + Mutex
。它在特定场景下性能更优:
- 当一个 key 只被写入一次,但被读取很多次。
- 当多个 goroutine 读写不相交的键集合时。
核心方法:
Store(key, value interface{})
:存储键值对。Load(key interface{}) (value interface{}, ok bool)
:加载一个值。Delete(key interface{})
:删除一个键。Range(f func(key, value interface{}) bool)
:遍历 map。
var m sync.Map
// 存储
m.Store("name", "Alice")
// 读取
if name, ok := m.Load("name"); ok {
fmt.Println(name.(string))
}
临时对象池 Pool
用途:缓存和复用临时对象,以减轻垃圾回收(GC)的压力。它不是一个持久化的连接池。
注意:Pool 中的对象可能在任何时候被 GC 无通知地回收。
核心方法:
Get() interface{}
:从池中获取一个对象,如果池为空,则调用New
字段(如果设置了)创建一个Put(x interface{})
:将一个对象放回池中。
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 4096)
},
}
func main() {
buf := bufferPool.Get().([]byte)
// ... 使用 buf ...
bufferPool.Put(buf) // 使用完毕后放回池中
}
条件等待 Cond
用途:为更复杂的同步场景提供支持,允许 goroutine 等待或宣布某个事件或条件的发生。
注意:Cond
必须与一个 Locker
(通常是 *sync.Mutex
)关联使用。
核心方法:
Wait()
:原子地解锁其Locker
并暂停执行,等待被唤醒。被唤醒后,它会重新锁定Locker
。Signal()
:唤醒一个正在等待的 goroutine(如果有)。Broadcast()
:唤醒所有正在等待的 goroutine。
var ready bool
var mu sync.Mutex
cond := sync.NewCond(&mu)
// 等待条件的 goroutine
go func() {
mu.Lock()
for !ready {
cond.Wait() // 等待条件 ready 变为 true
}
fmt.Println("Condition met!")
mu.Unlock()
}()
// 改变条件的 goroutine
time.Sleep(time.Second)
mu.Lock()
ready = true
cond.Signal() // 或 cond.Broadcast()
mu.Unlock()