并发与异步编程

在讨论并发模型之前,必须先区分两类任务:

  • I/O 密集型 (I/O-Bound):任务的大部分时间都在等待外部资源,如等待网络响应、读取磁盘文件、查询数据库等。CPU 在此期间是空闲的。
  • CPU 密集型 (CPU-Bound):任务的大部分时间都在进行计算,CPU 持续高速运转。例如,科学计算、视频编码、图像处理等。

这个区分至关重要,因为 Python 的不同并发模型对这两类任务的提速效果截然不同。


多线程 (threading)

多线程 (threading):应对 I/O 密集型任务的经典方案。threading 模块提供了与 Java Thread 非常相似的 API。你可以创建线程,启动它们,并使用锁 (Lock)、信号量 (Semaphore) 等同步原语。

全局解释器锁

CPython (官方的 Python 解释器) 中有一个全局解释器锁 (Global Interpreter Lock, GIL)。这个锁规定,在任何一个时刻,一个 Python 进程中只能有一个线程在执行 Python 字节码

  • 对 CPU 密集型任务的影响:这意味着即使在多核 CPU 上,Python 的多线程也无法实现真正的并行计算。如果你启动多个线程去执行计算任务,它们会轮流获取 GIL,宏观上看是并发,但微观上仍然是串行的,总性能甚至可能因为线程切换开销而下降。
  • 对 I/O 密集型任务的益处:当一个线程执行 I/O 操作(如 requests.get(), file.read())时,它会释放 GIL,让其他线程有机会运行。当 I/O 操作完成后,它再重新尝试获取 GIL。这样,多个线程就可以在等待 I/O 的间隙中交替执行,从而显著提高应用的整体吞吐量。

代码示例:使用多线程下载网页

这是一个典型的 I/O 密集型场景。

python
import threading
import requests
import time

urls = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org'
]

def download(url):
    try:
        requests.get(url, timeout=5)
        print(f"成功下载: {url}")
    except requests.RequestException as e:
        print(f"下载失败: {url}, 错误: {e}")

start_time = time.time()
threads = []
for url in urls:
    # 类似 Java 的 new Thread(new Runnable(...))
    thread = threading.Thread(target=download, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    # 类似 Java 的 thread.join()
    thread.join()

end_time = time.time()
print(f"多线程耗时: {end_time - start_time:.2f} 秒")

何时使用 threading?

  1. 处理 I/O 密集型任务。
  2. 你想利用并发,但项目依赖了大量不支持异步的旧库。
  3. 并发任务数量不多(几十到几百个),且逻辑相对简单。

多进程 (multiprocessing)

多进程 (multiprocessing):突破 GIL,实现真并行

multiprocessing 模块的 API 在很大程度上模仿了 threading,但它创建的是进程而非线程。

核心优势:绕过 GIL 每个进程都有自己独立的内存空间和独立的 Python 解释器,因此也拥有自己独立的 GIL。这使得多进程成为 Python 中唯一能够利用多核 CPU 进行并行计算的标准库方案。

代价与挑战:

  • 资源消耗:创建和维护进程的开销远大于线程。
  • 通信复杂:进程间内存不共享。数据交换必须通过序列化/反序列化(pickling)并借助 QueuePipe 等进程间通信 (IPC) 机制,比线程间的共享内存要慢和复杂。

代码示例:使用进程池处理 CPU 密集型任务 使用 multiprocessing.Pool(类似于 Java 的 ExecutorService)是管理进程的现代、高效方式。

python
import multiprocessing
import time

def heavy_computation(n):
    # 模拟一个耗时的计算任务
    result = sum(i * i for i in range(n))
    return result

# 确保在 Windows 等系统上正常工作
if __name__ == "__main__":
    tasks = [5_000_000] * 8 # 8个大型计算任务

    start_time = time.time()
    # 创建一个包含4个工作进程的进程池
    with multiprocessing.Pool(processes=4) as pool:
        # map 会阻塞,直到所有任务完成
        results = pool.map(heavy_computation, tasks)
    
    end_time = time.time()
    print(f"多进程耗时: {end_time - start_time:.2f} 秒")
    
    # 对比单进程
    start_time = time.time()
    single_results = [heavy_computation(n) for n in tasks]
    end_time = time.time()
    print(f"单进程耗时: {end_time - start_time:.2f} 秒")

何时使用 multiprocessing?

  1. 处理 CPU 密集型任务,需要充分利用多核 CPU 的计算能力。
  2. 对任务的隔离性要求高,不希望共享状态。

异步 IO (asyncio)

异步 I/O (asyncio):现代 Python 并发编程的未来。这是 Python 近年来发展最迅猛的领域。它引入了协程 (Coroutine) 的概念,实现了单线程下的高并发

核心思想:协作式多任务

  • 线程 (Preemptive Multitasking):由操作系统内核强制进行线程切换,开发者无法精确控制切换时机。
  • 协程 (Cooperative Multitasking):由程序自身控制任务切换。一个任务(协程)在遇到 I/O 等待时,会主动声明 await(“我在这里要等一下”),然后将控制权交还给事件循环 (Event Loop)。事件循环会立即运行下一个已准备好的任务,而不是原地空等。

关键语法:asyncawait

  • async def:用于定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。
  • await:用于“暂停”一个协程,等待一个耗时的异步操作(通常是 I/O)完成。await 只能在 async def 函数内部使用。

asyncio 的模型更接近于 Node.js 或 Java 的 Netty/Vert.x 这类事件驱动框架。它在单个线程上通过事件循环调度成千上万个轻量级的“任务”,资源开销极小。

代码示例:使用 asyncioaiohttp/httpx 下载网页

标准库 requests 是阻塞的,不能与 asyncio 一起使用。我们需要异步的 HTTP 客户端,如 aiohttphttpx

python
import asyncio
import httpx # httpx 是一个现代的、同时支持同步和异步的HTTP客户端
import time

urls = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org',
    'https://www.microsoft.com',
    'https://www.amazon.com',
]

async def download_async(client, url):
    try:
        await client.get(url, timeout=5)
        print(f"成功下载 (async): {url}")
    except httpx.RequestError as e:
        print(f"下载失败 (async): {url}, 错误: {e}")

async def main():
    # 使用 AsyncClient 来复用连接,性能更好
    async with httpx.AsyncClient() as client:
        # 创建所有任务的列表
        tasks = [download_async(client, url) for url in urls]
        # asyncio.gather 并发运行所有任务
        await asyncio.gather(*tasks)

start_time = time.time()
# 运行顶层的 main 协程
asyncio.run(main())
end_time = time.time()
print(f"Asyncio 耗时: {end_time - start_time:.2f} 秒")

在这个例子中,当一个 await client.get() 开始等待网络响应时,事件循环会立即切换到另一个 download_async 任务并发出它的网络请求,所有请求几乎是“同时”发出的,极大地提高了效率。

何时使用 asyncio?

  1. 高并发 I/O 密集型场景:如 Web 服务器、API 网关、爬虫、聊天应用等,需要同时处理成百上千甚至上万个网络连接。
  2. 当性能和资源利用率(特别是内存)是首要考虑因素时。
  3. 整个技术栈都是异步的(例如,使用 aiohttp, FastAPI, asyncpg 等)。

不同应用场景下的选择

模型 threading multiprocessing asyncio
核心机制 抢占式多任务 (OS 调度) 真并行 (多进程) 协作式多任务 (事件循环)
GIL 影响 受限 (无法利用多核 CPU) 不受影响 (每个进程有自己的 GIL) 不受影响 (单线程运行)
最佳场景 I/O 密集型 (中低并发) CPU 密集型 I/O 密集型 (高并发)
资源开销 中等 极低
编程模型 传统,类似 Java Thread 传统,类似 threading 现代,async/await
生态系统 兼容所有标准库 兼容所有标准库 需要专门的异步库 (如 httpx)

决策流程图:

  1. 你的任务是 CPU 密集型吗?

    • -> 使用 multiprocessing。这是你唯一的选择。
    • (即 I/O 密集型) -> 进入下一步。
  2. 你需要处理高并发吗 (如 > 1000 个并发连接)?或者追求极致的性能和低内存占用吗?

    • -> 优先考虑 asyncio。前提是你的生态系统(数据库驱动、HTTP 客户端等)支持异步。
    • -> 进入下一步。
  3. 你的并发任务数量不多,或者需要与大量现有的阻塞式库交互吗?

    • -> 使用 threading。它更简单,侵入性更小,是快速为现有项目增加并发能力的可靠选择。