在讨论并发模型之前,必须先区分两类任务:
- I/O 密集型 (I/O-Bound):任务的大部分时间都在等待外部资源,如等待网络响应、读取磁盘文件、查询数据库等。CPU 在此期间是空闲的。
- CPU 密集型 (CPU-Bound):任务的大部分时间都在进行计算,CPU 持续高速运转。例如,科学计算、视频编码、图像处理等。
这个区分至关重要,因为 Python 的不同并发模型对这两类任务的提速效果截然不同。
多线程 (threading)
多线程 (threading
):应对 I/O 密集型任务的经典方案。threading
模块提供了与 Java Thread
非常相似的 API。你可以创建线程,启动它们,并使用锁 (Lock
)、信号量 (Semaphore
) 等同步原语。
全局解释器锁
CPython (官方的 Python 解释器) 中有一个全局解释器锁 (Global Interpreter Lock, GIL)。这个锁规定,在任何一个时刻,一个 Python 进程中只能有一个线程在执行 Python 字节码。
- 对 CPU 密集型任务的影响:这意味着即使在多核 CPU 上,Python 的多线程也无法实现真正的并行计算。如果你启动多个线程去执行计算任务,它们会轮流获取 GIL,宏观上看是并发,但微观上仍然是串行的,总性能甚至可能因为线程切换开销而下降。
- 对 I/O 密集型任务的益处:当一个线程执行 I/O 操作(如
requests.get()
,file.read()
)时,它会释放 GIL,让其他线程有机会运行。当 I/O 操作完成后,它再重新尝试获取 GIL。这样,多个线程就可以在等待 I/O 的间隙中交替执行,从而显著提高应用的整体吞吐量。
代码示例:使用多线程下载网页
这是一个典型的 I/O 密集型场景。
import threading
import requests
import time
urls = [
'https://www.python.org',
'https://www.google.com',
'https://www.github.com',
'https://www.wikipedia.org'
]
def download(url):
try:
requests.get(url, timeout=5)
print(f"成功下载: {url}")
except requests.RequestException as e:
print(f"下载失败: {url}, 错误: {e}")
start_time = time.time()
threads = []
for url in urls:
# 类似 Java 的 new Thread(new Runnable(...))
thread = threading.Thread(target=download, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
# 类似 Java 的 thread.join()
thread.join()
end_time = time.time()
print(f"多线程耗时: {end_time - start_time:.2f} 秒")
何时使用 threading
?
- 处理 I/O 密集型任务。
- 你想利用并发,但项目依赖了大量不支持异步的旧库。
- 并发任务数量不多(几十到几百个),且逻辑相对简单。
多进程 (multiprocessing)
多进程 (multiprocessing
):突破 GIL,实现真并行
multiprocessing
模块的 API 在很大程度上模仿了 threading
,但它创建的是进程而非线程。
核心优势:绕过 GIL 每个进程都有自己独立的内存空间和独立的 Python 解释器,因此也拥有自己独立的 GIL。这使得多进程成为 Python 中唯一能够利用多核 CPU 进行并行计算的标准库方案。
代价与挑战:
- 资源消耗:创建和维护进程的开销远大于线程。
- 通信复杂:进程间内存不共享。数据交换必须通过序列化/反序列化(pickling)并借助
Queue
、Pipe
等进程间通信 (IPC) 机制,比线程间的共享内存要慢和复杂。
代码示例:使用进程池处理 CPU 密集型任务
使用 multiprocessing.Pool
(类似于 Java 的 ExecutorService
)是管理进程的现代、高效方式。
import multiprocessing
import time
def heavy_computation(n):
# 模拟一个耗时的计算任务
result = sum(i * i for i in range(n))
return result
# 确保在 Windows 等系统上正常工作
if __name__ == "__main__":
tasks = [5_000_000] * 8 # 8个大型计算任务
start_time = time.time()
# 创建一个包含4个工作进程的进程池
with multiprocessing.Pool(processes=4) as pool:
# map 会阻塞,直到所有任务完成
results = pool.map(heavy_computation, tasks)
end_time = time.time()
print(f"多进程耗时: {end_time - start_time:.2f} 秒")
# 对比单进程
start_time = time.time()
single_results = [heavy_computation(n) for n in tasks]
end_time = time.time()
print(f"单进程耗时: {end_time - start_time:.2f} 秒")
何时使用 multiprocessing
?
- 处理 CPU 密集型任务,需要充分利用多核 CPU 的计算能力。
- 对任务的隔离性要求高,不希望共享状态。
异步 IO (asyncio)
异步 I/O (asyncio
):现代 Python 并发编程的未来。这是 Python 近年来发展最迅猛的领域。它引入了协程 (Coroutine) 的概念,实现了单线程下的高并发。
核心思想:协作式多任务
- 线程 (Preemptive Multitasking):由操作系统内核强制进行线程切换,开发者无法精确控制切换时机。
- 协程 (Cooperative Multitasking):由程序自身控制任务切换。一个任务(协程)在遇到 I/O 等待时,会主动声明
await
(“我在这里要等一下”),然后将控制权交还给事件循环 (Event Loop)。事件循环会立即运行下一个已准备好的任务,而不是原地空等。
关键语法:async
和 await
async def
:用于定义一个协程函数。调用它不会立即执行,而是返回一个协程对象。await
:用于“暂停”一个协程,等待一个耗时的异步操作(通常是 I/O)完成。await
只能在async def
函数内部使用。
asyncio
的模型更接近于 Node.js 或 Java 的 Netty/Vert.x 这类事件驱动框架。它在单个线程上通过事件循环调度成千上万个轻量级的“任务”,资源开销极小。
代码示例:使用 asyncio
和 aiohttp/httpx
下载网页
标准库 requests
是阻塞的,不能与 asyncio
一起使用。我们需要异步的 HTTP 客户端,如 aiohttp
或 httpx
。
import asyncio
import httpx # httpx 是一个现代的、同时支持同步和异步的HTTP客户端
import time
urls = [
'https://www.python.org',
'https://www.google.com',
'https://www.github.com',
'https://www.wikipedia.org',
'https://www.microsoft.com',
'https://www.amazon.com',
]
async def download_async(client, url):
try:
await client.get(url, timeout=5)
print(f"成功下载 (async): {url}")
except httpx.RequestError as e:
print(f"下载失败 (async): {url}, 错误: {e}")
async def main():
# 使用 AsyncClient 来复用连接,性能更好
async with httpx.AsyncClient() as client:
# 创建所有任务的列表
tasks = [download_async(client, url) for url in urls]
# asyncio.gather 并发运行所有任务
await asyncio.gather(*tasks)
start_time = time.time()
# 运行顶层的 main 协程
asyncio.run(main())
end_time = time.time()
print(f"Asyncio 耗时: {end_time - start_time:.2f} 秒")
在这个例子中,当一个 await client.get()
开始等待网络响应时,事件循环会立即切换到另一个 download_async
任务并发出它的网络请求,所有请求几乎是“同时”发出的,极大地提高了效率。
何时使用 asyncio
?
- 高并发 I/O 密集型场景:如 Web 服务器、API 网关、爬虫、聊天应用等,需要同时处理成百上千甚至上万个网络连接。
- 当性能和资源利用率(特别是内存)是首要考虑因素时。
- 整个技术栈都是异步的(例如,使用
aiohttp
,FastAPI
,asyncpg
等)。
不同应用场景下的选择
模型 | threading |
multiprocessing |
asyncio |
---|---|---|---|
核心机制 | 抢占式多任务 (OS 调度) | 真并行 (多进程) | 协作式多任务 (事件循环) |
GIL 影响 | 受限 (无法利用多核 CPU) | 不受影响 (每个进程有自己的 GIL) | 不受影响 (单线程运行) |
最佳场景 | I/O 密集型 (中低并发) | CPU 密集型 | I/O 密集型 (高并发) |
资源开销 | 中等 | 高 | 极低 |
编程模型 | 传统,类似 Java Thread |
传统,类似 threading |
现代,async/await |
生态系统 | 兼容所有标准库 | 兼容所有标准库 | 需要专门的异步库 (如 httpx ) |
决策流程图:
-
你的任务是 CPU 密集型吗?
- 是 -> 使用
multiprocessing
。这是你唯一的选择。 - 否 (即 I/O 密集型) -> 进入下一步。
- 是 -> 使用
-
你需要处理高并发吗 (如 > 1000 个并发连接)?或者追求极致的性能和低内存占用吗?
- 是 -> 优先考虑
asyncio
。前提是你的生态系统(数据库驱动、HTTP 客户端等)支持异步。 - 否 -> 进入下一步。
- 是 -> 优先考虑
-
你的并发任务数量不多,或者需要与大量现有的阻塞式库交互吗?
- 是 -> 使用
threading
。它更简单,侵入性更小,是快速为现有项目增加并发能力的可靠选择。
- 是 -> 使用