用 asyncio 開(kāi)發(fā)?

異步編程與傳統的“順序”編程不同。

本頁(yè)列出常見(jiàn)的錯誤和陷阱,并解釋如何避免它們。

Debug 模式?

默認情況下,asyncio以生產(chǎn)模式運行。為了簡(jiǎn)化開(kāi)發(fā),asyncio還有一種*debug 模式* 。

有幾種方法可以啟用異步調試模式:

除了啟用調試模式外,還要考慮:

  • asyncio logger 的日志級別設置為 logging.DEBUG,例如,下面的代碼片段可以在應用程序啟動(dòng)時(shí)運行:

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模塊以顯示 ResourceWarning 警告。一種方法是使用 -W default 命令行選項。

啟用調試模式時(shí):

  • asyncio 檢查 未被等待的協(xié)程 并記錄他們;這將消除“被遺忘的等待”問(wèn)題。

  • 許多非線(xiàn)程安全的異步 APIs (例如 loop.call_soon()loop.call_at() 方法),如果從錯誤的線(xiàn)程調用,則會(huì )引發(fā)異常。

  • 如果執行I/O操作花費的時(shí)間太長(cháng),則記錄I/O選擇器的執行時(shí)間。

  • Callbacks taking longer than 100 milliseconds are logged. The loop.slow_callback_duration attribute can be used to set the minimum execution duration in seconds that is considered "slow".

并發(fā)性和多線(xiàn)程?

事件循環(huán)在線(xiàn)程中運行(通常是主線(xiàn)程),并在其線(xiàn)程中執行所有回調和任務(wù)。當一個(gè)任務(wù)在事件循環(huán)中運行時(shí),沒(méi)有其他任務(wù)可以在同一個(gè)線(xiàn)程中運行。當一個(gè)任務(wù)執行一個(gè) await 表達式時(shí),正在運行的任務(wù)被掛起,事件循環(huán)執行下一個(gè)任務(wù)。

要調度來(lái)自另一 OS 線(xiàn)程的 callback,應該使用 loop.call_soon_threadsafe() 方法。 例如:

loop.call_soon_threadsafe(callback, *args)

幾乎所有異步對象都不是線(xiàn)程安全的,這通常不是問(wèn)題,除非在任務(wù)或回調函數之外有代碼可以使用它們。如果需要這樣的代碼來(lái)調用低級異步API,應該使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要從不同的OS線(xiàn)程調度一個(gè)協(xié)程對象,應該使用 run_coroutine_threadsafe() 函數。它返回一個(gè) concurrent.futures.Future 。查詢(xún)結果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

為了能夠處理信號和執行子進(jìn)程,事件循環(huán)必須運行于主線(xiàn)程中。

方法 loop.run_in_executor() 可以和 concurrent.futures.ThreadPoolExecutor 一起使用,用于在一個(gè)不同的操作系統線(xiàn)程中執行阻塞代碼,并避免阻塞運行事件循環(huán)的那個(gè)操作系統線(xiàn)程。

目前沒(méi)有什么辦法能直接從另一個(gè)進(jìn)程 (例如通過(guò) multiprocessing 啟動(dòng)的進(jìn)程) 安排協(xié)程或回調。 事件循環(huán)方法 小節列出了可以從管道讀取并監視文件描述符而不會(huì )阻塞事件循環(huán)的 API。 此外,asyncio 的 子進(jìn)程 API 提供了一種啟動(dòng)進(jìn)程并從事件循環(huán)與其通信的辦法。 最后,之前提到的 loop.run_in_executor() 方法也可配合 concurrent.futures.ProcessPoolExecutor 使用以在另一個(gè)進(jìn)程中執行代碼。

運行阻塞的代碼?

不應該直接調用阻塞( CPU 綁定)代碼。例如,如果一個(gè)函數執行1秒的 CPU 密集型計算,那么所有并發(fā)異步任務(wù)和 IO 操作都將延遲1秒。

可以用執行器在不同的線(xiàn)程甚至不同的進(jìn)程中運行任務(wù),以避免使用事件循環(huán)阻塞 OS 線(xiàn)程。 請參閱 loop.run_in_executor() 方法了解詳情。

日志記錄?

asyncio使用 logging 模塊,所有日志記錄都是通過(guò) "asyncio" logger執行的。

默認日志級別是 logging.INFO ??梢院苋菀椎卣{整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

檢測 never-awaited 協(xié)同程序?

當協(xié)程函數被調用而不是被等待時(shí) (即執行 coro() 而不是 await coro()) 或者協(xié)程沒(méi)有通過(guò) asyncio.create_task() 被排入計劃日程,asyncio 將會(huì )發(fā)出一條 RuntimeWarning:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

調試模式的輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修復方法是等待協(xié)程或者調用 asyncio.create_task() 函數:

async def main():
    await test()

檢測就再也沒(méi)異常?

如果調用 Future.set_exception() ,但不等待 Future 對象,將異常傳播到用戶(hù)代碼。在這種情況下,當 Future 對象被垃圾收集時(shí),asyncio將發(fā)出一條日志消息。

未處理異常的例子:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

激活調試模式 以獲取任務(wù)創(chuàng )建處的跟蹤信息:

asyncio.run(main(), debug=True)

調試模式的輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed