同步
线程以非确定性 的方式独立执行, 线程何时开始执行, 何时被打断, 何时恢复执行, 完全由操作系统 来调度管理, 这是用户和程序都无法决定的.
如果一个线程需要判断其他的一些线程中, 是否已经执行到过程中的某个点, 根据这个判断, 来执行后续的工作, 那么就需要线程之间的同步工作来完成. 用来完成线程之间同步的对象称为同步原语 .
threading
包中提供了三种同步原语:
Event 事件: 用于一次性事件, 一旦这个事件完成了(完成了设置), 这个Event
对象就被丢弃了
Semaphore 信号量: 每次希望只唤醒一个单独的等待线程
Condition 条件: 打算一遍又一遍重复通知某个事件, 如定时器
Event
Event
对象允许线程等待某个事情发生. 初始状态, 事件对象被置为0, 如果事件没有被设置, 线程就会等待该事件, 线程就会被阻塞, 进入休眠状态, 直到事件被设置为止.
当有线程设置了这个事件时, 会唤醒所有 正在等待该事件的线程(如果有的话), 使得线程得以继续执行. 因此是一个一对多的关系.
复制 import time
from threading import Thread, Event
def count_down(n, started_evt):
print("count down starting...")
started_evt.set()
while n > 0:
print("T-minus", n)
n -= 1
time.sleep(5)
started_evt = Event()
print("launching count down...")
t = Thread(target=count_down, args=(10, started_evt))
t.start()
started_evt.wait()
print("count down is running...")
这样保证了count down is running...
总会在count down starting...
后显示.
使用方法 :
evt.set() : 设置事件, 消除其他进程中的阻塞
evt.wait() : 等待事件被设置, 产生阻塞
注意 : Event
对象最好只用于一次性事件 . 尽管可以对一个事件, 在设置了之后, 用clear()
方法来清除设置, 重新等待被设置, 但要安全地清除事件并等待它被再次设置这个过程很难同步协调. 因此一个Event
一旦完成了设置, 这个对象就应该被丢弃.
Condition
Condition
对象用在线程打算一遍又一遍地重复通知某事件的情况中, 常用在定时器 场景中, 每当定时器超时, 其他线程感知到超时时间, 进而做相应的执行.
复制 import time
import threading
class PeriodicTimer:
def __init__(self, interval):
self.interval = interval
self.flag = 0
self.cv = threading.Condition()
def start(self):
t = threading.Thread(target=self.run, daemon=True)
t.start()
def run(self):
while 1:
time.sleep(self.interval)
with self.cv:
self.flag ^= 1
self.cv.notify_all()
def wait_for_tick(self):
with self.cv:
last_flag = self.flag
while last_flag == self.flag:
self.cv.wait()
def count_down(nticks):
while nticks > 0:
ptimer.wait_for_tick()
print("T-minus", nticks)
nticks -= 1
def count_up(last):
n = 0
while n < last:
ptimer.wait_for_tick()
print("Counting", n)
n += 1
ptimer = PeriodicTimer(5)
ptimer.start()
threading.Thread(target=count_down, args=(10,)).start()
threading.Thread(target=count_up, args=(5,)).start()
使用方法 :
首先使用一个Condition
对象的方法时, 需要使用with
语句:
阻塞方法自然是.wait() . 发起通知的方法有两个:
notify_all() : 通知全部的等待线程, 所有线程继续执行
notify() : 通知一个 等待线程, 即使多个线程在等待, 也只唤醒一个线程
Semaphore
如果我们只希望唤醒一个单独的等待线程, 除了用Condition
的notify()
方法外, 还可以使用信号量Semaphore
.
复制 def worker(n, sema):
sema.acquire()
print("working", n)
sema = threading.Semaphore(0)
n_workers = 10
for n in range(n_workers):
t = threading.Thread(target=worker, args=(n, sema))
t.start()
释放信号量:
复制 >>> sema.release()
working 0
>>> sema.release()
working 1
使用方法 :
acquire() : 获取信号量, 信号量对象计数-1, 计数为0时会产生阻塞, 等待信号量的释放
release() : 释放信号量, 信号量对象计数+1
其他用途 :
Semaphore
也可以用在锁 和控制并发数量的场景中(具体应用见锁
一章).