启动并行任务#
异步执行可以通过线程实现,使用 ThreadPoolExecutor
或 InterpreterPoolExecutor
,或者通过独立进程实现,使用 ProcessPoolExecutor
。它们都实现了相同的接口,该接口由抽象的 Executor
类定义。
concurrent.futures.Executor
抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。
concurrent.futures.Executor.submit(fn, /, *args, **kwargs)
调度可调用对象
fn
,以fn(*args, **kwargs)
方式执行并返回代表该可调用对象的执行的Future
对象。
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())

map(fn, *iterables, timeout=None, chunksize=1)
类似于
map(fn, *iterables)
但有以下差异:iterables
是立即执行而不是延迟执行的;fn
是异步执行的并且可以并发对fn
的多个调用。 如果__next__()
被调用且从对map()
原始调用timeout
秒之后其结果还不可用则已返回的迭代器将引发TimeoutError
。timeout
可以是整数或浮点数。如果timeout
未指定或为None
,则不限制等待时间。如果
fn
调用引发了异常,那么当从迭代器获取其值时该异常将被引发。在使用
ProcessPoolExecutor
时,此方法将可迭代对象分割成若干块,然后将它们作为独立任务提交到池中。这些块的(大致)大小可以通过将chunksize
设置为正整数来指定。对于非常长的可迭代对象,与默认值1
相比,使用较大的chunksize
可以显著提高性能。对于ThreadPoolExecutor
和InterpreterPoolExecutor
,chunksize
则没有影响。
当可调用对象已关联了 Future
然后在等待另一个 Future
的结果时就会导致死锁情况。例如:
import time
import concurrent.futures
def wait_on_b():
time.sleep(5)
print(b.result()) # b 永远不会结束因为它在等待 a。
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a 永远不会结束因为它在等待 b。
return 6
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
与
import time
import concurrent.futures
def wait_on_future():
f = executor.submit(pow, 5, 2)
# 这将永远不会完成因为只有一个工作线程
# 并且它正在执行此函数。
print(f.result())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(wait_on_future)
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# 检索单个网页并报告其 URL 及内容。
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# 使用 `with` 语句来确保线程得到及时清理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 开始加载操作,并为每个未来任务标记其 URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('{%r} generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
{'http://nonexistent-subdomain.python.org/'} generated an exception: <urlopen error [Errno -2] Name or service not known>
{'http://www.foxnews.com/'} generated an exception: HTTP Error 403: Forbidden 1001
{'http://europe.wsj.com/'} generated an exception: <urlopen error [Errno 101] Network is unreachable>
{'http://www.bbc.co.uk/'} generated an exception: <urlopen error [Errno 101] Network is unreachable>
'http://www.cnn.com/' page is 3469638 bytes
进程池#
import concurrent.futures
import multiprocessing as mp
import time
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
print(f"{mp.current_process()} {time.asctime()}")
return True
import asyncio
with concurrent.futures.ProcessPoolExecutor() as executor:
print(f"{mp.current_process()} {time.asctime()}")
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES, chunksize=1)):
print(f'{number:d} is prime: {prime}')
print(f"{mp.current_process()} {time.asctime()}")
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
<_MainProcess name='MainProcess' parent=None started> Sat Nov 9 15:26:43 2024
<ForkProcess name='ForkProcess-4' parent=1848926 started> Sat Nov 9 15:26:43 2024
<ForkProcess name='ForkProcess-5' parent=1848926 started> Sat Nov 9 15:26:43 2024
<ForkProcess name='ForkProcess-2' parent=1848926 started> Sat Nov 9 15:26:43 2024
<ForkProcess name='ForkProcess-1' parent=1848926 started> Sat Nov 9 15:26:43 2024
<ForkProcess name='ForkProcess-3' parent=1848926 started> Sat Nov 9 15:26:43 2024
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
<_MainProcess name='MainProcess' parent=None started> Sat Nov 9 15:26:43 2024