協(xié)程與任務(wù)?

本節將簡(jiǎn)述用于協(xié)程與任務(wù)的高層級 API。

協(xié)程?

Coroutines declared with the async/await syntax is the preferred way of writing asyncio applications. For example, the following snippet of code prints "hello", waits 1 second, and then prints "world":

>>>
>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:簡(jiǎn)單地調用一個(gè)協(xié)程并不會(huì )使其被調度執行

>>>
>>> main()
<coroutine object main at 0x1053bb7c8>

要真正運行一個(gè)協(xié)程,asyncio 提供了三種主要機制:

  • asyncio.run() 函數用來(lái)運行最高層級的入口點(diǎn) "main()" 函數 (參見(jiàn)上面的示例。)

  • 等待一個(gè)協(xié)程。以下代碼段會(huì )在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    預期的輸出:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio.create_task() 函數用來(lái)并發(fā)運行作為 asyncio 任務(wù) 的多個(gè)協(xié)程。

    讓我們修改以上示例,并發(fā) 運行兩個(gè) say_after 協(xié)程:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    注意,預期的輸出顯示代碼段的運行時(shí)間比之前快了 1 秒:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

可等待對象?

如果一個(gè)對象可以在 await 語(yǔ)句中使用,那么它就是 可等待 對象。許多 asyncio API 都被設計為接受可等待對象。

可等待 對象有三種主要類(lèi)型: 協(xié)程, 任務(wù)Future.

協(xié)程

Python 協(xié)程屬于 可等待 對象,因此可以在其他協(xié)程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文檔中 "協(xié)程" 可用來(lái)表示兩個(gè)緊密關(guān)聯(lián)的概念:

  • 協(xié)程函數: 定義形式為 async def 的函數;

  • 協(xié)程對象: 調用 協(xié)程函數 所返回的對象。

任務(wù)

任務(wù) 被用來(lái)“并行的”調度協(xié)程

當一個(gè)協(xié)程通過(guò) asyncio.create_task() 等函數被封裝為一個(gè) 任務(wù),該協(xié)程會(huì )被自動(dòng)調度執行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future 是一種特殊的 低層級 可等待對象,表示一個(gè)異步操作的 最終結果。

當一個(gè) Future 對象 被等待,這意味著(zhù)協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢。

在 asyncio 中需要 Future 對象以便允許通過(guò) async/await 使用基于回調的代碼。

通常情況下 沒(méi)有必要 在應用層級的代碼中創(chuàng )建 Future 對象。

Future 對象有時(shí)會(huì )由庫和某些 asyncio API 暴露給用戶(hù),用作可等待對象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一個(gè)很好的返回對象的低層級函數的示例是 loop.run_in_executor()。

創(chuàng )建任務(wù)?

asyncio.create_task(coro, *, name=None, context=None)?

coro 協(xié)程 封裝為一個(gè) Task 并調度其執行。返回 Task 對象。

name 不為 None,它將使用 Task.set_name() 來(lái)設為任務(wù)的名稱(chēng)。

An optional keyword-only context argument allows specifying a custom contextvars.Context for the coro to run in. The current context copy is created when no context is provided.

該任務(wù)會(huì )在 get_running_loop() 返回的循環(huán)中執行,如果當前線(xiàn)程沒(méi)有在運行的循環(huán)則會(huì )引發(fā) RuntimeError。

重要

Save a reference to the result of this function, to avoid a task disappearing mid execution.

3.7 新版功能.

在 3.8 版更改: Added the name parameter.

在 3.11 版更改: Added the context parameter.

休眠?

coroutine asyncio.sleep(delay, result=None)?

阻塞 delay 指定的秒數。

如果指定了 result,則當協(xié)程完成時(shí)將其返回給調用者。

sleep() 總是會(huì )掛起當前任務(wù),以允許其他任務(wù)運行。

將 delay 設為 0 將提供一個(gè)經(jīng)優(yōu)化的路徑以允許其他任務(wù)運行。 這可供長(cháng)期間運行的函數使用以避免在函數調用的全過(guò)程中阻塞事件循環(huán)。

以下協(xié)程示例運行 5 秒,每秒顯示一次當前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

在 3.10 版更改: Removed the loop parameter.

并發(fā)運行任務(wù)?

awaitable asyncio.gather(*aws, return_exceptions=False)?

并發(fā) 運行 aws 序列中的 可等待對象。

如果 aws 中的某個(gè)可等待對象為協(xié)程,它將自動(dòng)被作為一個(gè)任務(wù)調度。

如果所有可等待對象都成功完成,結果將是一個(gè)由所有返回值聚合而成的列表。結果值的順序與 aws 中可等待對象的順序一致。

如果 return_exceptionsFalse (默認),所引發(fā)的首個(gè)異常會(huì )立即傳播給等待 gather() 的任務(wù)。aws 序列中的其他可等待對象 不會(huì )被取消 并將繼續運行。

如果 return_exceptionsTrue,異常會(huì )和成功的結果一樣處理,并聚合至結果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對象也會(huì ) 被取消。

如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當作引發(fā)了 CancelledError 一樣處理 -- 在此情況下 gather() 調用 不會(huì ) 被取消。這是為了防止一個(gè)已提交的 Task/Future 被取消導致其他 Tasks/Future 也被取消。

在 3.10 版更改: Removed the loop parameter.

示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

備注

如果 return_exceptions 為 False,則在 gather() 被標記為已完成后取消它將不會(huì )取消任何已提交的可等待對象。 例如,在將一個(gè)異常傳播給調用者之后,gather 可被標記為已完成,因此,在從 gather 捕獲一個(gè)(由可等待對象所引發(fā)的)異常之后調用 gather.cancel() 將不會(huì )取消任何其他可等待對象。

在 3.7 版更改: 如果 gather 本身被取消,則無(wú)論 return_exceptions 取值為何,消息都會(huì )被傳播。

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果未提供位置參數或者并非所有位置參數均為 Future 類(lèi)對象并且沒(méi)有正在運行的事件循環(huán)則會(huì )發(fā)出棄用警告。

屏蔽取消操作?

awaitable asyncio.shield(aw)?

保護一個(gè) 可等待對象 防止其被 取消。

如果 aw 是一個(gè)協(xié)程,它將自動(dòng)被作為任務(wù)調度。

以下語(yǔ)句:

res = await shield(something())

相當于:

res = await something()

不同之處 在于如果包含它的協(xié)程被取消,在 something() 中運行的任務(wù)不會(huì )被取消。從 something() 的角度看來(lái),取消操作并沒(méi)有發(fā)生。然而其調用者已被取消,因此 "await" 表達式仍然會(huì )引發(fā) CancelledError。

如果通過(guò)其他方式取消 something() (例如在其內部操作) 則 shield() 也會(huì )取消。

如果希望完全忽略取消操作 (不推薦) 則 shield() 函數需要配合一個(gè) try/except 代碼段,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果 aw 不是 Future 類(lèi)對象并且沒(méi)有正在運行的事件循環(huán)則會(huì )發(fā)出棄用警告。

超時(shí)?

coroutine asyncio.wait_for(aw, timeout)?

等待 aw 可等待對象 完成,指定 timeout 秒數后超時(shí)。

如果 aw 是一個(gè)協(xié)程,它將自動(dòng)被作為任務(wù)調度。

timeout 可以為 None,也可以為 float 或 int 型數值表示的等待秒數。如果 timeoutNone,則等待直到完成。

If a timeout occurs, it cancels the task and raises TimeoutError.

要避免任務(wù) 取消,可以加上 shield()。

此函數將等待直到 Future 確實(shí)被取消,所以總等待時(shí)間可能超過(guò) timeout。 如果在取消期間發(fā)生了異常,異常將會(huì )被傳播。

如果等待被取消,則 aw 指定的對象也會(huì )被取消。

在 3.10 版更改: Removed the loop parameter.

示例:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版更改: When aw is cancelled due to a timeout, wait_for waits for aw to be cancelled. Previously, it raised TimeoutError immediately.

在 3.10 版更改: Removed the loop parameter.

簡(jiǎn)單等待?

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)?

Run Future and Task instances in the aws iterable concurrently and block until the condition specified by return_when.

aws 可迭代對象必須不為空。

返回兩個(gè) Task/Future 集合: (done, pending)。

用法:

done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 類(lèi)型) 則它將被用于控制返回之前等待的最長(cháng)秒數。

Note that this function does not raise TimeoutError. Futures or Tasks that aren't done when the timeout occurs are simply returned in the second set.

return_when 指定此函數應在何時(shí)返回。它必須為以下常數之一:

常量

描述

FIRST_COMPLETED

函數將在任意可等待對象結束或取消時(shí)返回。

FIRST_EXCEPTION

函數將在任意可等待對象因引發(fā)異常而結束時(shí)返回。當沒(méi)有引發(fā)任何異常時(shí)它就相當于 ALL_COMPLETED。

ALL_COMPLETED

函數將在所有可等待對象結束或取消時(shí)返回。

wait_for() 不同,wait() 在超時(shí)發(fā)生時(shí)不會(huì )取消可等待對象。

在 3.10 版更改: Removed the loop parameter.

在 3.11 版更改: Passing coroutine objects to wait() directly is forbidden.

asyncio.as_completed(aws, *, timeout=None)?

并發(fā)地運行 aws 可迭代對象中的 可等待對象。 返回一個(gè)協(xié)程的迭代器。 所返回的每個(gè)協(xié)程可被等待以從剩余的可等待對象的可迭代對象中獲得最早的下一個(gè)結果。

Raises TimeoutError if the timeout occurs before all Futures are done.

在 3.10 版更改: Removed the loop parameter.

示例:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果 aws 可迭代對象中的可等待對象不全為 Future 類(lèi)對象并且沒(méi)有正在運行的事件循環(huán)則會(huì )發(fā)出棄用警告。

在線(xiàn)程中運行?

coroutine asyncio.to_thread(func, /, *args, **kwargs)?

在不同的線(xiàn)程中異步地運行函數 func。

向此函數提供的任何 *args 和 **kwargs 會(huì )被直接傳給 func。 并且,當前 contextvars.Context 會(huì )被傳播,允許在不同的線(xiàn)程中訪(fǎng)問(wèn)來(lái)自事件循環(huán)的上下文變量。

返回一個(gè)可被等待以獲取 func 的最終結果的協(xié)程。

This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread. For example:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

在任何協(xié)程中直接調用 blocking_io() 將會(huì )在調用期間阻塞事件循環(huán),導致額外的 1 秒運行時(shí)間。 而通過(guò)改用 asyncio.to_thread(),我們可以在不同的線(xiàn)程中運行它從而不會(huì )阻塞事件循環(huán)。

備注

由于 GIL 的存在,asyncio.to_thread() 通常只能被用來(lái)將 IO 密集型函數變?yōu)榉亲枞摹?但是,對于會(huì )釋放 GIL 的擴展模塊或無(wú)此限制的替代性 Python 實(shí)現來(lái)說(shuō),asyncio.to_thread() 也可被用于 CPU 密集型函數。

3.9 新版功能.

跨線(xiàn)程調度?

asyncio.run_coroutine_threadsafe(coro, loop)?

向指定事件循環(huán)提交一個(gè)協(xié)程。(線(xiàn)程安全)

返回一個(gè) concurrent.futures.Future 以等待來(lái)自其他 OS 線(xiàn)程的結果。

此函數應該從另一個(gè) OS 線(xiàn)程中調用,而非事件循環(huán)運行所在線(xiàn)程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在協(xié)程內產(chǎn)生了異常,將會(huì )通知返回的 Future 對象。它也可被用來(lái)取消事件循環(huán)中的任務(wù):

try:
    result = future.result(timeout)
except TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

參見(jiàn) concurrency and multithreading 部分的文檔。

不同與其他 asyncio 函數,此函數要求顯式地傳入 loop 參數。

3.5.1 新版功能.

內省?

asyncio.current_task(loop=None)?

返回當前運行的 Task 實(shí)例,如果沒(méi)有正在運行的任務(wù)則返回 None。

如果 loopNone 則會(huì )使用 get_running_loop() 獲取當前事件循環(huán)。

3.7 新版功能.

asyncio.all_tasks(loop=None)?

返回事件循環(huán)所運行的未完成的 Task 對象的集合。

如果 loopNone,則會(huì )使用 get_running_loop() 獲取當前事件循環(huán)。

3.7 新版功能.

Task 對象?

class asyncio.Task(coro, *, loop=None, name=None)?

一個(gè)與 Future 類(lèi)似 的對象,可運行 Python 協(xié)程。非線(xiàn)程安全。

Task 對象被用來(lái)在事件循環(huán)中運行協(xié)程。如果一個(gè)協(xié)程在等待一個(gè) Future 對象,Task 對象會(huì )掛起該協(xié)程的執行并等待該 Future 對象完成。當該 Future 對象 完成,被打包的協(xié)程將恢復執行。

事件循環(huán)使用協(xié)同日程調度: 一個(gè)事件循環(huán)每次運行一個(gè) Task 對象。而一個(gè) Task 對象會(huì )等待一個(gè) Future 對象完成,該事件循環(huán)會(huì )運行其他 Task、回調或執行 IO 操作。

使用高層級的 asyncio.create_task() 函數來(lái)創(chuàng )建 Task 對象,也可用低層級的 loop.create_task()ensure_future() 函數。不建議手動(dòng)實(shí)例化 Task 對象。

要取消一個(gè)正在運行的 Task 對象可使用 cancel() 方法。調用此方法將使該 Task 對象拋出一個(gè) CancelledError 異常給打包的協(xié)程。如果取消期間一個(gè)協(xié)程正在等待一個(gè) Future 對象,該 Future 對象也將被取消。

cancelled() 可被用來(lái)檢測 Task 對象是否被取消。如果打包的協(xié)程沒(méi)有抑制 CancelledError 異常并且確實(shí)被取消,該方法將返回 True。

asyncio.TaskFuture 繼承了其除 Future.set_result()Future.set_exception() 以外的所有 API。

Task 對象支持 contextvars 模塊。當一個(gè) Task 對象被創(chuàng )建,它將復制當前上下文,然后在復制的上下文中運行其協(xié)程。

在 3.7 版更改: 加入對 contextvars 模塊的支持。

在 3.8 版更改: Added the name parameter.

3.10 版后已移除: 如果未指定 loop 并且沒(méi)有正在運行的事件循環(huán)則會(huì )發(fā)出棄用警告。

cancel(msg=None)?

請求取消 Task 對象。

這將安排在下一輪事件循環(huán)中拋出一個(gè) CancelledError 異常給被封包的協(xié)程。

協(xié)程在之后有機會(huì )進(jìn)行清理甚至使用 try ... ... except CancelledError ... finally 代碼塊抑制異常來(lái)拒絕請求。不同于 Future.cancel(),Task.cancel() 不保證 Task 會(huì )被取消,雖然抑制完全取消并不常見(jiàn),也很不鼓勵這樣做。

在 3.9 版更改: Added the msg parameter.

Deprecated since version 3.11, will be removed in version 3.14: msg parameter is ambiguous when multiple cancel() are called with different cancellation messages. The argument will be removed.

以下示例演示了協(xié)程是如何偵聽(tīng)取消請求的:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()?

如果 Task 對象 被取消 則返回 True。

當使用 cancel() 發(fā)出取消請求時(shí) Task 會(huì )被 取消,其封包的協(xié)程將傳播被拋入的 CancelledError 異常。

done()?

如果 Task 對象 已完成 則返回 True。

當 Task 所封包的協(xié)程返回一個(gè)值、引發(fā)一個(gè)異?;?Task 本身被取消時(shí),則會(huì )被認為 已完成。

result()?

返回 Task 的結果。

如果 Task 對象 已完成,其封包的協(xié)程的結果會(huì )被返回 (或者當協(xié)程引發(fā)異常時(shí),該異常會(huì )被重新引發(fā)。)

如果 Task 對象 被取消,此方法會(huì )引發(fā)一個(gè) CancelledError 異常。

如果 Task 對象的結果還不可用,此方法會(huì )引發(fā)一個(gè) InvalidStateError 異常。

exception()?

返回 Task 對象的異常。

如果所封包的協(xié)程引發(fā)了一個(gè)異常,該異常將被返回。如果所封包的協(xié)程正常返回則該方法將返回 None。

如果 Task 對象 被取消,此方法會(huì )引發(fā)一個(gè) CancelledError 異常。

如果 Task 對象尚未 完成,此方法將引發(fā)一個(gè) InvalidStateError 異常。

add_done_callback(callback, *, context=None)?

添加一個(gè)回調,將在 Task 對象 完成 時(shí)被運行。

此方法應該僅在低層級的基于回調的代碼中使用。

要了解更多細節請查看 Future.add_done_callback() 的文檔。

remove_done_callback(callback)?

從回調列表中移除 callback 。

此方法應該僅在低層級的基于回調的代碼中使用。

要了解更多細節請查看 Future.remove_done_callback() 的文檔。

get_stack(*, limit=None)?

返回此 Task 對象的??蚣芰斜?。

如果所封包的協(xié)程未完成,這將返回其掛起所在的棧。如果協(xié)程已成功完成或被取消,這將返回一個(gè)空列表。如果協(xié)程被一個(gè)異常終止,這將返回回溯框架列表。

框架總是從按從舊到新排序。

每個(gè)被掛起的協(xié)程只返回一個(gè)??蚣?。

可選的 limit 參數指定返回框架的數量上限;默認返回所有框架。返回列表的順序要看是返回一個(gè)棧還是一個(gè)回溯:棧返回最新的框架,回溯返回最舊的框架。(這與 traceback 模塊的行為保持一致。)

print_stack(*, limit=None, file=None)?

打印此 Task 對象的?;蚧厮?。

此方法產(chǎn)生的輸出類(lèi)似于 traceback 模塊通過(guò) get_stack() 所獲取的框架。

limit 參數會(huì )直接傳遞給 get_stack()。

file 參數是輸出所寫(xiě)入的 I/O 流;默認情況下輸出會(huì )寫(xiě)入 sys.stderr。

get_coro()?

返回由 Task 包裝的協(xié)程對象。

3.8 新版功能.

get_name()?

返回 Task 的名稱(chēng)。

如果沒(méi)有一個(gè) Task 名稱(chēng)被顯式地賦值,默認的 asyncio Task 實(shí)現會(huì )在實(shí)例化期間生成一個(gè)默認名稱(chēng)。

3.8 新版功能.

set_name(value)?

設置 Task 的名稱(chēng)。

value 參數可以為任意對象,它隨后會(huì )被轉換為字符串。

在默認的 Task 實(shí)現中,名稱(chēng)將在任務(wù)對象的 repr() 輸出中可見(jiàn)。

3.8 新版功能.