同步原語(yǔ)?

源代碼: Lib/asyncio/locks.py


asyncio 同步原語(yǔ)被設計為與 threading 模塊的類(lèi)似,但有兩個(gè)關(guān)鍵注意事項:

  • asyncio 原語(yǔ)不是線(xiàn)程安全的,因此它們不應被用于 OS 線(xiàn)程同步 (而應當使用 threading);

  • 這些同步原語(yǔ)的方法不接受 timeout 參數;請使用 asyncio.wait_for() 函數來(lái)執行帶有超時(shí)的操作。

asyncio 具有下列基本同步原語(yǔ):


Lock?

class asyncio.Lock?

實(shí)現一個(gè)用于 asyncio 任務(wù)的互斥鎖。 非線(xiàn)程安全。

asyncio 鎖可被用來(lái)保證對共享資源的獨占訪(fǎng)問(wèn)。

使用 Lock 的推薦方式是通過(guò) async with 語(yǔ)句:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

這等價(jià)于:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版更改: Removed the loop parameter.

coroutine acquire()?

獲取鎖。

此方法會(huì )等待直至鎖為 unlocked,將其設為 locked 并返回 True。

當有一個(gè)以上的協(xié)程在 acquire() 中被阻塞則會(huì )等待解鎖,最終只有一個(gè)協(xié)程會(huì )被執行。

鎖的獲取是 公平的: 被執行的協(xié)程將是第一個(gè)開(kāi)始等待鎖的協(xié)程。

release()?

釋放鎖。

當鎖為 locked 時(shí),將其設為 unlocked 并返回。

如果鎖為 unlocked,則會(huì )引發(fā) RuntimeError。

locked()?

如果鎖為 locked 則返回 True。

事件?

class asyncio.Event?

事件對象。 該對象不是線(xiàn)程安全的。

asyncio 事件可被用來(lái)通知多個(gè) asyncio 任務(wù)已經(jīng)有事件發(fā)生。

Event 對象會(huì )管理一個(gè)內部旗標,可通過(guò) set() 方法將其設為 true 并通過(guò) clear() 方法將其重設為 false。 wait() 方法會(huì )阻塞直至該旗標被設為 true。 該旗標初始時(shí)會(huì )被設為 false。

在 3.10 版更改: Removed the loop parameter.

示例:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()?

等待直至事件被設置。

如果事件已被設置,則立即返回 True。 否則將阻塞直至另一個(gè)任務(wù)調用 set()。

set()?

設置事件。

所有等待事件被設置的任務(wù)將被立即喚醒。

clear()?

清空(取消設置)事件。

通過(guò) wait() 進(jìn)行等待的任務(wù)現在將會(huì )阻塞直至 set() 方法被再次調用。

is_set()?

如果事件已被設置則返回 True。

Condition?

class asyncio.Condition(lock=None)?

條件對象。 該對象不是線(xiàn)程安全的。

asyncio 條件原語(yǔ)可被任務(wù)用于等待某個(gè)事件發(fā)生,然后獲取對共享資源的獨占訪(fǎng)問(wèn)。

在本質(zhì)上,Condition 對象合并了 EventLock 的功能。 多個(gè) Condition 對象有可能共享一個(gè) Lock,這允許關(guān)注于共享資源的特定狀態(tài)的不同任務(wù)實(shí)現對共享資源的協(xié)同獨占訪(fǎng)問(wèn)。

可選的 lock 參數必須為 Lock 對象或 None。 在后一種情況下會(huì )自動(dòng)創(chuàng )建一個(gè)新的 Lock 對象。

在 3.10 版更改: Removed the loop parameter.

使用 Condition 的推薦方式是通過(guò) async with 語(yǔ)句:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

這等價(jià)于:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()?

獲取下層的鎖。

此方法會(huì )等待直至下層的鎖為 unlocked,將其設為 locked 并返回 returns True。

notify(n=1)?

喚醒最多 n 個(gè)正在等待此條件的任務(wù)(默認為 1 個(gè))。 如果沒(méi)有任務(wù)正在等待則此方法為空操作。

鎖必須在此方法被調用前被獲取并在隨后被快速釋放。 如果通過(guò)一個(gè) unlocked 鎖調用則會(huì )引發(fā) RuntimeError。

locked()?

如果下層的鎖已被獲取則返回 True。

notify_all()?

喚醒所有正在等待此條件的任務(wù)。

此方法的行為類(lèi)似于 notify(),但會(huì )喚醒所有正在等待的任務(wù)。

鎖必須在此方法被調用前被獲取并在隨后被快速釋放。 如果通過(guò)一個(gè) unlocked 鎖調用則會(huì )引發(fā) RuntimeError。

release()?

釋放下層的鎖。

當在未鎖定的鎖上發(fā)起調用時(shí),會(huì )引發(fā) RuntimeError。

coroutine wait()?

等待直至收到通知。

當此方法被調用時(shí)如果調用方任務(wù)未獲得鎖,則會(huì )引發(fā) RuntimeError。

這個(gè)方法會(huì )釋放下層的鎖,然后保持阻塞直到被 notify()notify_all() 調用所喚醒。 一旦被喚醒,Condition 會(huì )重新獲取它的鎖并且此方法將返回 True。

coroutine wait_for(predicate)?

等待直到目標值變?yōu)?true。

目標必須為一個(gè)可調用對象,其結果將被解讀為一個(gè)布爾值。 最終的值將為返回值。

Semaphore?

class asyncio.Semaphore(value=1)?

信號量對象。 該對象不是線(xiàn)程安全的。

信號量會(huì )管理一個(gè)內部計數器,該計數器會(huì )隨每次 acquire() 調用遞減并隨每次 release() 調用遞增。 計數器的值永遠不會(huì )降到零以下;當 acquire() 發(fā)現其值為零時(shí),它將保持阻塞直到有某個(gè)任務(wù)調用了 release()。

可選的 value 參數用來(lái)為內部計數器賦初始值 (默認值為 1)。 如果給定的值小于 0 則會(huì )引發(fā) ValueError。

在 3.10 版更改: Removed the loop parameter.

使用 Semaphore 的推薦方式是通過(guò) async with 語(yǔ)句。:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

這等價(jià)于:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()?

獲取一個(gè)信號量。

如果內部計數器的值大于零,則將其減一并立即返回 True。 如果其值為零,則會(huì )等待直到 release() 并調用并返回 True。

locked()?

如果信號量對象無(wú)法被立即獲取則返回 True。

release()?

釋放一個(gè)信號量對象,將內部計數器的值加一。 可以喚醒一個(gè)正在等待獲取信號量對象的任務(wù)。

不同于 BoundedSemaphore,Semaphore 允許執行的 release() 調用多于 acquire() 調用。

BoundedSemaphore?

class asyncio.BoundedSemaphore(value=1)?

綁定的信號量對象。 該對象不是線(xiàn)程安全的。

BoundedSemaphore 是特殊版本的 Semaphore,如果在 release() 中內部計數器值增加到初始 value 以上它將引發(fā)一個(gè) ValueError。

在 3.10 版更改: Removed the loop parameter.

Barrier?

class asyncio.Barrier(parties, action=None)?

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

示例:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

3.11 新版功能.

coroutine wait()?

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is "filling", the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task print this
      print('End of *draining phasis*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()?

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()?

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the taks needs to abort, to avoid infinite waiting tasks.

parties?

The number of tasks required to pass the barrier.

n_waiting?

The number of tasks currently waiting in the barrier while filling.

broken?

A boolean that is True if the barrier is in the broken state.

exception asyncio.BrokenBarrierError?

This exception, a subclass of RuntimeError, is raised when the Barrier object is reset or broken.


在 3.9 版更改: 使用 await lockyield from lock 以及/或者 with 語(yǔ)句 (with await lock, with (yield from lock)) 來(lái)獲取鎖的操作已被移除。 請改用 async with lock。