同步原语

源代码: Lib/asyncio/locks.py


asyncio 同步原语被设计为与 threading 模块的类似,但有两个关键注意事项:

  • asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用 threading);

  • 这些同步原语的方法不接受 timeout 参数;请使用 asyncio.wait_for() 函数来执行带有超时的操作。

asyncio 具有下列基本同步原语:


Lock

class asyncio.Lock

实现一个用于 asyncio 任务的互斥锁。 非线程安全。

asyncio 锁可被用来保证对共享资源的独占访问。

使用 Lock 的推荐方式是通过 async with 语句:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

这等价于:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版更改: Removed the loop parameter.

coroutine acquire()

获取锁。

此方法会等待直至锁为 unlocked,将其设为 locked 并返回 True

当有一个以上的协程在 acquire() 中被阻塞则会等待解锁,最终只有一个协程会被执行。

锁的获取是 公平的: 被执行的协程将是第一个开始等待锁的协程。

release()

释放锁。

当锁为 locked 时,将其设为 unlocked 并返回。

如果锁为 unlocked,则会引发 RuntimeError

locked()

如果锁为 locked 则返回 True

事件

class asyncio.Event

事件对象。 该对象不是线程安全的。

asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。

Event 对象会管理一个内部旗标,可通过 set() 方法将其设为 true 并通过 clear() 方法将其重设为 falsewait() 方法会阻塞直至该旗标被设为 true。 该旗标初始时会被设为 false

在 3.10 版更改: Removed the loop parameter.

示例:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

等待直至事件被设置。

如果事件已被设置,则立即返回 True。 否则将阻塞直至另一个任务调用 set()

set()

设置事件。

所有等待事件被设置的任务将被立即唤醒。

clear()

清空(取消设置)事件。

通过 wait() 进行等待的任务现在将会阻塞直至 set() 方法被再次调用。

is_set()

如果事件已被设置则返回 True

Condition

class asyncio.Condition(lock=None)

条件对象。 该对象不是线程安全的。

asyncio 条件原语可被任务用于等待某个事件发生,然后获取对共享资源的独占访问。

在本质上,Condition 对象合并了 EventLock 的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访问。

可选的 lock 参数必须为 Lock 对象或 None。 在后一种情况下会自动创建一个新的 Lock 对象。

在 3.10 版更改: Removed the loop parameter.

使用 Condition 的推荐方式是通过 async with 语句:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

这等价于:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

获取下层的锁。

此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 returns True

notify(n=1)

唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。 如果没有任务正在等待则此方法为空操作。

锁必须在此方法被调用前被获取并在随后被快速释放。 如果通过一个 unlocked 锁调用则会引发 RuntimeError

locked()

如果下层的锁已被获取则返回 True

notify_all()

唤醒所有正在等待此条件的任务。

此方法的行为类似于 notify(),但会唤醒所有正在等待的任务。

锁必须在此方法被调用前被获取并在随后被快速释放。 如果通过一个 unlocked 锁调用则会引发 RuntimeError

release()

释放下层的锁。

当在未锁定的锁上发起调用时,会引发 RuntimeError

coroutine wait()

等待直至收到通知。

当此方法被调用时如果调用方任务未获得锁,则会引发 RuntimeError

这个方法会释放下层的锁,然后保持阻塞直到被 notify()notify_all() 调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此方法将返回 True

coroutine wait_for(predicate)

等待直到目标值变为 true

目标必须为一个可调用对象,其结果将被解读为一个布尔值。 最终的值将为返回值。

Semaphore

class asyncio.Semaphore(value=1)

信号量对象。 该对象不是线程安全的。

信号量会管理一个内部计数器,该计数器会随每次 acquire() 调用递减并随每次 release() 调用递增。 计数器的值永远不会降到零以下;当 acquire() 发现其值为零时,它将保持阻塞直到有某个任务调用了 release()

可选的 value 参数用来为内部计数器赋初始值 (默认值为 1)。 如果给定的值小于 0 则会引发 ValueError

在 3.10 版更改: Removed the loop parameter.

使用 Semaphore 的推荐方式是通过 async with 语句。:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

这等价于:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

获取一个信号量。

如果内部计数器的值大于零,则将其减一并立即返回 True。 如果其值为零,则会等待直到 release() 并调用并返回 True

locked()

如果信号量对象无法被立即获取则返回 True

release()

释放一个信号量对象,将内部计数器的值加一。 可以唤醒一个正在等待获取信号量对象的任务。

不同于 BoundedSemaphoreSemaphore 允许执行的 release() 调用多于 acquire() 调用。

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

绑定的信号量对象。 该对象不是线程安全的。

BoundedSemaphore 是特殊版本的 Semaphore,如果在 release() 中内部计数器值增加到初始 value 以上它将引发一个 ValueError

在 3.10 版更改: Removed the loop parameter.

Barrier

class asyncio.Barrier(parties)

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

示例:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

3.11 新版功能.

coroutine wait()

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is “filling”, the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the tasks needs to abort, to avoid infinite waiting tasks.

parties

The number of tasks required to pass the barrier.

n_waiting

The number of tasks currently waiting in the barrier while filling.

broken

A boolean that is True if the barrier is in the broken state.

exception asyncio.BrokenBarrierError

This exception, a subclass of RuntimeError, is raised when the Barrier object is reset or broken.


在 3.9 版更改: 使用 await lockyield from lock 以及/或者 with 语句 (with await lock, with (yield from lock)) 来获取锁的操作已被移除。 请改用 async with lock