启动并行任务#
异步执行可以通过线程实现,使用 ThreadPoolExecutor
或 InterpreterPoolExecutor
,或者通过独立进程实现,使用 ProcessPoolExecutor
。它们都实现了相同的接口,该接口由抽象的 Executor
类定义。
concurrent.futures.Executor
抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。
concurrent.futures.Executor.submit(fn, /, *args, **kwargs)
调度可调用对象
fn
,以fn(*args, **kwargs)
方式执行并返回代表该可调用对象的执行的Future
对象。
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
733018741971166252529244672995227796776583345783942437386911678051742014907619818389889490523633966891088361470574959346120460571462260749066868322280895817921819178047106351622746089787222709001409723271535998886740900624008120633567081904814632812393533764463134148203902627783449817391855430335563732106041226371560673691183984708116601872233266074247493626304648260263767914583249791984053769482918833516091413131011123944919964273965579371981208614941585953495908535921540210708056885341387737215923345202544222865141850763901074317449693617326298168109535915635940121796253976420394712905525890085230066381155268301872764521970724361150250533240741250911370641578495445037349949847056446112243875919940177685220040160664042867777937760175315580842852772334361081793849764931709880797656009445214260838004008425745329486287217583337196739975624879209997958066918269289529173759400388462627027867023449158801888859657106016910393756105722308386102404609268566550368738720443422703712421937413064228232666730605742388335837146714118055611327514203299723649208298344877573265368313069762781039793226859829488229032564965635731481369752167217585943588854930073123415040389709513212962223638396059885947892393069120539629752913087293860710229309854604802321674065167382714715102476300146693438431833224964213369532024918771644330976993898734654636960222338307689812245659000814812860386873604770823862137671384853381092605916632447158620208279102871362522121737075798746297634021439677681885846906025762209900264981149145400622176046541895913713446104210012678561757945542108470975177238574885971783008091688989094139568186322074509855040235429441987295597788583656003164149522040284092940950997841383270838931250004957560959392148357213125798758355221152004876638131530060401294918574742158403027075892317311713232948774508045444387008377268956511566790988750066274475945057011826156359365664084922898519338475775136584478799585953802786369705215183341960068786541347670144480851661915646211142904738434781594527201521904996219737980968884208250267436396423981779399709186303960293407672554877346877193170897982220120029537710504050457313802321446995592434916524383256286111803049392908220573240027642399185919278901303127877945119916025120410258623536046989159056988455695917457277794657573586903543406892809506842341172324188614146189747989997492566982444418485858555939830994931765546646159933948646331181720254477223093614552419921625948715007606190469859917311824745902574837132432028609681828479562137848761003845549339456254612745501089667544053262976758087180329859844324991679303251520118205924889348811070368534970345655190383142368597970622914787824113699770230464815835988172741718871079971541784210688842581288010501153867757557117911157916834496975757069921359922064002354851413718493414469919988528124300509472377943419402710070938638340816072261896269139282992942967710326722487988387814927252378444344275944579731813013115789048970941051188880593003843626208600970181761638829477861608229607983725051555487136745926844381097959339538810272017727875770008857972396527027073630500507
map(fn, *iterables, timeout=None, chunksize=1)
类似于
map(fn, *iterables)
但有以下差异:iterables
是立即执行而不是延迟执行的;fn
是异步执行的并且可以并发对fn
的多个调用。 如果__next__()
被调用且从对map()
原始调用timeout
秒之后其结果还不可用则已返回的迭代器将引发TimeoutError
。timeout
可以是整数或浮点数。如果timeout
未指定或为None
,则不限制等待时间。如果
fn
调用引发了异常,那么当从迭代器获取其值时该异常将被引发。在使用
ProcessPoolExecutor
时,此方法将可迭代对象分割成若干块,然后将它们作为独立任务提交到池中。这些块的(大致)大小可以通过将chunksize
设置为正整数来指定。对于非常长的可迭代对象,与默认值1
相比,使用较大的chunksize
可以显著提高性能。对于ThreadPoolExecutor
和InterpreterPoolExecutor
,chunksize
则没有影响。
当可调用对象已关联了 Future
然后在等待另一个 Future
的结果时就会导致死锁情况。例如:
import time
import concurrent.futures
def wait_on_b():
time.sleep(5)
print(b.result()) # b 永远不会结束因为它在等待 a。
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a 永远不会结束因为它在等待 b。
return 6
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
与
import time
import concurrent.futures
def wait_on_future():
f = executor.submit(pow, 5, 2)
# 这将永远不会完成因为只有一个工作线程
# 并且它正在执行此函数。
print(f.result())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(wait_on_future)
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# 检索单个网页并报告其 URL 及内容。
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# 使用 `with` 语句来确保线程得到及时清理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 开始加载操作,并为每个未来任务标记其 URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('{%r} generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
'http://www.cnn.com/' page is 3618510 bytes
'http://www.bbc.co.uk/' page is 505598 bytes
'http://www.foxnews.com/' page is 686085 bytes
{'http://europe.wsj.com/'} generated an exception: HTTP Error 403: Forbidden
{'http://nonexistent-subdomain.python.org/'} generated an exception: <urlopen error [Errno -5] No address associated with hostname>
进程池#
import concurrent.futures
import multiprocessing as mp
import time
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
print(f"{mp.current_process()} {time.asctime()}")
return True
import asyncio
with concurrent.futures.ProcessPoolExecutor() as executor:
print(f"{mp.current_process()} {time.asctime()}")
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES, chunksize=1)):
print(f'{number:d} is prime: {prime}')
print(f"{mp.current_process()} {time.asctime()}")
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:37 2024
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-3' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-4' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-1' parent=7958 started> Mon Nov 18 08:43:38 2024
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
115797848077099 is prime: True
1099726899285419 is prime: False
<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:38 2024