threading 与 Lock
threading、GIL、Lock/RLock、队列与常见坑的速览。
threading 基于 OS 线程;心智上接近 Java/C# 的「线程 + 监视器」,语法上多用 with lock:。
1 GIL:线程能并行跑 CPU 吗?
CPython GIL:同一时刻通常只有一个线程在执行 Python 字节码。
- I/O 密集:多线程常有收益(等待 I/O 时会释放 GIL)。
- CPU 密集(纯 Python 跑满):难真正并行;考虑
multiprocessing、PyPy、C 扩展或 NumPy 等释放 GIL 的路径。
并行选型:并发入门。
2 Thread:启动与 join
import threading
def work(n: int) -> None:
print(f"thread {threading.current_thread().name}: {n}")
t = threading.Thread(target=work, args=(42,), name="worker")
t.start()
t.join() # 等待结束;join(timeout=...)daemon=True:主线程退出时不等待该线程(后台任务注意资源未落盘)。- 异常:子线程未捕获异常只打到 stderr,不冒泡到
start();要统一处理可在target里捕获或用ThreadPoolExecutor。
3 Lock:互斥
共享可变状态需互斥;Lock() 非重入(同线程未 release 再 acquire 会死锁)。
调度非固定轮流;锁只互斥临界区。下例锁内累加并记线程名,trace[:40] 顺序不定。
import threading
ROUNDS = 2_000
counter = 0
trace: list[str] = []
lock = threading.Lock()
def bump() -> None:
global counter
for _ in range(ROUNDS):
with lock:
trace.append(threading.current_thread().name)
counter += 1
threads = [threading.Thread(target=bump, name=f"w{i}") for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert counter == 4 * ROUNDS
print(trace[:40])无锁时 counter += 1 可能丢更新(< 4 * ROUNDS)。
idiom:用 with lock:(异常也会释放,等价 try/finally)。
4 RLock:可重入
外层已 with lock,内层函数又要 with 同一把 lock → 同线程第二次 acquire。Lock 会自等自锁;RLock 按线程计数嵌套,配对 release 后才让给其它线程。
最小对照:with lock: with lock: —— Lock 卡死,RLock 可跑完。
import threading
class GuardedCache:
def __init__(self) -> None:
self._lock = threading.RLock()
self._data: dict[str, str] = {}
def get(self, key: str) -> str | None:
with self._lock:
return self._data.get(key)
def ensure(self, key: str, factory: str) -> str:
with self._lock:
if key not in self._data:
self._data[key] = factory
return self.get(key) # 嵌套持锁:Lock 死锁,RLock 可5 其它同步原语(速览)
| 类型 | 用途 |
|---|---|
Semaphore(n) | 至多 n 个线程同时进临界区(池、限流)。 |
Event | wait() 直到 set();启停、shutdown 信号。 |
Condition | 锁 + wait/notify;生产者-消费者等(注意虚假唤醒循环判)。 |
Barrier(parties) | parties 个到齐后一起继续。 |
6 queue.Queue:线程安全的管道
queue.Queue 内置同步,适合多生产者/多消费者;详情见 标准库简介 II。下例 None 作结束哨兵,task_done() 与 join() 配对。
import queue
import threading
q: queue.Queue[int | None] = queue.Queue()
def consumer() -> None:
while True:
item = q.get()
if item is None:
q.task_done()
break
try:
print(item)
finally:
q.task_done()
threading.Thread(target=consumer, daemon=True).start()
for i in range(3):
q.put(i)
q.put(None)
q.join()7 常见坑
- 死锁:多锁且顺序不一(AB-BA);固定顺序、缩小临界区或单把大锁。
- 锁内阻塞:持锁做网络/磁盘/
input()会拖住他人。 - 迭代中修改:遍历
list/dict时他线程修改 → 副本、加锁或换策略。 - 队列选型:
SimpleQueue更简单更快;asyncio 场景用asyncio.Queue,勿长期阻塞Queue.get()。
8 threading.local
每线程独立命名空间(像「每线程一份全局变量」),常用于线程池里挂请求上下文。
import threading
ctx = threading.local()
def set_request_id(rid: str) -> None:
ctx.request_id = rid
def log(msg: str) -> None:
print(getattr(ctx, "request_id", "?"), msg)9 Thread / Lock / RLock:速查
构造:Thread(...)、Lock()、RLock()。属性看身份/守护,方法看启动、等待与加解锁。
Thread
| 名称 | 读写 | 说明 |
|---|---|---|
name | 读写 | 线程名。 |
daemon | 读写 | 守护线程;须在 start() 前设。 |
ident / native_id | 只读 | 线程 id;未 start 可能 None(native_id 3.8+)。 |
start() | 方法 | 仅可调用一次。 |
join(timeout=None) | 方法 | 阻塞至结束;超时配合 is_alive()。 |
is_alive() | 方法 | 已 start 且未结束为真。 |
常用构造:target、args/kwargs、name、daemon;无 target 时可子类重写 run()。
Lock / RLock(接口一致)
| 名称 | 说明 |
|---|---|
acquire(...) | blocking=False 立刻失败返回 False;timeout 秒;-1 表示一直等。 |
release() | 须持有者调用,否则 RuntimeError。 |
locked() | 已被占用为真(不阻塞)。 |
with lock: | 等价 acquire/release,异常也释放。 |
Lock:同线程未 release 再 acquire 会卡住。RLock:同线程可多次 acquire,release 次数须对上。