前端 Python 3.12

asyncio

标准库协程与异步 I/O:async/await、事件循环、Task/TaskGroup、gather、同步原语与 Queue;单线程 I/O 并发;链回并发导读与官方 asyncio 手册。

asyncioasync/await 编写 单线程、协作式 的并发:在等待 I/O 时让出控制权,由 事件循环 调度其他协程。适合 I/O 密集、结构化 的网络服务、客户端与任务编排。同进程内需要 OS 线程时见 threadingCPU 密集 纯 Python 计算见 multiprocessing;「提交任务、取 Future」可用 concurrent.futuresThreadPoolExecutor / ProcessPoolExecutor(常与 asynciorun_in_executor 配合)。

适用性:在 wasm32-emscripten / wasm32-wasi不可用(见 WebAssembly 平台)。

何时用协程

场景建议
大量 I/O 等待(HTTP、WebSocket、DB、磁盘)asyncio 或基于它的框架(aiohttp、FastAPI 等);单线程内成千上万连接
需要 共享内存状态、少序列化同进程单线程协程比多进程更轻;注意仍须避免在协程间无保护地改共享对象
多个 CPU 密集型 纯 Python 计算占满多核asyncio 不能 并行跑满多核;用 multiprocessingrun_in_executor + 进程池
库全是 同步阻塞 API、无法改写线程 / 进程池包装,或换异步库;在协程里直接 time.sleep、阻塞 socket卡住整个事件循环
已有 回调式 异步库Futureawait 桥接(见官方 futures 文档)

协程函数 vs 协程对象async def foo() 定义的是 协程函数;调用 foo() 得到 协程对象。仅创建对象 不会执行,须 await 或交给 create_task / asyncio.run 调度。

直接运行的 Python 脚本默认是单进程、单线程的asyncio.run(main()) 会创建事件循环并在结束时关闭。

模块成员总览

asyncio高层 API(应用编写)与 低层 API(库作者操作事件循环)。下文侧重常用高层入口。

运行与调度

名称说明
run(coro, ...)运行顶层协程直到结束;创建并关闭事件循环(3.7+ 推荐入口)
create_task(coro, ...)将协程包装为 Task 并调度;须 保持引用 以免被 GC 取消
TaskGroup结构化并发:退出 async with 时等待组内任务;失败则取消其余(3.11+)
gather(*aws, ...)并发等待多个可等待对象,返回结果列表
wait(aws, ...)等待至满足 return_when 条件
wait_for(aw, timeout)带超时的 await;超时取消 aw 并抛 TimeoutError
timeout(delay)上下文管理器形式的超时(3.11+)
shield(aw)防止外层取消传播到 aw(外层仍可能收到 CancelledError
sleep(delay, result=None)异步休眠,让出事件循环
as_completed(aws, ...)按完成顺序迭代结果
current_task() / all_tasks()当前 / 循环内任务
get_running_loop()当前运行中的事件循环

同步与队列(协程专用,非线程安全)

名称说明
Lock / Eventthreading 同名语义,用 async with
Conditionwait / notify;配合 wait_for(predicate)
Semaphore限流、有界信号量、栅栏 Barrier(3.11+)
Queue协程间 FIFO;put / get 为异步

子模块(按需深入)

子模块 / 主题说明
asyncio.streams高层流式客户端/服务端 StreamReader / Writer
asyncio.subprocess异步子进程
asyncio.Task任务状态、cancel()done()result()
事件循环策略WindowsSelectorEventLoopPolicy 等(平台相关)

可等待对象与基本写法

三类主要 可等待(awaitable)对象:协程TaskFuture

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())
写法行为
await coro()顺序等待,总耗时约为各段之和
create_task(coro())立即调度,与其它协程 并发;再 await task 取结果
async with TaskGroup()组内任务并发;任一失败(非 CancelledError)会取消同组其余
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(say_after(1, 'hello'))
        tg.create_task(say_after(2, 'world'))
    print(f"finished at {time.strftime('%X')}")  # 约 2 秒,非 3 秒

asyncio.run(main())

3.11 之前无 TaskGroup 时,常用 task1 = create_task(...); task2 = create_task(...); await task1; await task2

并发等待:gather 与超时

import asyncio

async def factorial(name, n):
    f = 1
    for i in range(2, n + 1):
        await asyncio.sleep(0.1)
        f *= i
    return f

async def main():
    results = await asyncio.gather(
        factorial('A', 3),
        factorial('B', 4),
    )
    print(results)

asyncio.run(main())
API要点
gather(..., return_exceptions=False)默认首个异常立即抛出;True 时异常也进结果列表
wait_for(aw, timeout)超时取消 aw;适合单次操作
async with asyncio.timeout(10):块内超时转为 TimeoutError(须在 with 外 except TimeoutError)(3.11+)

gather 被取消时,默认会取消尚未完成的子任务;TaskGroup 在失败传播与嵌套取消上语义更 结构化,新代码可优先 TaskGroup

任务取消与后台任务

  • task.cancel() 会在协程内引发 asyncio.CancelledError(继承 BaseException);清理逻辑放 try/finally
  • 不要 随意吞掉 CancelledError 或调用 uncancel(),否则 TaskGroup / timeout() 可能行为异常。
  • 发射后不管 的后台任务:必须保存强引用,否则可能被 GC:
background_tasks = set()

def fire_and_forget(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

异步同步原语

asyncioLockEvent 等与 threading 不能混用:它们 不是线程安全的,只用于 同一事件循环内 的协程协调。原语方法 没有 timeout 参数,用 asyncio.wait_for() 包一层。

import asyncio

async def worker(lock, i):
    async with lock:
        print('worker', i)

async def main():
    lock = asyncio.Lock()
    async with asyncio.TaskGroup() as tg:
        for i in range(4):
            tg.create_task(worker(lock, i))

asyncio.run(main())

Condition 须用 whilewait_for(predicate) 检查条件(虚假唤醒)。Queueawait q.put(item) / await q.get(),适合协程间生产者-消费者。

与阻塞代码协作

在协程中 禁止 长时间阻塞调用(time.sleep、同步 requests.get、阻塞式 threading.Lock 等)——会冻结整个循环。

方式说明
await asyncio.sleep(n)替代 time.sleep
await asyncio.to_thread(func, *args)在线程池跑同步函数(3.9+)
loop.run_in_executor(executor, func, ...)自定义 ThreadPoolExecutor / ProcessPoolExecutor
原生异步库aiofileshttpx 异步客户端等

CPU 密集若放进 to_thread / 线程池,仍受 GIL 限制;真要多核用进程池。

网络与子进程(入口)

  • asyncio.open_connection / start_server 返回 StreamReader/StreamWriter,读写用 await reader.read()writer.write() + drain()
  • 子进程create_subprocess_exec / create_subprocess_shell(见 asyncio.subprocess)。
  • REPLpython -m asyncio 可直接 await(3.12+ 审计事件),便于试验。

应用开发通常 asyncio.run + 高层 API;自定义事件循环、信号处理、跨线程 call_soon_threadsafe 见官方「事件循环」与「开发」章节。

相关模块速查

模块 / API关系
threadingOS 线程;阻塞库可放线程池,勿阻塞循环
multiprocessing多核 CPU;与 asyncio 进程池执行器配合
queue线程队列;协程用 asyncio.Queue
selectors / select低层 I/O 多路复用(asyncio 内部依赖)
contextvars协程间上下文变量(create_task 可传 context

易错点(排错速记)

  • 只调用 main()run / 不 await:得到协程对象,代码不执行。
  • 协程里 time.sleep / 同步 HTTP / 阻塞锁:卡住事件循环,表现为「全站卡死」。
  • create_task 不保存引用:后台任务可能被 GC,中途消失。
  • asyncio.Lock 当线程锁:多线程同时 acquire 不安全。
  • gather 里异常未处理:默认一个失败即抛,其它任务可能仍在跑。
  • TaskGroup 内吞 CancelledError:结构化取消链断裂。
  • CPU 密集全写 async:算力仍单核;应 offload 到进程池或 C 扩展。
  • threading / multiprocessing 混用同一锁:语义不同,不可互换。
  • WebAssembly:本模块不可用。

资料:asyncio 文档 · 协程与任务 · 同步原语

On this page