黄色电影一区二区,韩国少妇自慰A片免费看,精品人妻少妇一级毛片免费蜜桃AV按摩师 ,超碰 香蕉

Python混合如何使用同步和異步函數(shù)

python混合如何使用同步和異步函數(shù)

本文講解"python混合怎么使用同步和異步函數(shù)",希望能夠解決相關(guān)問題。

在協(xié)程函數(shù)中調(diào)用同步函數(shù)

在協(xié)程函數(shù)中直接調(diào)用同步函數(shù)會(huì)阻塞事件循環(huán),從而影響整個(gè)程序的性能。我們先來看一個(gè)例子:

以下是使用異步 web 框架 fastapi 寫的一個(gè)例子,fastapi 是比較快,但不正確的操作將會(huì)變得很慢。

import?time

from?fastapi?import?fastapi

app?=?fastapi()


@app.get("/")
async?def?root():
????time.sleep(10)
????return?{"message":?"hello?world"}


@app.get("/health")
async?def?health():
????return?{"status":?"ok"}

上面我們寫了兩個(gè)接口,假設(shè) root 接口函數(shù)耗時(shí) 10 秒,在這 10 秒內(nèi)訪問 health 接口,想一想會(huì)發(fā)生什么?

python混合如何使用同步和異步函數(shù)

訪問 root 接口(左),立即訪問 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功響應(yīng)。

time.sleep 就是一個(gè)「同步」函數(shù),它會(huì)阻塞整個(gè)事件循環(huán)。

如何解決呢?想一想以前的處理方法,如果一個(gè)函數(shù)會(huì)阻塞主線程,那么就再開一個(gè)線程讓這個(gè)阻塞函數(shù)單獨(dú)運(yùn)行。所以,這里也是同理,開一個(gè)線程單獨(dú)去運(yùn)行那些阻塞式操作,比如讀取文件等。

loop.run_in_executor 方法將同步函數(shù)轉(zhuǎn)換為異步非阻塞方式進(jìn)行處理。具體來說,loop.run_in_executor() 可以將同步函數(shù)創(chuàng)建為一個(gè)線程或進(jìn)程,并在其中執(zhí)行該函數(shù),從而避免阻塞事件循環(huán)。

官方例子:在線程或者進(jìn)程池中執(zhí)行代碼。

那么,我們使用 loop.run_in_executor 改寫上面例子,如下:

import?asyncio
import?time

from?fastapi?import?fastapi

app?=?fastapi()


@app.get("/")
async?def?root():
????loop?=?asyncio.get_event_loop()

????def?do_blocking_work():
????????time.sleep(10)
????????print("done?blocking?work!!")

????await?loop.run_in_executor(none,?do_blocking_work)
????return?{"message":?"hello?world"}


@app.get("/health")
async?def?health():
????return?{"status":?"ok"}

效果如下:

python混合如何使用同步和異步函數(shù)

root 接口被阻塞期間,health 依然正常訪問互不影響。

注意: 這里都是為了演示,實(shí)際在使用 fastapi 開發(fā)時(shí),你可以直接將 async def root 更換成 def root ,也就是將其換成同步接口函數(shù),fastapi 內(nèi)部會(huì)自動(dòng)創(chuàng)建線程處理這個(gè)同步接口函數(shù)??偟膩碚f,fastapi 內(nèi)部也是依靠線程去處理同步函數(shù)從而避免阻塞主線程(或主線程中的事件循環(huán))。

在同步函數(shù)中調(diào)用異步函數(shù)

協(xié)程只能在「事件循環(huán)」內(nèi)被執(zhí)行,且同一時(shí)刻只能有一個(gè)協(xié)程被執(zhí)行。

所以,在同步函數(shù)中調(diào)用異步函數(shù),其本質(zhì)就是將協(xié)程「扔進(jìn)」事件循環(huán)中,等待該協(xié)程執(zhí)行完獲取結(jié)果即可。

以下這些函數(shù),都可以實(shí)現(xiàn)這個(gè)效果:

  • asyncio.run

  • asyncio.run_coroutine_threadsafe

  • loop.run_until_complete

  • create_task

接下來,我們將一一講解這些方法并舉例說明。

asyncio.run

這個(gè)方法使用起來最簡單,先看下如何使用,然后緊跟著講一下哪些場景不能直接使用 asyncio.run

import?asyncio

async?def?do_work():
????return?1

def?main():
????result?=?asyncio.run(do_work())
????print(result)??#?1

if?__name__?==?"__main__":
????main()

直接 run 就完事了,然后接受返回值即可。

但是需要,注意的是 asyncio.run 每次調(diào)用都會(huì)新開一個(gè)事件循環(huán),當(dāng)結(jié)束時(shí)自動(dòng)關(guān)閉該事件循環(huán)。

一個(gè)線程內(nèi)只存在一個(gè)事件循環(huán),所以如果當(dāng)前線程已經(jīng)有存在的事件循環(huán)了,就不應(yīng)該使用 asyncio.run 了,否則就會(huì)拋出如下異常:

runtimeerror: asyncio.run() cannot be called from a running event loop

因此,asyncio.run 用作新開一個(gè)事件循環(huán)時(shí)使用。

asyncio.run_coroutine_threadsafe

向指定事件循環(huán)提交一個(gè)協(xié)程。(線程安全)
返回一個(gè) concurrent.futures.future 以等待來自其他 os 線程的結(jié)果。

換句話說,就是將協(xié)程丟給其他線程中的事件循環(huán)去運(yùn)行。

值得注意的是這里的「事件循環(huán)」應(yīng)該是其他線程中的事件循環(huán),非當(dāng)前線程的事件循環(huán)。

其返回的結(jié)果是一個(gè) future 對象,如果你需要獲取協(xié)程的執(zhí)行結(jié)果可以使用 future.result() 獲取

下方給了一個(gè)例子,一共有兩個(gè)線程:thread_with_loop 和 another_thread,分別用于啟動(dòng)事件循環(huán)和調(diào)用 run_coroutine_threadsafe

import?asyncio
import?threading
import?time

loop?=?none


def?get_loop():
????global?loop
????if?loop?is?none:
????????loop?=?asyncio.new_event_loop()
????return?loop


def?another_thread():
????async?def?coro_func():
????????return?1

????loop?=?get_loop()
????#?將協(xié)程提交到另一個(gè)線程的事件循環(huán)中執(zhí)行
????future?=?asyncio.run_coroutine_threadsafe(coro_func(),?loop)
????#?等待協(xié)程執(zhí)行結(jié)果
????print(future.result())
????#?停止事件循環(huán)
????loop.call_soon_threadsafe(loop.stop)


def?thread_with_loop():
????loop?=?get_loop()
????#?啟動(dòng)事件循環(huán),確保事件循環(huán)不會(huì)退出,直到?loop.stop()?被調(diào)用
????loop.run_forever()
????loop.close()


#?啟動(dòng)一個(gè)線程,線程內(nèi)部啟動(dòng)了一個(gè)事件循環(huán)
threading.thread(target=thread_with_loop).start()
time.sleep(1)
#?在主線程中啟動(dòng)一個(gè)協(xié)程,?并將協(xié)程提交到另一個(gè)線程的事件循環(huán)中執(zhí)行
t?=?threading.thread(target=another_thread)
t.start()
t.join()
loop.run_until_complete

運(yùn)行直到 future ( future 的實(shí)例 ) 被完成。

這個(gè)方法和 asyncio.run 類似。

具體就是傳入一個(gè)協(xié)程對象或者任務(wù),然后可以直接拿到協(xié)程的返回值。

run_until_complete 屬于 loop 對象的方法,所以這個(gè)方法的使用前提是有一個(gè)事件循環(huán),注意這個(gè)事件循環(huán)必須是非運(yùn)行狀態(tài),如果是運(yùn)行中就會(huì)拋出如下異常:

runtimeerror: this event loop is already running

例子:

loop?=?asyncio.new_event_loop()
loop.run_until_complete(do_async_work())
create_task

再次準(zhǔn)確一點(diǎn):要運(yùn)行一個(gè)協(xié)程函數(shù)的本質(zhì)是將攜帶協(xié)程函數(shù)的任務(wù)提交至事件循環(huán)中,由事件循環(huán)發(fā)現(xiàn)、調(diào)度并執(zhí)行。

其實(shí)一共就是滿足兩個(gè)條件:

  • 任務(wù);

  • 事件循環(huán)。

我們使用 async def func 定義的函數(shù)叫做協(xié)程函數(shù),func() 這樣調(diào)用之后返回的結(jié)果是協(xié)程對象,到這一步協(xié)程函數(shù)內(nèi)的代碼都沒有被執(zhí)行,直到協(xié)程對象被包裝成了任務(wù),事件循環(huán)才會(huì)“正眼看它們”。

所以事件循環(huán)調(diào)度運(yùn)行的基本單元就是任務(wù),那為什么我們在使用 async/await 這些語句時(shí)沒有涉及到任務(wù)這個(gè)概念呢?

這是因?yàn)?await 語法糖在內(nèi)部將協(xié)程對象封裝成了任務(wù),再次強(qiáng)調(diào)事件循環(huán)只認(rèn)識任務(wù)。

所以,想要運(yùn)行一個(gè)協(xié)程對象,其實(shí)就是將協(xié)程對象封裝成一個(gè)任務(wù),至于事件循環(huán)是如何發(fā)現(xiàn)、調(diào)度和執(zhí)行的,這個(gè)我們不用關(guān)心。

那將協(xié)程封裝成的任務(wù)的方法有哪些呢?

  • asyncio.create_task

  • asyncio.ensure_future

  • loop.create_task

看著有好幾個(gè)的,沒關(guān)系,我們只關(guān)心 loop.create_task,因?yàn)槠渌椒ㄗ罱K都是調(diào)用 loop.create_task。

使用起來也是很簡單的,將協(xié)程對象傳入,返回值是一個(gè)任務(wù)對象。

async?def?do_work():
????return?222

task?=?loop.create_task(do_work())

do_work 會(huì)被異步執(zhí)行,那么 do_work 的結(jié)果怎么獲取呢,task.result() 可以嗎?

分情況:

  • 如果是在一個(gè)協(xié)程函數(shù)內(nèi)使用 await task.result(),這是可以的;

  • 如果是在普通函數(shù)內(nèi)則不行。你不可能立即獲得協(xié)程函數(shù)的返回值,因?yàn)閰f(xié)程函數(shù)還沒有被執(zhí)行呢。

asyncio.task 運(yùn)行使用 add_done_callback 添加完成時(shí)的回調(diào)函數(shù),所以我們可以「曲線救國」,使用回調(diào)函數(shù)將結(jié)果添加到隊(duì)列、future 等等。

我這里給個(gè)基于 concurrent.futures.future 獲取結(jié)果的例子,如下:

import?asyncio
from?asyncio?import?task
from?concurrent.futures?import?future

from?fastapi?import?fastapi

app?=?fastapi()
loop?=?asyncio.get_event_loop()


async?def?do_work1():
????return?222


@app.get("/")
def?root():
????#?新建一個(gè)?future?對象,用于接受結(jié)果值
????future?=?future()

????#?提交任務(wù)至事件循環(huán)
????task?=?loop.create_task(do_work1())

????#?回調(diào)函數(shù)
????def?done_callback(task:?task):
????????#?設(shè)置結(jié)果
????????future.set_result(task.result())

????#?為這個(gè)任務(wù)添加回調(diào)函數(shù)
????task.add_done_callback(done_callback)

????#?future.result?會(huì)被阻塞,直到有結(jié)果返回為止
????return?future.result()??#?222

關(guān)于 "python混合怎么使用同步和異步函數(shù)" 就介紹到此。希望多多支持碩編程。

下一節(jié):python迭代器怎么創(chuàng)建使用

python編程技術(shù)

相關(guān)文章