python 進程池
可以使用與創(chuàng)建和使用線程池相同的方式創(chuàng)建和使用進程池。進程池可以定義為預先實例化和空閑進程的組,它們隨時可以進行工作。當我們需要執(zhí)行大量任務時,創(chuàng)建進程池優(yōu)先于為每個任務實例化新進程。
python模塊 - concurrent.futures
python標準庫有一個名為 concurrent.futures 的模塊。該模塊是在python 3.2中添加的,用于為開發(fā)人員提供啟動異步任務的高級接口。它是python的線程和多處理模塊之上的抽象層,用于提供使用線程池或進程池運行任務的接口。
在接下來的部分中,我們將查看concurrent.futures模塊的不同子類。
執(zhí)行者類
executor 是 concurrent.futures python模塊 的抽象類。它不能直接使用,我們需要使用以下具體子類之一
- threadpoolexecutor
- processpoolexecutor
processpoolexecutor - 一個具體的子類
它是executor類的具體子類之一。它使用多處理,我們獲得了一組用于提交任務的流程。此池將任務分配給可用進程并安排它們運行。
如何創(chuàng)建processpoolexecutor?
在 concurrent.futures 模塊及其具體子類 executor 的幫助下,我們可以輕松創(chuàng)建一個進程池。為此,我們需要構建一個 processpoolexecutor ,其中包含我們在池中所需的進程數(shù)。默認情況下,該數(shù)字為5.然后,將任務提交到流程池。
例
我們現(xiàn)在將考慮在創(chuàng)建線程池時使用的相同示例,唯一的區(qū)別是現(xiàn)在我們將使用 processpoolexecutor 而不是 threadpoolexecutor 。
from concurrent.futures import processpoolexecutor from time import sleep def task(message): sleep(2) return message def main(): executor = processpoolexecutor(5) future = executor.submit(task, ("completed")) print(future.done()) sleep(2) print(future.done()) print(future.result()) if __name__ == '__main__': main()
輸出
false false completed
在上面的示例中,process poolexecutor 已構造為5個線程。然后,在給出消息之前等待2秒的任務被提交給進程池執(zhí)行器。從輸出中可以看出,任務直到2秒才完成,因此第一次調(diào)用 done() 將返回false。2秒后,任務完成,我們通過調(diào)用 result() 方法得到未來的 結(jié)果 。
實例化processpoolexecutor - 上下文管理器
實例化processpoolexecutor的另一種方法是在上下文管理器的幫助下。它的工作方式與上例中使用的方法類似。使用上下文管理器的主要優(yōu)點是它在語法上看起來很好。實例化可以在以下代碼的幫助下完成
with processpoolexecutor(max_workers = 5) as executor
例
為了更好地理解,我們采用與創(chuàng)建線程池時使用的相同的示例。在此示例中,我們需要先導入 concurrent.futures 模塊。然后創(chuàng)建一個名為 load_url() 的函數(shù),它將加載請求的url。該 processpoolexecutor 然后用5號在池中的線程創(chuàng)建的。process poolexecutor 已被用作上下文管理器。我們可以通過調(diào)用 result() 方法獲得未來的 結(jié)果 。
import concurrent.futures from concurrent.futures import processpoolexecutor import urllib.request urls = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout = timeout) as conn: return conn.read() def main(): with concurrent.futures.processpoolexecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) if __name__ == '__main__': main()
輸出
上面的python腳本將生成以下輸出
'http://some-made-up-domain.com/' generated an exception: 'http://www.foxnews.com/' page is 229476 bytes 'http://www.cnn.com/' page is 165323 bytes 'http://www.bbc.co.uk/' page is 284981 bytes 'http://europe.wsj.com/' page is 967575 bytes
使用executor.map()函數(shù)
python map() 函數(shù)廣泛用于執(zhí)行許多任務。一個這樣的任務是將特定函數(shù)應用于迭代中的每個元素。類似地,我們可以將迭代器的所有元素映射到函數(shù),并將它們作為獨立的作業(yè)提交給 processpoolexecutor 。請考慮以下python腳本示例來理解這一點。
例
我們將考慮使用 executor.map() 函數(shù)創(chuàng)建線程池時使用的相同示例。在下面給出的示例中,map函數(shù)用于將 square() 函數(shù)應用于values數(shù)組中的每個值。
from concurrent.futures import processpoolexecutor from concurrent.futures import as_completed values = [2,3,4,5] def square(n): return n * n def main(): with processpoolexecutor(max_workers = 3) as executor: results = executor.map(square, values) for result in results: print(result) if __name__ == '__main__': main()
輸出
上面的python腳本將生成以下輸出
4 9 16 25
何時使用processpoolexecutor和threadpoolexecutor?
既然我們已經(jīng)研究了executor類 - threadpoolexecutor和processpoolexecutor,我們需要知道何時使用哪個執(zhí)行器。我們需要在遇到cpu限制的工作負載時選擇processpoolexecutor,在i / o綁定工作負載的情況下選擇threadpoolexecutor。
如果我們使用 processpoolexecutor ,那么我們不需要擔心gil,因為它使用多處理。而且,與 threadpoolexecution 相比,執(zhí)行時間會更短。請考慮以下python腳本示例來理解這一點。
例
import time import concurrent.futures value = [8000000, 7000000] def counting(n): start = time.time() while n > 0: n -= 1 return time.time() - start def main(): start = time.time() with concurrent.futures.processpoolexecutor() as executor: for number, time_taken in zip(value, executor.map(counting, value)): print('start: {} time taken: {}'.format(number, time_taken)) print('total time taken: {}'.format(time.time() - start)) if __name__ == '__main__': main()
輸出
start: 8000000 time taken: 1.5509998798370361 start: 7000000 time taken: 1.3259999752044678 total time taken: 2.0840001106262207 example- python script with threadpoolexecutor: import time import concurrent.futures value = [8000000, 7000000] def counting(n): start = time.time() while n > 0: n -= 1 return time.time() - start def main(): start = time.time() with concurrent.futures.threadpoolexecutor() as executor: for number, time_taken in zip(value, executor.map(counting, value)): print('start: {} time taken: {}'.format(number, time_taken)) print('total time taken: {}'.format(time.time() - start)) if __name__ == '__main__': main()
輸出
start: 8000000 time taken: 3.8420000076293945 start: 7000000 time taken: 3.6010000705718994 total time taken: 3.8480000495910645
從上述兩個程序的輸出中,我們可以看到使用 processpoolexecutor 和 threadpoolexecutor時 執(zhí)行時間的差異。