用 asyncio 開(kāi)發(fā)?
異步編程與傳統的“順序”編程不同。
本頁(yè)列出常見(jiàn)的錯誤和陷阱,并解釋如何避免它們。
Debug 模式?
默認情況下,asyncio以生產(chǎn)模式運行。為了簡(jiǎn)化開(kāi)發(fā),asyncio還有一種*debug 模式* 。
有幾種方法可以啟用異步調試模式:
將
PYTHONASYNCIODEBUG
環(huán)境變量設置為1
。將
debug=True
傳遞給asyncio.run()
。調用
loop.set_debug()
。
除了啟用調試模式外,還要考慮:
將 asyncio logger 的日志級別設置為
logging.DEBUG
,例如,下面的代碼片段可以在應用程序啟動(dòng)時(shí)運行:logging.basicConfig(level=logging.DEBUG)
配置
warnings
模塊以顯示ResourceWarning
警告。一種方法是使用-W
default
命令行選項。
啟用調試模式時(shí):
asyncio 檢查 未被等待的協(xié)程 并記錄他們;這將消除“被遺忘的等待”問(wèn)題。
許多非線(xiàn)程安全的異步 APIs (例如
loop.call_soon()
和loop.call_at()
方法),如果從錯誤的線(xiàn)程調用,則會(huì )引發(fā)異常。如果執行I/O操作花費的時(shí)間太長(cháng),則記錄I/O選擇器的執行時(shí)間。
Callbacks taking longer than 100 milliseconds are logged. The
loop.slow_callback_duration
attribute can be used to set the minimum execution duration in seconds that is considered "slow".
并發(fā)性和多線(xiàn)程?
事件循環(huán)在線(xiàn)程中運行(通常是主線(xiàn)程),并在其線(xiàn)程中執行所有回調和任務(wù)。當一個(gè)任務(wù)在事件循環(huán)中運行時(shí),沒(méi)有其他任務(wù)可以在同一個(gè)線(xiàn)程中運行。當一個(gè)任務(wù)執行一個(gè) await
表達式時(shí),正在運行的任務(wù)被掛起,事件循環(huán)執行下一個(gè)任務(wù)。
要調度來(lái)自另一 OS 線(xiàn)程的 callback,應該使用 loop.call_soon_threadsafe()
方法。 例如:
loop.call_soon_threadsafe(callback, *args)
幾乎所有異步對象都不是線(xiàn)程安全的,這通常不是問(wèn)題,除非在任務(wù)或回調函數之外有代碼可以使用它們。如果需要這樣的代碼來(lái)調用低級異步API,應該使用 loop.call_soon_threadsafe()
方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要從不同的OS線(xiàn)程調度一個(gè)協(xié)程對象,應該使用 run_coroutine_threadsafe()
函數。它返回一個(gè) concurrent.futures.Future
。查詢(xún)結果:
async def coro_func():
return await asyncio.sleep(1, 42)
# Later in another OS thread:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
為了能夠處理信號和執行子進(jìn)程,事件循環(huán)必須運行于主線(xiàn)程中。
方法 loop.run_in_executor()
可以和 concurrent.futures.ThreadPoolExecutor
一起使用,用于在一個(gè)不同的操作系統線(xiàn)程中執行阻塞代碼,并避免阻塞運行事件循環(huán)的那個(gè)操作系統線(xiàn)程。
目前沒(méi)有什么辦法能直接從另一個(gè)進(jìn)程 (例如通過(guò) multiprocessing
啟動(dòng)的進(jìn)程) 安排協(xié)程或回調。 事件循環(huán)方法 小節列出了可以從管道讀取并監視文件描述符而不會(huì )阻塞事件循環(huán)的 API。 此外,asyncio 的 子進(jìn)程 API 提供了一種啟動(dòng)進(jìn)程并從事件循環(huán)與其通信的辦法。 最后,之前提到的 loop.run_in_executor()
方法也可配合 concurrent.futures.ProcessPoolExecutor
使用以在另一個(gè)進(jìn)程中執行代碼。
運行阻塞的代碼?
不應該直接調用阻塞( CPU 綁定)代碼。例如,如果一個(gè)函數執行1秒的 CPU 密集型計算,那么所有并發(fā)異步任務(wù)和 IO 操作都將延遲1秒。
可以用執行器在不同的線(xiàn)程甚至不同的進(jìn)程中運行任務(wù),以避免使用事件循環(huán)阻塞 OS 線(xiàn)程。 請參閱 loop.run_in_executor()
方法了解詳情。
日志記錄?
asyncio使用 logging
模塊,所有日志記錄都是通過(guò) "asyncio"
logger執行的。
默認日志級別是 logging.INFO
??梢院苋菀椎卣{整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
檢測 never-awaited 協(xié)同程序?
當協(xié)程函數被調用而不是被等待時(shí) (即執行 coro()
而不是 await coro()
) 或者協(xié)程沒(méi)有通過(guò) asyncio.create_task()
被排入計劃日程,asyncio 將會(huì )發(fā)出一條 RuntimeWarning
:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
調試模式的輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
通常的修復方法是等待協(xié)程或者調用 asyncio.create_task()
函數:
async def main():
await test()
檢測就再也沒(méi)異常?
如果調用 Future.set_exception()
,但不等待 Future 對象,將異常傳播到用戶(hù)代碼。在這種情況下,當 Future 對象被垃圾收集時(shí),asyncio將發(fā)出一條日志消息。
未處理異常的例子:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
激活調試模式 以獲取任務(wù)創(chuàng )建處的跟蹤信息:
asyncio.run(main(), debug=True)
調試模式的輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed