前端 Python 3.12

threading 与 Lock

threading、GIL、Lock/RLock、队列与常见坑的速览。

threading 基于 OS 线程;心智上接近 Java/C# 的「线程 + 监视器」,语法上多用 with lock:

1 GIL:线程能并行跑 CPU 吗?

CPython GIL:同一时刻通常只有一个线程在执行 Python 字节码

  • I/O 密集:多线程常有收益(等待 I/O 时会释放 GIL)。
  • CPU 密集(纯 Python 跑满):难真正并行;考虑 multiprocessingPyPy、C 扩展或 NumPy 等释放 GIL 的路径。

并行选型:并发入门


2 Thread:启动与 join

import threading

def work(n: int) -> None:
    print(f"thread {threading.current_thread().name}: {n}")

t = threading.Thread(target=work, args=(42,), name="worker")
t.start()
t.join()  # 等待结束;join(timeout=...)
  • daemon=True:主线程退出时不等待该线程(后台任务注意资源未落盘)。
  • 异常:子线程未捕获异常只打到 stderr,不冒泡到 start();要统一处理可在 target 里捕获或用 ThreadPoolExecutor

3 Lock:互斥

共享可变状态需互斥;Lock() 非重入(同线程未 releaseacquire 会死锁)。

调度非固定轮流;锁只互斥临界区。下例锁内累加并记线程名,trace[:40] 顺序不定。

import threading

ROUNDS = 2_000
counter = 0
trace: list[str] = []
lock = threading.Lock()

def bump() -> None:
    global counter
    for _ in range(ROUNDS):
        with lock:
            trace.append(threading.current_thread().name)
            counter += 1

threads = [threading.Thread(target=bump, name=f"w{i}") for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert counter == 4 * ROUNDS
print(trace[:40])

无锁时 counter += 1 可能丢更新(< 4 * ROUNDS)。

idiom:用 with lock:(异常也会释放,等价 try/finally)。


4 RLock:可重入

外层已 with lock,内层函数又要 with 同一把 lock → 同线程第二次 acquireLock 会自等自锁;RLock 按线程计数嵌套,配对 release 后才让给其它线程。

最小对照:with lock: with lock: —— Lock 卡死,RLock 可跑完。

import threading

class GuardedCache:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._data: dict[str, str] = {}

    def get(self, key: str) -> str | None:
        with self._lock:
            return self._data.get(key)

    def ensure(self, key: str, factory: str) -> str:
        with self._lock:
            if key not in self._data:
                self._data[key] = factory
            return self.get(key)  # 嵌套持锁:Lock 死锁,RLock 可

5 其它同步原语(速览)

类型用途
Semaphore(n)至多 n 个线程同时进临界区(池、限流)。
Eventwait() 直到 set();启停、shutdown 信号。
Condition锁 + wait/notify;生产者-消费者等(注意虚假唤醒循环判)。
Barrier(parties)parties 个到齐后一起继续。

6 queue.Queue:线程安全的管道

queue.Queue 内置同步,适合多生产者/多消费者;详情见 标准库简介 II。下例 None 作结束哨兵,task_done()join() 配对。

import queue
import threading

q: queue.Queue[int | None] = queue.Queue()

def consumer() -> None:
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        try:
            print(item)
        finally:
            q.task_done()


threading.Thread(target=consumer, daemon=True).start()
for i in range(3):
    q.put(i)
q.put(None)
q.join()

7 常见坑

  1. 死锁:多锁且顺序不一(AB-BA);固定顺序、缩小临界区或单把大锁。
  2. 锁内阻塞:持锁做网络/磁盘/input() 会拖住他人。
  3. 迭代中修改:遍历 list/dict 时他线程修改 → 副本、加锁或换策略。
  4. 队列选型SimpleQueue 更简单更快;asyncio 场景用 asyncio.Queue,勿长期阻塞 Queue.get()

8 threading.local

每线程独立命名空间(像「每线程一份全局变量」),常用于线程池里挂请求上下文。

import threading

ctx = threading.local()

def set_request_id(rid: str) -> None:
    ctx.request_id = rid

def log(msg: str) -> None:
    print(getattr(ctx, "request_id", "?"), msg)

9 Thread / Lock / RLock:速查

构造:Thread(...)Lock()RLock()。属性看身份/守护,方法看启动、等待与加解锁。

Thread

名称读写说明
name读写线程名。
daemon读写守护线程;须在 start() 前设。
ident / native_id只读线程 id;未 start 可能 None(native_id 3.8+)。
start()方法仅可调用一次。
join(timeout=None)方法阻塞至结束;超时配合 is_alive()
is_alive()方法start 且未结束为真。

常用构造:targetargs/kwargsnamedaemon;无 target 时可子类重写 run()

Lock / RLock(接口一致)

名称说明
acquire(...)blocking=False 立刻失败返回 Falsetimeout 秒;-1 表示一直等。
release()须持有者调用,否则 RuntimeError
locked()已被占用为真(不阻塞)。
with lock:等价 acquire/release,异常也释放。

Lock:同线程未 releaseacquire 会卡住。RLock:同线程可多次 acquirerelease 次数须对上。


资料threading — 基于线程的并行 · queue — 同步队列

On this page