前端 Python 3.12

multiprocessing

标准库多进程:Process、启动方法、Queue/Pipe、共享内存与 Manager、Pool;绕开 GIL 做 CPU 并行;链回并发导读与官方 multiprocessing 手册。

multiprocessing 提供与 threading 相近的 API,但每个子进程拥有 独立的 Python 解释器与 GIL,适合 CPU 密集型 纯 Python 计算占满多核。需要「把任务丢给进程池、按需取结果」时,可配合 concurrent.futures.ProcessPoolExecutor;进程间传数据常用 Queue / Pipe(底层 pickle 序列化)。I/O 密集 仍常优先线程或 asyncio

适用性:在 wasm32-emscripten / wasm32-wasi不可用(见 WebAssembly 平台)。

何时用多进程

场景建议
多个 CPU 密集型 纯 Python 计算占满多核多进程或 ProcessPoolExecutor;每个进程独立 GIL
多个 I/O 密集型 任务并行(网络、磁盘、等待)多线程或 asyncio 通常更轻;多进程启动与 IPC 开销更大
需要结构化任务并发、少碰锁Pool / ProcessPoolExecutor
大量共享可变状态、频繁细粒度同步多进程 成本更高(序列化、拷贝);优先消息传递(队列)或重新设计数据流

与线程的取舍:多进程 内存隔离,一个子进程崩溃不必然拖垮整个解释器;但创建慢、对象须可 pickle,且默认 不共享 普通 Python 对象。与 内置类型线程安全 互补——跨进程仍须自行设计通信与一致性。

直接运行的 Python 脚本默认是单进程、单线程的multiprocessing 会在需要时额外 fork/spawn 子进程。

模块成员总览

模块级函数与常量

名称返回值 / 类型说明
active_children()list[Process]当前进程存活的子进程;调用时有「收割」已结束子进程的副作用
cpu_count()int系统 CPU 数量(不等于当前进程可用核数,见 os.sched_getaffinity
current_process()Process与当前进程对应的 Process 对象
parent_process()Process | None父进程对象;主进程为 None(3.8+)
get_context(method=None)Context返回带完整 API 的上下文;可指定 fork / spawn / forkserver
get_start_method(allow_none)str | None当前启动方法名
set_start_method(method, force)设置启动方法;最多一次,须在 if __name__ == '__main__' 保护下
get_all_start_methods()list[str]受支持的方法列表,首项为默认
freeze_support()Windows 冻结可执行文件(py2exe 等)所需
set_executable(path)子进程使用的 Python 解释器路径(spawn 时常用)

类型与通信 / 同步

名称说明
Process在独立 OS 进程中运行活动;API 与 threading.Thread 大体一致
Pool固定数量工作进程池;map / apply_async
Queue多生产者多消费者 FIFO;对象经 pickle 传递
SimpleQueue简化队列,类似带锁的 Pipe
JoinableQueuetask_done / join 的队列
Pipe返回一对 Connection,双向或单向
Lock / RLock进程间互斥(优先用消息传递,锁作辅助)
Semaphorethreading 同名的条件变量、事件、栅栏等
Value / Array共享内存中的标量 / 数组(ctypes typecode)
Manager服务进程托管 dict / list / 锁等,经代理访问
ProcessErrorBufferTooShortAuthenticationErrorTimeoutError

子模块:multiprocessing.shared_memory(命名共享内存)、multiprocessing.connection(底层连接)。

创建与等待进程

import multiprocessing as mp

def worker(n):
    print(f"process {n}")

if __name__ == '__main__':
    p = mp.Process(target=worker, args=(1,), name="worker-1")
    p.start()   # 同 Thread:每对象只能 start 一次
    p.join()    # 阻塞直到结束;可设 timeout,超时后用 exitcode / is_alive 判断
Process 构造参数含义
targetrun() 调用的可调用对象
args / kwargs传给 target 的位置/关键字参数
name进程名;默认 Process-N1:N2:...
daemonTrue 为守护进程;须在 start() 之前 设置;守护进程不能创建子进程
方法 / 属性(相对 Thread 额外)说明
pidOS 进程 ID;未 start 为 None
exitcode子进程退出码;None 表示未结束;0 正常;负值表示被信号终止
terminate() / kill()强制结束(finally / 清理未必执行);正在用 Queue/Pipe 时可能 损坏 IPC
close()释放资源;进程仍存活则 ValueError(3.7+)
authkey进程认证密钥(字节串)
sentinel进程结束时变为 ready 的句柄,可配合 connection.wait()

start()join()terminate()只能由创建该 Process 的进程 调用。

if __name__ == '__main__' 为何必需

在 Windows 与 macOS 默认 spawn 下,子进程会 重新 import 主模块 再执行 run()。若入口代码无保护,会无限递归创建进程。Pool、交互式 REPL 中在顶层定义的函数也常因 无法 pickle 而失败——把 worker 与 if __name__ == '__main__' 写在 可 import 的模块 里是常见写法。

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))  # [1, 4, 9]

启动方法:spawn / fork / forkserver

方法行为概要典型默认 / 注意
spawn新 Python 解释器;只继承必要资源;但更安全Windows、macOS 默认
forkos.fork() 复制父进程;快,但 多线程进程里 fork 不安全(3.12 会 DeprecationWarning)多数 POSIX(macOS 已非默认)
forkserver单独服务器进程按需 fork;减少重复 import部分 Linux 等
import multiprocessing as mp

if __name__ == '__main__':
    mp.set_start_method('spawn')          # 全局一次
    ctx = mp.get_context('spawn')       # 或显式上下文,便于同程序混用多种方法
    q = ctx.Queue()

不同上下文创建的对象可能不兼容(例如 fork 下创建的锁不能传给 spawn 子进程)。库代码宜用 get_context(),避免覆盖用户选择。POSIX 上 spawn / forkserver 会启动 资源追踪器,清理泄漏的命名信号量、共享内存段。

进程间通信

队列 Queue

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # [42, None, 'hello'] — 新对象,非共享内存
    p.join()
  • 放入的对象 pickle 序列化get() 得到的是 副本
  • empty() / full() / qsize() 在多进程下 不可靠(竞态)。
  • JoinableQueue:每 get() 后须 task_done(),再 join() 等待全部完成。
  • 异常 queue.Empty / queue.Full 须从标准库 queue import(不在 multiprocessing 命名空间)。

管道 Pipe

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

duplex=False 时为单向。两端 同时 读/写同一端可能损坏数据;send/recv 对不可信对端有反序列化风险,生产环境需认证(见官方「认证密码」)。

同步与共享状态

官方建议:能消息传递就不要共享可变状态。若必须共享:

共享内存 Value('d', 0.0)Array('i', range(10)):ctypes typecode(如 'd' 双精度、'i' 整型),进程与线程安全,适合数值数组。

管理器 Manager():独立服务进程托管 dict / list / Lock / Queue 等,经代理操作;更灵活、可跨机器,但 更慢with Manager() as manager: 自动启动与关闭服务进程。

from multiprocessing import Process, Lock

def f(lock, i):
    with lock:
        print('hello', i)

if __name__ == '__main__':
    lock = Lock()
    for num in range(4):
        Process(target=f, args=(lock, num)).start()

LockConditionEventBarrier 等与 threading 同名类型行为类似,均支持 with lock: 上下文管理器。

进程池 Pool

from multiprocessing import Pool, TimeoutError
import time

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        print(pool.map(f, range(10)))           # 阻塞,有序结果
        for i in pool.imap_unordered(f, range(10)):
            print(i)                              # 迭代,顺序任意
        res = pool.apply_async(f, (20,))
        print(res.get(timeout=1))                 # 400
    # 退出 with 后池关闭,不可再用
方法说明
map / map_async像内置 map,分块交给 worker
imap / imap_unordered惰性迭代;chunksize 调大块大小
apply / apply_async单次调用;async 返回 AsyncResult
starmap参数为元组序列的 map
close / terminate / join关闭入口、强杀 worker、等待结束

池只能由创建它的进程使用AsyncResult.get(timeout=...) 超时时抛出 multiprocessing.TimeoutError

相关模块速查

模块 / API关系
threading同进程内 I/O 并发;API 镜像关系
concurrent.futuresProcessPoolExecutor 更高层任务提交
queue线程安全队列;multiprocessing.Queue 近似其进程版
pickle进程间对象序列化基础
multiprocessing.shared_memory命名共享内存块,替代部分 Value/Array 场景
subprocess启动外部程序;不等同于本模块的 Python 子进程

易错点(排错速记)

  • 省略 if __name__ == '__main__':Windows/macOS spawn 下递归起进程或 import 副作用重复执行。
  • 在 REPL / notebook 顶层用 Pool:worker 函数须可 pickle,常报 Can't get attribute 'f'
  • 把多进程当多线程用共享全局变量:子进程是拷贝/重新 import,修改不会回写父进程(除非 Value/Manager/队列)。
  • terminate 时正在用 Queue:队列可能损坏,其他进程 get 异常。
  • 子进程往 Queue put 后未消费就 join 子进程:可能死锁(缓冲区未刷完);Manager 队列或确保消费完毕。
  • 多线程 + fork:易死锁或崩溃;改用 spawnforkserver,或先起进程再在该进程内建线程。
  • 混用不同 get_context() 的锁与队列:跨上下文对象不兼容。
  • CPU 不重但进程很多:启动与 pickle 开销可能抵消收益;I/O 密集优先线程/asyncio。
  • WebAssembly:本模块不可用。

资料:multiprocessing 文档

On this page