multiprocessing
标准库多进程:Process、启动方法、Queue/Pipe、共享内存与 Manager、Pool;绕开 GIL 做 CPU 并行;链回并发导读与官方 multiprocessing 手册。
multiprocessing 提供与 threading 相近的 API,但每个子进程拥有 独立的 Python 解释器与 GIL,适合 CPU 密集型 纯 Python 计算占满多核。需要「把任务丢给进程池、按需取结果」时,可配合 concurrent.futures.ProcessPoolExecutor;进程间传数据常用 Queue / Pipe(底层 pickle 序列化)。I/O 密集 仍常优先线程或 asyncio。
适用性:在 wasm32-emscripten / wasm32-wasi 上 不可用(见 WebAssembly 平台)。
何时用多进程
| 场景 | 建议 |
|---|---|
| 多个 CPU 密集型 纯 Python 计算占满多核 | 多进程或 ProcessPoolExecutor;每个进程独立 GIL |
| 多个 I/O 密集型 任务并行(网络、磁盘、等待) | 多线程或 asyncio 通常更轻;多进程启动与 IPC 开销更大 |
| 需要结构化任务并发、少碰锁 | Pool / ProcessPoolExecutor |
| 大量共享可变状态、频繁细粒度同步 | 多进程 成本更高(序列化、拷贝);优先消息传递(队列)或重新设计数据流 |
与线程的取舍:多进程 内存隔离,一个子进程崩溃不必然拖垮整个解释器;但创建慢、对象须可 pickle,且默认 不共享 普通 Python 对象。与 内置类型线程安全 互补——跨进程仍须自行设计通信与一致性。
直接运行的 Python 脚本默认是单进程、单线程的;
multiprocessing会在需要时额外 fork/spawn 子进程。
模块成员总览
模块级函数与常量
| 名称 | 返回值 / 类型 | 说明 |
|---|---|---|
active_children() | list[Process] | 当前进程存活的子进程;调用时有「收割」已结束子进程的副作用 |
cpu_count() | int | 系统 CPU 数量(不等于当前进程可用核数,见 os.sched_getaffinity) |
current_process() | Process | 与当前进程对应的 Process 对象 |
parent_process() | Process | None | 父进程对象;主进程为 None(3.8+) |
get_context(method=None) | Context | 返回带完整 API 的上下文;可指定 fork / spawn / forkserver |
get_start_method(allow_none) | str | None | 当前启动方法名 |
set_start_method(method, force) | — | 设置启动方法;最多一次,须在 if __name__ == '__main__' 保护下 |
get_all_start_methods() | list[str] | 受支持的方法列表,首项为默认 |
freeze_support() | — | Windows 冻结可执行文件(py2exe 等)所需 |
set_executable(path) | — | 子进程使用的 Python 解释器路径(spawn 时常用) |
类型与通信 / 同步
| 名称 | 说明 |
|---|---|
Process | 在独立 OS 进程中运行活动;API 与 threading.Thread 大体一致 |
Pool | 固定数量工作进程池;map / apply_async 等 |
Queue | 多生产者多消费者 FIFO;对象经 pickle 传递 |
SimpleQueue | 简化队列,类似带锁的 Pipe |
JoinableQueue | 带 task_done / join 的队列 |
Pipe | 返回一对 Connection,双向或单向 |
Lock / RLock | 进程间互斥(优先用消息传递,锁作辅助) |
Semaphore 等 | 与 threading 同名的条件变量、事件、栅栏等 |
Value / Array | 共享内存中的标量 / 数组(ctypes typecode) |
Manager | 服务进程托管 dict / list / 锁等,经代理访问 |
ProcessError 等 | BufferTooShort、AuthenticationError、TimeoutError 等 |
子模块:multiprocessing.shared_memory(命名共享内存)、multiprocessing.connection(底层连接)。
创建与等待进程
import multiprocessing as mp
def worker(n):
print(f"process {n}")
if __name__ == '__main__':
p = mp.Process(target=worker, args=(1,), name="worker-1")
p.start() # 同 Thread:每对象只能 start 一次
p.join() # 阻塞直到结束;可设 timeout,超时后用 exitcode / is_alive 判断Process 构造参数 | 含义 |
|---|---|
target | run() 调用的可调用对象 |
args / kwargs | 传给 target 的位置/关键字参数 |
name | 进程名;默认 Process-N1:N2:... |
daemon | True 为守护进程;须在 start() 之前 设置;守护进程不能创建子进程 |
方法 / 属性(相对 Thread 额外) | 说明 |
|---|---|
pid | OS 进程 ID;未 start 为 None |
exitcode | 子进程退出码;None 表示未结束;0 正常;负值表示被信号终止 |
terminate() / kill() | 强制结束(finally / 清理未必执行);正在用 Queue/Pipe 时可能 损坏 IPC |
close() | 释放资源;进程仍存活则 ValueError(3.7+) |
authkey | 进程认证密钥(字节串) |
sentinel | 进程结束时变为 ready 的句柄,可配合 connection.wait() |
start()、join()、terminate() 等 只能由创建该 Process 的进程 调用。
if __name__ == '__main__' 为何必需
在 Windows 与 macOS 默认 spawn 下,子进程会 重新 import 主模块 再执行 run()。若入口代码无保护,会无限递归创建进程。Pool、交互式 REPL 中在顶层定义的函数也常因 无法 pickle 而失败——把 worker 与 if __name__ == '__main__' 写在 可 import 的模块 里是常见写法。
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3])) # [1, 4, 9]启动方法:spawn / fork / forkserver
| 方法 | 行为概要 | 典型默认 / 注意 |
|---|---|---|
spawn | 新 Python 解释器;只继承必要资源;慢但更安全 | Windows、macOS 默认 |
fork | os.fork() 复制父进程;快,但 多线程进程里 fork 不安全(3.12 会 DeprecationWarning) | 多数 POSIX(macOS 已非默认) |
forkserver | 单独服务器进程按需 fork;减少重复 import | 部分 Linux 等 |
import multiprocessing as mp
if __name__ == '__main__':
mp.set_start_method('spawn') # 全局一次
ctx = mp.get_context('spawn') # 或显式上下文,便于同程序混用多种方法
q = ctx.Queue()不同上下文创建的对象可能不兼容(例如 fork 下创建的锁不能传给 spawn 子进程)。库代码宜用 get_context(),避免覆盖用户选择。POSIX 上 spawn / forkserver 会启动 资源追踪器,清理泄漏的命名信号量、共享内存段。
进程间通信
队列 Queue
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # [42, None, 'hello'] — 新对象,非共享内存
p.join()- 放入的对象 pickle 序列化;
get()得到的是 副本。 empty()/full()/qsize()在多进程下 不可靠(竞态)。JoinableQueue:每get()后须task_done(),再join()等待全部完成。- 异常
queue.Empty/queue.Full须从标准库queueimport(不在multiprocessing命名空间)。
管道 Pipe
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()duplex=False 时为单向。两端 同时 读/写同一端可能损坏数据;send/recv 对不可信对端有反序列化风险,生产环境需认证(见官方「认证密码」)。
同步与共享状态
官方建议:能消息传递就不要共享可变状态。若必须共享:
共享内存 Value('d', 0.0)、Array('i', range(10)):ctypes typecode(如 'd' 双精度、'i' 整型),进程与线程安全,适合数值数组。
管理器 Manager():独立服务进程托管 dict / list / Lock / Queue 等,经代理操作;更灵活、可跨机器,但 更慢。with Manager() as manager: 自动启动与关闭服务进程。
from multiprocessing import Process, Lock
def f(lock, i):
with lock:
print('hello', i)
if __name__ == '__main__':
lock = Lock()
for num in range(4):
Process(target=f, args=(lock, num)).start()Lock、Condition、Event、Barrier 等与 threading 同名类型行为类似,均支持 with lock: 上下文管理器。
进程池 Pool
from multiprocessing import Pool, TimeoutError
import time
def f(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
print(pool.map(f, range(10))) # 阻塞,有序结果
for i in pool.imap_unordered(f, range(10)):
print(i) # 迭代,顺序任意
res = pool.apply_async(f, (20,))
print(res.get(timeout=1)) # 400
# 退出 with 后池关闭,不可再用| 方法 | 说明 |
|---|---|
map / map_async | 像内置 map,分块交给 worker |
imap / imap_unordered | 惰性迭代;chunksize 调大块大小 |
apply / apply_async | 单次调用;async 返回 AsyncResult |
starmap | 参数为元组序列的 map |
close / terminate / join | 关闭入口、强杀 worker、等待结束 |
池只能由创建它的进程使用。AsyncResult.get(timeout=...) 超时时抛出 multiprocessing.TimeoutError。
相关模块速查
| 模块 / API | 关系 |
|---|---|
threading | 同进程内 I/O 并发;API 镜像关系 |
concurrent.futures | ProcessPoolExecutor 更高层任务提交 |
queue | 线程安全队列;multiprocessing.Queue 近似其进程版 |
pickle | 进程间对象序列化基础 |
multiprocessing.shared_memory | 命名共享内存块,替代部分 Value/Array 场景 |
subprocess | 启动外部程序;不等同于本模块的 Python 子进程 |
易错点(排错速记)
- 省略
if __name__ == '__main__':Windows/macOSspawn下递归起进程或 import 副作用重复执行。 - 在 REPL / notebook 顶层用
Pool:worker 函数须可 pickle,常报Can't get attribute 'f'。 - 把多进程当多线程用共享全局变量:子进程是拷贝/重新 import,修改不会回写父进程(除非
Value/Manager/队列)。 terminate时正在用Queue:队列可能损坏,其他进程get异常。- 子进程往
Queueput 后未消费就join子进程:可能死锁(缓冲区未刷完);Manager队列或确保消费完毕。 - 多线程 +
fork:易死锁或崩溃;改用spawn或forkserver,或先起进程再在该进程内建线程。 - 混用不同
get_context()的锁与队列:跨上下文对象不兼容。 - CPU 不重但进程很多:启动与 pickle 开销可能抵消收益;I/O 密集优先线程/asyncio。
- WebAssembly:本模块不可用。