Launching parallel tasks

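Asynchronous execution can be performed with threads, using ThreadPoolExecutor or InterpreterPoolExecutor, or with separate processes, using ProcessPoolExecutor. All of them implement the same interface, which is defined by the abstract Executor class.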

concurrent.futures.Executor

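An abstract class that provides methods for executing calls asynchronously. It should not be used directly, but through its concrete subclasses.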
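Because every pool implements the same Executor interface, calling code can be written against the abstract type and handed any concrete pool. The following is a minimal sketch of that idea; the helper name total_length and the sample words are hypothetical and only serve the illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def total_length(executor: Executor, words):
    """Submit one len() call per word to any Executor and sum the results."""
    futures = [executor.submit(len, word) for word in words]
    return sum(future.result() for future in futures)


# A thread pool is used here, but any Executor subclass could be passed in.
with ThreadPoolExecutor(max_workers=2) as pool:
    total = total_length(pool, ["thread", "process", "interpreter"])

print(total)
```

The helper never names a concrete pool class, so swapping the pool type should only require changing the line that constructs it.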

concurrent.futures.Executor.submit(fn, /, *args, **kwargs)

Schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
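The Future returned by submit() carries either the return value or the exception of the call. The sketch below, which assumes a hypothetical helper named divide, passes both positional and keyword arguments through submit() and inspects the two outcomes.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    """Hypothetical helper: plain division, raises ZeroDivisionError for b == 0."""
    return a / b


with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(divide, 6, b=3)   # executed as divide(6, b=3)
    bad = executor.submit(divide, 1, 0)    # executed as divide(1, 0)

    ok_result = ok.result()       # blocks until the call finishes, then returns its value
    bad_exc = bad.exception()     # blocks, then returns the raised exception instead of raising it

print(ok_result, type(bad_exc).__name__)
```

Calling bad.result() instead of bad.exception() would re-raise the ZeroDivisionError in the calling thread.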

map(fn, *iterables, timeout=None, chunksize=1)

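Similar to map(fn, *iterables), with the following differences:

the iterables are collected immediately rather than lazily;

fn is executed asynchronously, and several calls to fn may be made concurrently.

The returned iterator raises a TimeoutError if __next__() is called and the result is not available within timeout seconds of the original call to map(). timeout can be an int or a float. If timeout is not specified or is None, there is no limit on the wait time.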
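The timeout behaviour can be sketched with a task that is deliberately slower than the deadline. The helper slow_identity and the chosen delays are assumptions made for the illustration; the deadline is measured from the call to map(), not from each individual __next__() call.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_identity(delay):
    """Hypothetical task: sleep for `delay` seconds, then return `delay`."""
    time.sleep(delay)
    return delay


collected = []
timed_out = False
with ThreadPoolExecutor(max_workers=2) as executor:
    # The second task needs 0.6 s, but all results are due 0.2 s after this call.
    results = executor.map(slow_identity, [0.01, 0.6], timeout=0.2)
    try:
        for value in results:
            collected.append(value)
    except TimeoutError:
        timed_out = True

print(collected, timed_out)
```

The with block still waits for the slow task to finish on exit, since a call that is already running cannot be cancelled.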

If a call to fn raises an exception, that exception is raised when its value is retrieved from the iterator.
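A short sketch of this behaviour, assuming a hypothetical helper named reciprocal: results that precede the failing call are delivered normally, and the exception surfaces only when iteration reaches the failing item.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    """Hypothetical helper: raises ZeroDivisionError for x == 0."""
    return 1 / x


seen = []
error = None
with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(reciprocal, [1, 2, 0, 4])
    try:
        for value in results:
            seen.append(value)
    except ZeroDivisionError as exc:
        # Raised while fetching the third result, not when map() was called.
        error = exc

print(seen, repr(error))
```

Once the iterator has raised, it is exhausted, so the result for the last input is not retrievable through it. Using submit() per item is one way to keep each outcome separate.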

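When using ProcessPoolExecutor, this method chops the iterables into a number of chunks, which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, a large chunksize can significantly improve performance compared with the default of 1. With ThreadPoolExecutor and InterpreterPoolExecutor, chunksize has no effect.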
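The effect of chunksize can be explored with a sketch like the one below. The names digit_sum and parallel_digit_sums, and the executor_cls parameter, are hypothetical; the parameter exists only so the same function can be tried with a thread pool, where chunksize is accepted but ignored.

```python
from concurrent.futures import ProcessPoolExecutor


def digit_sum(n):
    """Hypothetical cheap task: sum of the decimal digits of n."""
    return sum(int(digit) for digit in str(n))


def parallel_digit_sums(numbers, chunksize, executor_cls=ProcessPoolExecutor):
    """Map digit_sum over numbers, batching items into chunks of about `chunksize`.

    With a process pool, each chunk is sent to a worker as a single task,
    so larger chunks mean fewer round trips between processes.
    """
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(digit_sum, numbers, chunksize=chunksize))
```

With a process pool, a call such as parallel_digit_sums(range(10_000), chunksize=500) should be made from under an `if __name__ == "__main__":` guard in a script, so that worker processes can import the module safely. Since the per-item work here is tiny, timing the call with chunksize=1 against a larger value is a reasonable way to see the difference on a given machine.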

Deadlocks can occur when the callable associated with a Future waits on the result of another Future. For example:

import time
import concurrent.futures

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

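A deadlock also occurs when a task waits on a Future that was submitted to the same single-worker pool:

import concurrent.futures


def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread
    # and it is busy executing this function.
    print(f.result())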

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(wait_on_future)
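One way to observe the single-worker stall without hanging the program is to wait on the inner Future with a timeout. This is only a diagnostic sketch, not a fix; the names outer_task and run_with_workers and the "stalled" marker are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def outer_task(executor):
    """Submit a nested task to the same pool and wait for it with a deadline."""
    inner = executor.submit(pow, 5, 2)
    try:
        return inner.result(timeout=0.5)
    except TimeoutError:
        # No free worker picked up the nested task within the deadline.
        return "stalled"


def run_with_workers(max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return executor.submit(outer_task, executor).result()


print(run_with_workers(1))  # the only worker is busy running outer_task
print(run_with_workers(2))  # a second worker is available for the nested task
```

Designs that avoid blocking on a Future from inside a worker of the same pool, for example by collecting results in the submitting thread, sidestep the problem altogether.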

The following example uses a ThreadPoolExecutor to fetch several web pages concurrently:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and return its contents.
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# Use a with statement to ensure threads are cleaned up promptly.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL.
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('{%r} generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Output from one run:

'http://www.cnn.com/' page is 3618510 bytes
'http://www.bbc.co.uk/' page is 505598 bytes
'http://www.foxnews.com/' page is 686085 bytes
{'http://europe.wsj.com/'} generated an exception: HTTP Error 403: Forbidden
{'http://nonexistent-subdomain.python.org/'} generated an exception: <urlopen error [Errno -5] No address associated with hostname>
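The same future-to-key pattern can be tried without network access. The sketch below replaces the downloads with a hypothetical simulated_fetch helper that just sleeps, which makes it easy to see that as_completed() yields futures in the order they finish rather than the order they were submitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def simulated_fetch(name, delay):
    """Hypothetical stand-in for a download: sleep, then return a label."""
    time.sleep(delay)
    return f"{name} done"


jobs = {"slow": 0.5, "fast": 0.05}   # submitted in this order
completion_order = []
results = {}
with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to the job name it belongs to.
    future_to_name = {
        executor.submit(simulated_fetch, name, delay): name
        for name, delay in jobs.items()
    }
    for future in as_completed(future_to_name):
        name = future_to_name[future]
        completion_order.append(name)
        results[name] = future.result()

print(completion_order)
```

The dictionary lookup is what ties each completed future back to its input, since as_completed() itself gives no information about submission order.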

Process pool

import concurrent.futures
import multiprocessing as mp
import time
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    print(f"{mp.current_process()} {time.asctime()}")
    return True

# Notebook cell: the top-level await below works only in IPython/Jupyter.
import asyncio

with concurrent.futures.ProcessPoolExecutor() as executor:
    print(f"{mp.current_process()} {time.asctime()}")
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES, chunksize=1)):
        print(f'{number:d} is prime: {prime}')
    print(f"{mp.current_process()} {time.asctime()}")
await asyncio.sleep(3)  # pause so the ipykernel cell's threads can run to completion

Output from one run:

<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:37 2024
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-3' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-4' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-1' parent=7958 started> Mon Nov 18 08:43:38 2024
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
115797848077099 is prime: True
1099726899285419 is prime: False
<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:38 2024