Python異步之在Asyncio中怎么運行阻塞任務(wù)
本文講解"Python異步之在Asyncio中如何運行阻塞任務(wù)",希望能夠解決相關(guān)問題。
正文
阻塞任務(wù)是阻止當(dāng)前線程繼續(xù)進(jìn)行的任務(wù)。
如果在 asyncio 程序中執(zhí)行阻塞任務(wù),它會停止整個事件循環(huán),從而阻止任何其他協(xié)程繼續(xù)進(jìn)行。
我們可以通過 asyncio.to_thread() 和 loop.run_in_executor() 函數(shù)在 asyncio 程序中異步運行阻塞調(diào)用。
1. 阻塞任務(wù)
asyncio的重點是異步編程和非阻塞IO。然而,我們經(jīng)常需要在 asyncio 應(yīng)用程序中執(zhí)行阻塞函數(shù)調(diào)用。
這可能有很多原因,例如:
- 執(zhí)行 CPU 密集型任務(wù),例如計算某事。
- 執(zhí)行阻塞 IO 綁定任務(wù),如從文件讀取或?qū)懭搿?p>
- 調(diào)用不支持 asyncio 的第三方庫。
直接在 asyncio 程序中進(jìn)行阻塞調(diào)用將導(dǎo)致事件循環(huán)在執(zhí)行阻塞調(diào)用時停止。它不允許其他協(xié)程在后臺運行。
我們?nèi)绾卧?asyncio 程序中異步執(zhí)行阻塞調(diào)用?
2. 如何運行阻塞任務(wù)
asyncio 模塊提供了兩種在 asyncio 程序中執(zhí)行阻塞調(diào)用的方法。
第一種是使用 asyncio.to_thread() 函數(shù)。這是在高級 API 中,供應(yīng)用程序開發(fā)人員使用。
asyncio.to_thread() 函數(shù)采用要執(zhí)行的函數(shù)名和任何參數(shù)。
該函數(shù)在單獨的線程中執(zhí)行。它返回一個可以作為獨立任務(wù)等待或安排的協(xié)程。
... # execute a function in a separate thread await asyncio.to_thread(task)
在返回的協(xié)程有機(jī)會在事件循環(huán)中運行之前,任務(wù)不會開始執(zhí)行。asyncio.to_thread() 函數(shù)在后臺創(chuàng)建一個 ThreadPoolExecutor 來執(zhí)行阻塞調(diào)用。因此,asyncio.to_thread() 函數(shù)僅適用于 IO 綁定任務(wù)。
另一種方法是使用 loop.run_in_executor() 函數(shù)。
這是在低級異步 API 中,首先需要訪問事件循環(huán),例如通過 asyncio.get_running_loop() 函數(shù)。
loop.run_in_executor() 函數(shù)接受一個執(zhí)行器和一個要執(zhí)行的函數(shù)。
如果沒有為執(zhí)行器提供,則使用默認(rèn)執(zhí)行器,即 ThreadPoolExecutor。
loop.run_in_executor() 函數(shù)返回一個可等待對象,如果需要可以等待它。任務(wù)將立即開始執(zhí)行,因此返回的可等待對象不需要等待或安排阻塞調(diào)用開始執(zhí)行。
... # get the event loop loop = asyncio.get_running_loop() # execute a function in a separate thread await loop.run_in_executor(None, task)
或者,可以創(chuàng)建一個執(zhí)行器并將其傳遞給 loop.run_in_executor() 函數(shù),該函數(shù)將在執(zhí)行器中執(zhí)行異步調(diào)用。
在這種情況下,調(diào)用者必須管理執(zhí)行器,一旦調(diào)用者完成它就將其關(guān)閉。
... # create a process pool with ProcessPoolExecutor as exe: # get the event loop loop = asyncio.get_running_loop() # execute a function in a separate thread await loop.run_in_executor(exe, task) # process pool is shutdown automatically...
這兩種方法允許阻塞調(diào)用作為異步任務(wù)在 asyncio 程序中執(zhí)行。
現(xiàn)在我們知道如何在 asyncio 程序中執(zhí)行阻塞調(diào)用,讓我們看一些有效的例子。
3. 實例
我們可以探索如何使用 asyncio.to_thread() 在 asyncio 程序中執(zhí)行阻塞 IO 綁定調(diào)用。
在這個例子中,我們將定義一個函數(shù)來阻塞調(diào)用者幾秒鐘。然后,我們將使用 asyncio.to_thread() 函數(shù)在 asyncio 的線程池中異步執(zhí)行此函數(shù)。
這將使呼叫者騰出時間繼續(xù)其他活動。
# SuperFastPython.com # example of running a blocking io-bound task in asyncio import asyncio import time # a blocking io-bound task def blocking_task(): # report a message print('Task starting') # block for a while time.sleep(2) # report a message print('Task done') # main coroutine async def main(): # report a message print('Main running the blocking task') # create a coroutine for the blocking task coro = asyncio.to_thread(blocking_task) # schedule the task task = asyncio.create_task(coro) # report a message print('Main doing other things') # allow the scheduled task to start await asyncio.sleep(0) # await the task await task # run the asyncio program asyncio.run(main())
運行示例首先創(chuàng)建 main() 協(xié)程并將其作為 asyncio 程序的入口點運行。main() 協(xié)程運行并報告一條消息。然后它發(fā)出對線程池的阻塞函數(shù)調(diào)用的調(diào)用。然后將協(xié)程包裝在任務(wù)中并獨立執(zhí)行。
main() 協(xié)程可以自由地繼續(xù)其他活動。在這種情況下,它會休眠片刻以允許計劃任務(wù)開始執(zhí)行。這使得目標(biāo)函數(shù)可以在后臺下發(fā)給 ThreadPoolExecutor 并開始運行。
然后 main() 協(xié)程掛起并等待任務(wù)完成。阻塞函數(shù)報告一條消息,休眠 2 秒,然后報告最后一條消息。
這突出了我們?nèi)绾卧谝粋€單獨的線程中與 asyncio 程序異步執(zhí)行阻塞 IO 綁定任務(wù)。
Main running the blocking task Main doing other things Task starting Task done
關(guān)于 "Python異步之在Asyncio中如何運行阻塞任務(wù)" 就介紹到此。希望多多支持碩編程。
- Python的gtts庫將文字轉(zhuǎn)為音頻應(yīng)該如何操作
- Python異步之在Asyncio中怎么運行阻塞任務(wù)
- Python異步之迭代器怎么使用
- Python錯誤JSONDecodeError:?Expecting?value:?line?1?column?1怎么解決
- Python異步之上下文管理器怎么使用
- Python異步之生成器怎么使用
- Python迭代器如何創(chuàng)建使用
- python操作Excel神器openpyxl如何使用
- Python 開發(fā)環(huán)境
- Python DNS查找
- Python HTTP響應(yīng)
- Python HTTP標(biāo)頭
- Python 請求狀態(tài)代碼
- Python HTTP服務(wù)器
- Python 上傳數(shù)據(jù)
- Python 并發(fā)簡介
- Python 系統(tǒng)和內(nèi)存架構(gòu)
- Python 線程
- Python 線程并發(fā)
- Python 多處理器