asyncio
标准库协程与异步 I/O:async/await、事件循环、Task/TaskGroup、gather、同步原语与 Queue;单线程 I/O 并发;链回并发导读与官方 asyncio 手册。
asyncio 用 async/await 编写 单线程、协作式 的并发:在等待 I/O 时让出控制权,由 事件循环 调度其他协程。适合 I/O 密集、结构化 的网络服务、客户端与任务编排。同进程内需要 OS 线程时见 threading;CPU 密集 纯 Python 计算见 multiprocessing;「提交任务、取 Future」可用 concurrent.futures 的 ThreadPoolExecutor / ProcessPoolExecutor(常与 asyncio 的 run_in_executor 配合)。
适用性:在 wasm32-emscripten / wasm32-wasi 上 不可用(见 WebAssembly 平台)。
何时用协程
| 场景 | 建议 |
|---|---|
| 大量 I/O 等待(HTTP、WebSocket、DB、磁盘) | asyncio 或基于它的框架(aiohttp、FastAPI 等);单线程内成千上万连接 |
| 需要 共享内存状态、少序列化 | 同进程单线程协程比多进程更轻;注意仍须避免在协程间无保护地改共享对象 |
| 多个 CPU 密集型 纯 Python 计算占满多核 | asyncio 不能 并行跑满多核;用 multiprocessing 或 run_in_executor + 进程池 |
| 库全是 同步阻塞 API、无法改写 | 线程 / 进程池包装,或换异步库;在协程里直接 time.sleep、阻塞 socket 会 卡住整个事件循环 |
| 已有 回调式 异步库 | Future 与 await 桥接(见官方 futures 文档) |
协程函数 vs 协程对象:
async def foo()定义的是 协程函数;调用foo()得到 协程对象。仅创建对象 不会执行,须await或交给create_task/asyncio.run调度。直接运行的 Python 脚本默认是单进程、单线程的;
asyncio.run(main())会创建事件循环并在结束时关闭。
模块成员总览
asyncio 分 高层 API(应用编写)与 低层 API(库作者操作事件循环)。下文侧重常用高层入口。
运行与调度
| 名称 | 说明 |
|---|---|
run(coro, ...) | 运行顶层协程直到结束;创建并关闭事件循环(3.7+ 推荐入口) |
create_task(coro, ...) | 将协程包装为 Task 并调度;须 保持引用 以免被 GC 取消 |
TaskGroup | 结构化并发:退出 async with 时等待组内任务;失败则取消其余(3.11+) |
gather(*aws, ...) | 并发等待多个可等待对象,返回结果列表 |
wait(aws, ...) | 等待至满足 return_when 条件 |
wait_for(aw, timeout) | 带超时的 await;超时取消 aw 并抛 TimeoutError |
timeout(delay) | 上下文管理器形式的超时(3.11+) |
shield(aw) | 防止外层取消传播到 aw(外层仍可能收到 CancelledError) |
sleep(delay, result=None) | 异步休眠,让出事件循环 |
as_completed(aws, ...) | 按完成顺序迭代结果 |
current_task() / all_tasks() | 当前 / 循环内任务 |
get_running_loop() | 当前运行中的事件循环 |
同步与队列(协程专用,非线程安全)
| 名称 | 说明 |
|---|---|
Lock / Event | 与 threading 同名语义,用 async with |
Condition | wait / notify;配合 wait_for(predicate) |
Semaphore 等 | 限流、有界信号量、栅栏 Barrier(3.11+) |
Queue | 协程间 FIFO;put / get 为异步 |
子模块(按需深入)
| 子模块 / 主题 | 说明 |
|---|---|
asyncio.streams | 高层流式客户端/服务端 StreamReader / Writer |
asyncio.subprocess | 异步子进程 |
asyncio.Task | 任务状态、cancel()、done()、result() |
| 事件循环策略 | WindowsSelectorEventLoopPolicy 等(平台相关) |
可等待对象与基本写法
三类主要 可等待(awaitable)对象:协程、Task、Future。
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())| 写法 | 行为 |
|---|---|
await coro() | 顺序等待,总耗时约为各段之和 |
create_task(coro()) | 立即调度,与其它协程 并发;再 await task 取结果 |
async with TaskGroup() | 组内任务并发;任一失败(非 CancelledError)会取消同组其余 |
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
async with asyncio.TaskGroup() as tg:
tg.create_task(say_after(1, 'hello'))
tg.create_task(say_after(2, 'world'))
print(f"finished at {time.strftime('%X')}") # 约 2 秒,非 3 秒
asyncio.run(main())3.11 之前无 TaskGroup 时,常用 task1 = create_task(...); task2 = create_task(...); await task1; await task2。
并发等待:gather 与超时
import asyncio
async def factorial(name, n):
f = 1
for i in range(2, n + 1):
await asyncio.sleep(0.1)
f *= i
return f
async def main():
results = await asyncio.gather(
factorial('A', 3),
factorial('B', 4),
)
print(results)
asyncio.run(main())| API | 要点 |
|---|---|
gather(..., return_exceptions=False) | 默认首个异常立即抛出;True 时异常也进结果列表 |
wait_for(aw, timeout) | 超时取消 aw;适合单次操作 |
async with asyncio.timeout(10): | 块内超时转为 TimeoutError(须在 with 外 except TimeoutError)(3.11+) |
gather 被取消时,默认会取消尚未完成的子任务;TaskGroup 在失败传播与嵌套取消上语义更 结构化,新代码可优先 TaskGroup。
任务取消与后台任务
task.cancel()会在协程内引发asyncio.CancelledError(继承BaseException);清理逻辑放try/finally。- 不要 随意吞掉
CancelledError或调用uncancel(),否则TaskGroup/timeout()可能行为异常。 - 发射后不管 的后台任务:必须保存强引用,否则可能被 GC:
background_tasks = set()
def fire_and_forget(coro):
task = asyncio.create_task(coro)
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)异步同步原语
asyncio 的 Lock、Event 等与 threading 不能混用:它们 不是线程安全的,只用于 同一事件循环内 的协程协调。原语方法 没有 timeout 参数,用 asyncio.wait_for() 包一层。
import asyncio
async def worker(lock, i):
async with lock:
print('worker', i)
async def main():
lock = asyncio.Lock()
async with asyncio.TaskGroup() as tg:
for i in range(4):
tg.create_task(worker(lock, i))
asyncio.run(main())Condition 须用 while 或 wait_for(predicate) 检查条件(虚假唤醒)。Queue:await q.put(item) / await q.get(),适合协程间生产者-消费者。
与阻塞代码协作
在协程中 禁止 长时间阻塞调用(time.sleep、同步 requests.get、阻塞式 threading.Lock 等)——会冻结整个循环。
| 方式 | 说明 |
|---|---|
await asyncio.sleep(n) | 替代 time.sleep |
await asyncio.to_thread(func, *args) | 在线程池跑同步函数(3.9+) |
loop.run_in_executor(executor, func, ...) | 自定义 ThreadPoolExecutor / ProcessPoolExecutor |
| 原生异步库 | aiofiles、httpx 异步客户端等 |
CPU 密集若放进 to_thread / 线程池,仍受 GIL 限制;真要多核用进程池。
网络与子进程(入口)
- 流:
asyncio.open_connection/start_server返回StreamReader/StreamWriter,读写用await reader.read()、writer.write()+drain()。 - 子进程:
create_subprocess_exec/create_subprocess_shell(见asyncio.subprocess)。 - REPL:
python -m asyncio可直接await(3.12+ 审计事件),便于试验。
应用开发通常 只 用 asyncio.run + 高层 API;自定义事件循环、信号处理、跨线程 call_soon_threadsafe 见官方「事件循环」与「开发」章节。
相关模块速查
| 模块 / API | 关系 |
|---|---|
threading | OS 线程;阻塞库可放线程池,勿阻塞循环 |
multiprocessing | 多核 CPU;与 asyncio 进程池执行器配合 |
queue | 线程队列;协程用 asyncio.Queue |
selectors / select | 低层 I/O 多路复用(asyncio 内部依赖) |
contextvars | 协程间上下文变量(create_task 可传 context) |
易错点(排错速记)
- 只调用
main()不run/ 不await:得到协程对象,代码不执行。 - 协程里
time.sleep/ 同步 HTTP / 阻塞锁:卡住事件循环,表现为「全站卡死」。 create_task不保存引用:后台任务可能被 GC,中途消失。- 把
asyncio.Lock当线程锁:多线程同时acquire不安全。 gather里异常未处理:默认一个失败即抛,其它任务可能仍在跑。- 在
TaskGroup内吞CancelledError:结构化取消链断裂。 - CPU 密集全写 async:算力仍单核;应 offload 到进程池或 C 扩展。
- 与
threading/multiprocessing混用同一锁:语义不同,不可互换。 - WebAssembly:本模块不可用。
资料:asyncio 文档 · 协程与任务 · 同步原语