0x05 锁
Lock
多线程中, 对临界区进行加锁处理, 以避免出现竞争情况. 利用threading
库中的Lock
类创建对象来解决.
往往lock的使用需要在临界区的开始使用acquire()
方法获取锁, 在处理完毕后, 使用release()
方法释放锁. 这种形式比较麻烦, 而且如果代码中忘记释放锁, 就很容易造成死锁的问题, 且问题较难排查. 使用with
语句就不用显式地对锁进行操作, 保证同一时间, 只允许一个线程执行with
语句块中的代码.
from threading import Lock
lock = Lock()
count = 0
def simple_func():
with lock:
count += 1
RLock
RLock
被称为可重入锁, 可以被同一线程多次获取. 如果多个线程执行某个类中的一个函数, 而这个类拥有这种锁时, 只有一个线程可以使用类中的全部函数或方法.
当某个线程获取这个锁之后, 跟Lock
一样, 其他所有线程都不会获得这个锁. 但与Lock
不同的是, 在这个线程中, 在锁被释放之前, 还可以多次获取这个锁, 但是最后的释放次数要保证或获取次数一致, 才是完全释放, 其他的线程才能获取这个锁, 否则会造成死锁的现象.
Semaphore
Semaphore
称为信号量. Semaphore
对象是一种基于共享计数器的同步原语.
原理为:
如果计数器非零, 那么
with
语句或acquire()
方法会递计数器, 并且允许线程继续执行当
with
语句块结束时或调用release
方法时计数器会得到递增如果计数器为零, 那么执行过程就会被阻塞, 直到由另一个线程来递增计数器为止.
因此, Semaphore
有以下特性:
可以当做
Lock
来使用, 但其实现更复杂, 对程序的性能带来负面影响线程之间发送信号的情境中可以使用
实现节流(throttling)的应用中更加有用, 可以通过初始化
Semaphore
时指定信号量的数量, 如果想限制并发的总数, 可以用Semaphore
来处理
from threading import Semaphore
s = Semaphore(5)
def simple_func(data):
with s:
# do something
pass
死锁
如果线程中一次需要获取多把锁的情况, 就要提防死锁情况的出现.
多线程程序中, 出现死锁的原因就是线程一次尝试获取了多个锁, 在获取一部分锁之后, 获取另外的锁时阻塞了, 因为其他线程可能已经获取了这些锁, 因此其他线程也会阻塞, 没有一个线程会放开已经获取的锁, 导致所有线程都被阻塞, 程序停滞.
一种有效地解决方法为, 对程序中的每个锁分配一个唯一的编号, 在获取多个锁时, 按照编号的升序方式来获取. 利用Python中的上下文管理器来实现这个机制:
import threading
from contextlib import contextmanager
_local = threading.local()
@contextmanager
def acquire(*locks):
# sort locks by object identifier
locks = sorted(locks, key=lambda x: id(x))
# make sure lock order of previously acquired locks is not violated
acquired = getattr(_local, "acquired", [])
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
# 已经获取到的锁的对象ID必须比将要获取到的锁的ID要小
raise RuntimeError("Lock order violation")
# acquire all of the locks
acquired.extend(locks)
_local.acquired = acquired
try:
for lock in locks:
lock.acquire()
yield
finally:
# release locks in reverse order of acquisition
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]
x_lock = threading.Lock()
y_lock = threading.Lock()
def thread_1():
while True:
with acquire(x_lock, y_lock):
print("Thread-1")
def thread_2():
while True:
with acquire(x_lock, y_lock):
print("Thread-2")
t1 = threading.Thread(target=thread_1, daemon=True)
t2 = threading.Thread(target=thread_2, daemon=True)
t1.start()
t2.start()
这里使用到了保存每个线程专有状态的local()
方法, 使用acquired
属性记住每个线程它自己已经获取到的锁的顺序. 并且保证已经获取到的锁的对象ID必须比将要获取到的锁的ID要小, 就能解决死锁的问题.
最后更新于
这有帮助吗?