假設(shè)我們必須為多線程任務(wù)創(chuàng)建大量線程。由于線程太多,因此可能存在許多性能問(wèn)題,這在計(jì)算上是最昂貴的。一個(gè)主要問(wèn)題可能是吞吐量受限。我們可以通過(guò)創(chuàng)建一個(gè)線程池來(lái)解決這個(gè)問(wèn)題。線程池可以被定義為預(yù)先實(shí)例化和空閑線程的組,其準(zhǔn)備好被給予工作。當(dāng)我們需要執(zhí)行大量任務(wù)時(shí),創(chuàng)建線程池優(yōu)先于為每個(gè)任務(wù)實(shí)例化新線程。線程池可以管理大量線程的并發(fā)執(zhí)行,如下所示
- 如果線程池中的線程完成其執(zhí)行,則可以重用該線程。
- 如果線程終止,則將創(chuàng)建另一個(gè)線程來(lái)替換該線程。
python模塊 - concurrent.futures
python標(biāo)準(zhǔn)庫(kù)包括 concurrent.futures 模塊。該模塊是在python 3.2中添加的,用于為開發(fā)人員提供啟動(dòng)異步任務(wù)的高級(jí)接口。它是python的線程和多處理模塊之上的抽象層,用于提供使用線程池或進(jìn)程池運(yùn)行任務(wù)的接口。
在接下來(lái)的部分中,我們將了解concurrent.futures模塊的不同類。
執(zhí)行者類
executor 是 concurrent.futures python模塊 的抽象類。它不能直接使用,我們需要使用以下具體子類之一
- 的threadpoolexecutor
- processpoolexecutor
threadpoolexecutor - 一個(gè)具體的子類
它是executor類的具體子類之一。子類使用多線程,我們獲得了一個(gè)用于提交任務(wù)的線程池。此池將任務(wù)分配給可用線程并安排它們運(yùn)行。
如何創(chuàng)建threadpoolexecutor?
在 concurrent.futures 模塊及其具體的子類 executor 的幫助下,我們可以輕松地創(chuàng)建一個(gè)線程池。為此,我們需要構(gòu)造一個(gè) threadpoolexecutor ,其中包含我們?cè)诔刂邢胍木€程數(shù)。默認(rèn)情況下,該數(shù)字為5.然后我們可以向線程池提交任務(wù)。當(dāng)我們 提交() 任務(wù)時(shí),我們會(huì)回到 未來(lái)。future對(duì)象有一個(gè)名為 done() 的方法,它告訴未來(lái)是否已經(jīng)解決。有了這個(gè),就為該特定的未來(lái)對(duì)象設(shè)置了一個(gè)值。任務(wù)完成后,線程池執(zhí)行程序?qū)⒅翟O(shè)置為future對(duì)象。
例
from concurrent.futures import threadpoolexecutor from time import sleep def task(message): sleep(2) return message def main(): executor = threadpoolexecutor(5) future = executor.submit(task, ("completed")) print(future.done()) sleep(2) print(future.done()) print(future.result()) if __name__ == '__main__': main()
輸出
false true completed
在上面的例子中, threadpoolexecutor 已經(jīng)構(gòu)造了5個(gè)線程。然后,在給出消息之前等待2秒的任務(wù)被提交給線程池執(zhí)行器。從輸出中可以看出,任務(wù)直到2秒才完成,因此第一次調(diào)用 done() 將返回false。2秒后,任務(wù)完成,我們通過(guò)調(diào)用 result() 方法得到未來(lái)的 結(jié)果 。
實(shí)例化threadpoolexecutor - 上下文管理器
實(shí)例化 threadpoolexecutor的 另一種方法是在上下文管理器的幫助下。它的工作方式與上例中使用的方法類似。使用上下文管理器的主要優(yōu)點(diǎn)是它在語(yǔ)法上看起來(lái)很好。實(shí)例化可以在以下代碼的幫助下完成
with threadpoolexecutor(max_workers = 5) as executor
例
以下示例是從python文檔中借用的。在此示例中,首先必須導(dǎo)入 concurrent.futures 模塊。然后創(chuàng)建一個(gè)名為 load_url() 的函數(shù),它將加載請(qǐng)求的url。然后,該函數(shù)使用 池中 的5個(gè)線程 創(chuàng)建 threadpoolexecutor。該 threadpoolexecutor的 已被用作上下文管理器。我們可以通過(guò)調(diào)用 result() 方法獲得未來(lái)的 結(jié)果 。
import concurrent.futures import urllib.request urls = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout = timeout) as conn: return conn.read() with concurrent.futures.threadpoolexecutor(max_workers = 5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
輸出
以下是上述python腳本的輸出
'http://some-made-up-domain.com/' generated an exception: 'http://www.foxnews.com/' page is 229313 bytes 'http://www.cnn.com/' page is 168933 bytes 'http://www.bbc.co.uk/' page is 283893 bytes 'http://europe.wsj.com/' page is 938109 bytes
使用executor.map()函數(shù)
python map() 函數(shù)廣泛用于許多任務(wù)中。一個(gè)這樣的任務(wù)是將特定函數(shù)應(yīng)用于迭代中的每個(gè)元素。類似地,我們可以將迭代器的所有元素映射到一個(gè)函數(shù),并將它們作為獨(dú)立的作業(yè)提交給threadpoolexecutor 。請(qǐng)考慮以下python腳本示例,以了解該函數(shù)的工作原理。
例
在下面的示例中,map函數(shù)用于將 square() 函數(shù)應(yīng)用于values數(shù)組中的每個(gè)值。
from concurrent.futures import threadpoolexecutor from concurrent.futures import as_completed values = [2,3,4,5] def square(n): return n * n def main(): with threadpoolexecutor(max_workers = 3) as executor: results = executor.map(square, values) for result in results: print(result) if __name__ == '__main__': main()
輸出
上面的python腳本生成以下輸出 -
4 9 16 25