Go并发编程核心

Go 语言在设计之初就将并发作为其核心特性之一。它推崇 CSP (Communicating Sequential Processes, 通信顺序进程) 模型,其哲学是:

不要通过共享内存来通信,而要通过通信来共享内存。 (Do not communicate by sharing memory; instead, share memory by communicating.)

这使得 Go 的并发代码通常比传统基于锁和共享内存的模型更简单、更安全、也更易于推理。


协程(goroutine)和管道(channel)

协程(goroutine)

Goroutine 是 Go 并发设计的核心。它是由 Go 运行时(runtime)管理的、非常轻量级的执行线程。

核心特性:

  • 轻量级:创建一个 goroutine 的成本非常低。它们的初始栈空间只有几 KB(通常是 2KB),并且可以根据需要动态伸缩,因此可以轻松创建成千上万个 goroutine。
  • 由 Go 运行时管理:Goroutine 的调度由 Go 的调度器在内核线程之上完成,而不是直接由操作系统管理。这使得上下文切换的成本远低于操作系统的线程。
  • 启动简单:使用 go 关键字后跟一个函数调用,即可启动一个新的 goroutine。

代码示例:

go
package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 3; i++ {
        fmt.Println(s)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 启动一个新的 goroutine 来执行 say("World")
    go say("World")
    
    // main goroutine 继续执行 say("Hello")
    say("Hello")
}

输出(可能是交错的):

text
Hello
World
Hello
World
Hello
World

注意main 函数本身也在一个 goroutine 中运行。当 main 函数返回时,程序会立即退出,不会等待其他 goroutine 完成。为了等待,我们需要使用其他同步机制,如 channelsync.WaitGroup


Goroutine 中的 Panic 处理

一个 goroutine 中发生的 panic,如果没有被 recover,将会导致整个程序的崩溃。因此,在关键的 goroutine 中建立一个“防崩溃”机制非常重要。

最佳实践:在 goroutine 的入口处使用 deferrecover

go
func safeGo(f func()) {
    go func() {
        defer func() {
            if err := recover(); err != nil {
                log.Printf("goroutine panicked: %v", err)
            }
        }()
        f()
    }()
}

func myRiskyTask() {
    // ... 可能发生 panic 的代码 ...
    panic("something went wrong")
}

func main() {
    safeGo(myRiskyTask) // 使用我们封装的函数来安全启动
    
    // ... 程序会继续执行,不会因为 myRiskyTask 的 panic 而崩溃 ...
    time.Sleep(time.Second)
    fmt.Println("Main finished gracefully.")
}

管道(channel)

Channel 是 Go 中用于 goroutine 之间通信的管道。它可以被看作是一个类型化的、线程安全的消息队列,遵循**先进先出(FIFO)**的原则。

核心特性:

  • 类型安全:每个 channel 都有一个特定的类型,只能传递该类型的数据。
  • 通信与同步:向 channel 发送或从 channel 接收数据本身就是一种同步行为。默认情况下,发送和接收都会阻塞,直到另一方准备好。

Channel 的创建与使用:

go
// 创建一个可以传递 int 类型的无缓冲 channel
ch := make(chan int)

// 创建一个容量为 10 的、可以传递 string 类型的有缓冲 channel
bufCh := make(chan string, 10)

操作:

  • 发送数据ch <- value
  • 接收数据value := <-ch
  • 关闭管道close(ch)

无缓冲 Channel

无缓冲 channel 的容量为零。发送操作会一直阻塞,直到另一个 goroutine 对该 channel 进行接收操作。同样,接收操作也会阻塞,直到另一个 goroutine 进行发送。这是一种强同步机制。

go
package main

import "fmt"

func main() {
    ch := make(chan string)

    go func() {
        fmt.Println("Goroutine is ready to send data...")
        ch <- "Hello from goroutine!" // 发送操作,会阻塞在这里
        fmt.Println("Goroutine has sent data.")
    }()

    fmt.Println("Main is waiting to receive data...")
    message := <-ch // 接收操作,会阻塞在这里直到接收到数据
    fmt.Println("Main received:", message)
}

有缓冲 Channel

有缓冲 channel 拥有一个固定容量的队列。发送操作只有在缓冲区满时才会阻塞。接收操作只有在缓冲区空时才会阻塞。这提供了一种异步通信的能力。

go
package main

import "fmt"

func main() {
    ch := make(chan int, 2) // 容量为 2 的缓冲 channel

    ch <- 1 // 不会阻塞
    ch <- 2 // 不会阻塞

    // ch <- 3 // 再次发送会阻塞,因为缓冲区已满

    fmt.Println(<-ch) // 输出 1
    fmt.Println(<-ch) // 输出 2
}

遍历和关闭通道

关闭 Channel 与 for...range

  • close(ch):关闭一个 channel。关闭后不能再发送数据(会导致 panic),但仍可以接收已在缓冲区中的数据。
  • v, ok := <-ch:可以通过第二个返回值 ok 判断 channel 是否已关闭且无数据可读。如果 okfalse,表示 channel 已关闭。
  • for v := range ch:可以方便地持续从 channel接收数据,直到 channel 被关闭。
go
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs { // 会自动处理 channel 关闭的情况
        fmt.Printf("Worker %d started job %d\n", id, j)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d finished job %d\n", id, j)
        results <- j * 2
    }
}

sync 包:更精细的并发控制

当“通过通信共享内存”的模式不适用,或需要处理更传统的并发问题(如保护共享资源)时,sync 包提供了必要的工具。

等待组 WaitGroup

用途:等待一组 goroutine 全部执行完成。 核心方法

  • Add(delta int):计数器加 delta。通常在启动 goroutine 前调用。
  • Done():计数器减一。通常在 goroutine 的末尾使用 defer 调用。
  • Wait():阻塞,直到计数器归零。
go
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在函数退出时通知 WaitGroup
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 3; i++ {
        wg.Add(1) // 启动一个 goroutine 前,增加计数器
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有 worker 完成
    fmt.Println("All workers completed.")
}

互斥锁 Mutex

用途:保护临界区,保证同一时间只有一个 goroutine 可以访问共享资源。 核心方法

  • Lock():获取锁,如果锁已被占用,则阻塞。
  • Unlock():释放锁。

最佳实践:使用 defer mu.Unlock() 来确保锁一定会被释放。

go
var counter int
var mu sync.Mutex

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

读写锁 RWMutex

用途:在读多写少的场景下提升性能。它允许多个“读者”同时访问资源,但“写者”必须独占访问。

核心方法

  • RLock() / RUnlock():用于读操作的加锁和解锁。
  • Lock() / Unlock():用于写操作的加锁和解锁。
go
var config map[string]string
var mu sync.RWMutex

func readConfig(key string) string {
    mu.RLock()
    defer mu.RUnlock()
    return config[key]
}

func writeConfig(key, value string) {
    mu.Lock()
    defer mu.Unlock()
    config[key] = value
}

单次执行 Once

用途:确保某个函数在程序生命周期内只被执行一次,无论它被多少个 goroutine 调用。常用于单例模式的初始化。

核心方法Do(f func()):传入需要执行一次的函数。

go
var once sync.Once
var instance *MySingleton

func GetInstance() *MySingleton {
    once.Do(func() {
        fmt.Println("Initializing singleton...")
        instance = &MySingleton{}
    })
    return instance
}

并发安全 Map

用途:提供一个原生支持并发读写的 map。

注意sync.Map 并非旨在完全替代 map + Mutex。它在特定场景下性能更优:

  1. 当一个 key 只被写入一次,但被读取很多次。
  2. 当多个 goroutine 读写不相交的键集合时。

核心方法

  • Store(key, value interface{}):存储键值对。
  • Load(key interface{}) (value interface{}, ok bool):加载一个值。
  • Delete(key interface{}):删除一个键。
  • Range(f func(key, value interface{}) bool):遍历 map。
go
var m sync.Map

// 存储
m.Store("name", "Alice")

// 读取
if name, ok := m.Load("name"); ok {
    fmt.Println(name.(string))
}

临时对象池 Pool

用途:缓存和复用临时对象,以减轻垃圾回收(GC)的压力。它不是一个持久化的连接池。

注意:Pool 中的对象可能在任何时候被 GC 无通知地回收。

核心方法

  • Get() interface{}:从池中获取一个对象,如果池为空,则调用 New 字段(如果设置了)创建一个
  • Put(x interface{}):将一个对象放回池中。
go
var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 4096)
    },
}

func main() {
    buf := bufferPool.Get().([]byte)
    // ... 使用 buf ...
    bufferPool.Put(buf) // 使用完毕后放回池中
}

条件等待 Cond

用途:为更复杂的同步场景提供支持,允许 goroutine 等待或宣布某个事件或条件的发生。

注意Cond 必须与一个 Locker(通常是 *sync.Mutex)关联使用。

核心方法

  • Wait():原子地解锁其 Locker 并暂停执行,等待被唤醒。被唤醒后,它会重新锁定 Locker
  • Signal():唤醒一个正在等待的 goroutine(如果有)。
  • Broadcast():唤醒所有正在等待的 goroutine。
go
var ready bool
var mu sync.Mutex
cond := sync.NewCond(&mu)

// 等待条件的 goroutine
go func() {
    mu.Lock()
    for !ready {
        cond.Wait() // 等待条件 ready 变为 true
    }
    fmt.Println("Condition met!")
    mu.Unlock()
}()

// 改变条件的 goroutine
time.Sleep(time.Second)
mu.Lock()
ready = true
cond.Signal() // 或 cond.Broadcast()
mu.Unlock()