启动并行任务

目录

启动并行任务#

异步执行可以通过线程实现,使用 ThreadPoolExecutorInterpreterPoolExecutor,或者通过独立进程实现,使用 ProcessPoolExecutor。它们都实现了相同的接口,该接口由抽象的 Executor 类定义。

concurrent.futures.Executor

抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。

concurrent.futures.Executor.submit(fn, /, *args, **kwargs)

调度可调用对象 fn,以 fn(*args, **kwargs) 方式执行并返回代表该可调用对象的执行的 Future 对象。

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
733018741971166252529244672995227796776583345783942437386911678051742014907619818389889490523633966891088361470574959346120460571462260749066868322280895817921819178047106351622746089787222709001409723271535998886740900624008120633567081904814632812393533764463134148203902627783449817391855430335563732106041226371560673691183984708116601872233266074247493626304648260263767914583249791984053769482918833516091413131011123944919964273965579371981208614941585953495908535921540210708056885341387737215923345202544222865141850763901074317449693617326298168109535915635940121796253976420394712905525890085230066381155268301872764521970724361150250533240741250911370641578495445037349949847056446112243875919940177685220040160664042867777937760175315580842852772334361081793849764931709880797656009445214260838004008425745329486287217583337196739975624879209997958066918269289529173759400388462627027867023449158801888859657106016910393756105722308386102404609268566550368738720443422703712421937413064228232666730605742388335837146714118055611327514203299723649208298344877573265368313069762781039793226859829488229032564965635731481369752167217585943588854930073123415040389709513212962223638396059885947892393069120539629752913087293860710229309854604802321674065167382714715102476300146693438431833224964213369532024918771644330976993898734654636960222338307689812245659000814812860386873604770823862137671384853381092605916632447158620208279102871362522121737075798746297634021439677681885846906025762209900264981149145400622176046541895913713446104210012678561757945542108470975177238574885971783008091688989094139568186322074509855040235429441987295597788583656003164149522040284092940950997841383270838931250004957560959392148357213125798758355221152004876638131530060401294918574742158403027075892317311713232948774508045444387008377268956511566790988750066274475945057011826156359365664084922898519338475775136584478799585953802786369705215183341960068786541347670144480851661915646211142904738434781594527201521904996219737980968884208250267436396423981779399709186303960293407672554877346877193170897982220120029537710504050457313802321446995592434916524383256286111803049392908220573240027642399185919278901303127877945119916025120410258623536046989159056988455695917457277794657573586903543406892809506842341172324188614146189747989997492566982444418485858555939830994931765546646159933948646331181720254477223093614552419921625948715007606190469859917311824745902574837132432028609681828479562137848761003845549339456254612745501089667544053262976758087180329859844324991679303251520118205924889348811070368534970345655190383142368597970622914787824113699770230464815835988172741718871079971541784210688842581288010501153867757557117911157916834496975757069921359922064002354851413718493414469919988528124300509472377943419402710070938638340816072261896269139282992942967710326722487988387814927252378444344275944579731813013115789048970941051188880593003843626208600970181761638829477861608229607983725051555487136745926844381097959339538810272017727875770008857972396527027073630500507
map(fn, *iterables, timeout=None, chunksize=1)

类似于 map(fn, *iterables) 但有以下差异:

iterables 是立即执行而不是延迟执行的;

fn 是异步执行的并且可以并发对 fn 的多个调用。 如果 __next__() 被调用且从对 map() 原始调用 timeout 秒之后其结果还不可用则已返回的迭代器将引发 TimeoutErrortimeout 可以是整数或浮点数。如果 timeout 未指定或为 None,则不限制等待时间。

如果 fn 调用引发了异常,那么当从迭代器获取其值时该异常将被引发。

在使用 ProcessPoolExecutor 时,此方法将可迭代对象分割成若干块,然后将它们作为独立任务提交到池中。这些块的(大致)大小可以通过将 chunksize 设置为正整数来指定。对于非常长的可迭代对象,与默认值 1 相比,使用较大的 chunksize 可以显著提高性能。对于 ThreadPoolExecutorInterpreterPoolExecutorchunksize 则没有影响。

当可调用对象已关联了 Future 然后在等待另一个 Future 的结果时就会导致死锁情况。例如:

import time
import concurrent.futures

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b 永远不会结束因为它在等待 a。
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a 永远不会结束因为它在等待 b。
    return 6


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

import time
import concurrent.futures


def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # 这将永远不会完成因为只有一个工作线程
    # 并且它正在执行此函数。
    print(f.result())

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(wait_on_future)
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# 检索单个网页并报告其 URL 及内容。
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# 使用 `with` 语句来确保线程得到及时清理
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 开始加载操作,并为每个未来任务标记其 URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('{%r} generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
'http://www.cnn.com/' page is 3618510 bytes
'http://www.bbc.co.uk/' page is 505598 bytes
'http://www.foxnews.com/' page is 686085 bytes
{'http://europe.wsj.com/'} generated an exception: HTTP Error 403: Forbidden
{'http://nonexistent-subdomain.python.org/'} generated an exception: <urlopen error [Errno -5] No address associated with hostname>

进程池#

import concurrent.futures
import multiprocessing as mp
import time
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    print(f"{mp.current_process()} {time.asctime()}")
    return True
import asyncio
with concurrent.futures.ProcessPoolExecutor() as executor:
    print(f"{mp.current_process()} {time.asctime()}")
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES, chunksize=1)):
        print(f'{number:d} is prime: {prime}')
    print(f"{mp.current_process()} {time.asctime()}")
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:37 2024
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-3' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-4' parent=7958 started> Mon Nov 18 08:43:38 2024
<ForkProcess name='ForkProcess-1' parent=7958 started> Mon Nov 18 08:43:38 2024
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
<ForkProcess name='ForkProcess-2' parent=7958 started> Mon Nov 18 08:43:38 2024
115797848077099 is prime: True
1099726899285419 is prime: False
<_MainProcess name='MainProcess' parent=None started> Mon Nov 18 08:43:38 2024