异步任务#

参考:《asyncio 系列》1. 什么是 asyncio?如何基于单线程实现并发?事件循环又是怎么工作的?

先了解两个概念:

备注

IO密集型是指那些主要受限于输入/输出操作的任务

  1. 基本概述

    • 定义特点:IO密集型程序的特点是需要频繁地与外部设备(如磁盘、网络、键盘、鼠标等)进行数据交换,而相对较少的计算操作。

    • 性能瓶颈:IO密集型任务的性能瓶颈通常在于I/O设备的带宽和响应时间。

  2. 典型应用

    • 数据库管理系统:例如SQL查询、数据插入、数据更新等。

    • Web服务器:例如处理HTTP请求、响应客户端、传输数据等。

    • 文件系统:例如文件读写、文件同步、文件备份等。

    • 实时通信:例如聊天应用、在线游戏、远程桌面等。

  3. 优化方法

    • 使用高性能I/O设备:例如更快的磁盘、更大的内存、更高带宽的网络等。

    • 采用I/O多路复用技术:通过使用select、poll、epoll等技术,同时处理多个I/O操作,提高I/O设备的利用率。

    • 采用异步I/O技术:避免阻塞式I/O操作,提高程序的响应性能。

    • 利用缓存和预取技术:减少不必要的I/O操作,提高数据访问速度。

  4. 线程选择

    • 多线程策略:对于IO密集型任务,由于CPU在等待I/O操作时处于空闲状态,因此可以通过增加线程数来提高CPU的利用率。

    • 线程数计算公式:一般遵循I/O密集型核心线程数 = CPU核数 / (1-阻塞系数),其中阻塞系数在0到1范围内,一般为0.8~0.9之间。

  5. 注意事项

    • 线程数限制:虽然增加线程数可以提高CPU的利用率,但并不是线程数越多越好。过多的线程会导致线程切换和资源竞争的开销增加,反而会降低系统的性能。

    • 具体业务调整:在实际项目中,需要根据具体业务情况和硬件环境来选择合适的线程数。

总的来说,IO密集型任务是计算机系统中一类重要的任务类型,其特点是频繁的I/O操作和相对较低的CPU使用率。通过合理的优化方法和线程选择策略,可以显著提高IO密集型任务的性能和响应能力。

备注

CPU密集型指的是系统的硬盘、内存性能相对CPU要好很多,此时系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,因此CPU负载长期过高。以下是关于CPU密集型的详细介绍:

  1. 基本概述

    • 定义特点:CPU密集型任务是指主要消耗CPU资源的任务,通常需要大量的计算和逻辑处理。

    • 性能瓶颈:CPU密集型任务的性能瓶颈在于CPU的处理能力,而不是I/O设备的带宽和响应时间。

  2. 典型应用

    • 数值计算:例如科学计算、数据分析等。

    • 图形图像处理:例如视频解码、图像处理等。

    • 机器学习:例如模型训练、推理等。

    • 游戏开发:例如游戏物理模拟、AI计算等。

  3. 优化方法

    • 使用多线程或多进程:通过并行化计算,提高CPU的利用率。

    • 优化算法和数据结构:减少不必要的计算和内存访问,提高程序的运行效率。

    • 利用硬件加速:例如使用GPU进行并行计算。

  4. 线程选择

    • 多线程策略:对于CPU密集型任务,由于CPU在等待I/O操作时处于空闲状态,因此可以通过增加线程数来提高CPU的利用率。

    • 线程数计算公式:一般遵循CPU密集型核心线程数 = CPU核数 + 1。

总的来说,CPU密集型任务是计算机系统中一类重要的任务类型,其特点是需要大量的CPU资源进行计算和逻辑处理。通过合理的优化方法和线程选择策略,可以显著提高CPU密集型任务的性能和响应能力。

from d2py.utils.file import mkdir
temp_dir = ".temp" # 缓存目录
mkdir(temp_dir, parents=True, exist_ok=True)

比如任务:

import httpx

res = httpx.get("http://www.bing.com")  # IO 密集型(web 请求)
items = res.headers.items()
headers = [f"{key}: {val}" for key, val in items]  # CPU 密集型(响应处理)
formatted_headers = "\n".join(headers)  # CPU 密集型(字符串连接)
with open(f"{temp_dir}/headers.txt", "w", encoding="utf-8") as f:
    f.write(formatted_headers)  # IO 密集型(磁盘写入)

异步任务允许在执行任务时暂停特定程序的执行,可在后台等待初始任务完成时运行其他代码。这允许同时执行许多任务,从而潜在地加快应用程序的运行速度。

备注

asyncio 使用协同多任务来实现并发性,当应用程序达到可以等待一段时间以返回结果的时间点时,在代码中显式地标记它,并让其它代码执行。一旦标记的任务完成,应用程序就"醒来"并继续执行该任务。这是一种并发形式,因为可同时启动多个任务,但重要的是,这不是并行模式,因为它们不会同时执行代码。

进程

操作系统分配资源的最小单元。

线程

操作系统用来调度 CPU 的最小单元。

进程好比一个房子,而线程是房子里面干活的人,所以一个进程里面可以有多个线程,线程共享进程里面的资源。因此真正用来工作的是线程,进程只负责提供相应的内存空间和资源。

进程

进程是具有其它应用程序无法访问的内存空间的应用程序运行状态,创建 Python 进程的例子:运行简单的 “hello world” 应用程序,或在命令行输入 Python 来启动 REPL(交互式环境)。

多个进程可以在一台机器上运行,如果有一台拥有多核 CPU 的机器,就可以同时执行多个进程。在只有一个核的 CPU 上,仍可通过时间片,同时运行多个应用程序。当操作系统使用时间片时,它会在一段时间后自动切换下一个进程并运行它。确定何时发生此切换的算法因操作系统而异。

线程

线程可以被认为是轻量级进程,此外线程是操作系统可以管理的最小结构,它们不像进程那样有自己的内存空间,相反,它们共享进程的内存。线程与创建它们的进程关联,一个进程总是至少有一个与之关联的线程,通常称为主线程。一个进程还可以创建其他线程,通常称为工作线程或后台线程,这些线程可与主线程同时执行其他工作。线程很像进程,可以在多核 CPU 上并行运行,操作系统也可通过时间片在它们之间切换。当运行一个普通的 Python 应用程序时,会创建一个进程以及一个负责执行具体代码的主线程。

import os
import threading

print(f"进程启动,pid 为 {os.getpid()}")
print(f"该进程内部运行 {threading.active_count()} 个线程")
print(f"当前正在运行的线程是 {threading.current_thread().name}")
进程启动,pid 为 7831
该进程内部运行 7 个线程
当前正在运行的线程是 MainThread

进程还可创建新的线程,这些线程可通过所谓的多线程技术同时完成其他工作,并共享进程的内存。

import threading

def a_task():
    # print('任务名称:', __name__)
    print('父进程 id:', os.getppid())
    print('当前进程 id:', os.getpid())
    print(f"当前线程:{threading.current_thread().name}")

thread = threading.Thread(target=a_task, name="task a")
thread.start()

print(f"该进程内部运行 {threading.active_count()} 个线程")
print(f"当前正在运行的线程是 {threading.current_thread().name}")
thread.join()
父进程 id: 2541
当前进程 id: 7831
当前线程:task a
该进程内部运行 7 个线程
当前正在运行的线程是 MainThread

也可以使用多进程的方式执行此任务:

import multiprocessing as mp

# 在 Windows 上必须加上 if __name__ == '__main__'
# 否则多进程乎启动失败
if __name__ == '__main__':
    process = mp.Process(target=a_task)
    process.start()
    process.join()
父进程 id:
7831
当前进程 id:
7849
当前线程:MainThread

下面介绍使用协程的方式实现并发任务。

async def a_task():
    print('父进程 id:', os.getppid())
    print('当前进程 id:', os.getpid())
    print(f"当前线程:{threading.current_thread().name}")

async 把函数 main() 变成 Coroutine 类型,即协程对象。

a_task()
<coroutine object a_task at 0x7f17e8f19080>
import asyncio
f = a_task()
print(f"{f} 为协程 {asyncio.iscoroutine(f)}")
f.close() # 防止资源警告
<coroutine object a_task at 0x7f17e88a31d0> 为协程 True

执行协程,需要在 事件循环 中显式执行它。

可以使用便捷函数 asyncio.run() 运行事件循环并执行协程任务:

import asyncio

asyncio.run(a_task())

在 Jupyter Notebook 环境中,可以直接在单元格使用如下方式执行(因为它本身已经启动了事件循环):

await a_task()
父进程 id: 2541
当前进程 id: 7831
当前线程:MainThread

await 表达式,挂起了协程对象的执行以等待 awaitable 对象。

asyncio 的真正优势是能暂停执行,让事件循环在长时间运行期间,运行其他任务。要暂停执行,使用 await 关键字,await 关键字之后通常会调用协程(更具体地说是被称为 awaitable 的对象,可以是协程或者有 __await__() 方法的对象)。

使用 await 关键字将导致它后面的协程运行,这与直接调用协程不同,因为直接调用只会产生一个协程对象。await 表达式也会暂停它所在的协程,直到等待的协程完成并返回结果。等待的协程完成时,将访问它返回的结果,并唤醒 await 所在的协程。

import asyncio
import time
import threading

async def main(name):
    time.time()
    print(f"欢迎 {name} 进入 {threading.current_thread()}||{time.asctime()}")
    await asyncio.sleep(2) # 阻塞 2 秒,用于模拟长时任务
    print(f"再次进入 {threading.current_thread()}||{time.asctime()}")
    return name
await main("小明")
欢迎 小明 进入 <_MainThread(MainThread, started 139741054942080)>||Mon Nov 18 08:41:38 2024
再次进入 <_MainThread(MainThread, started 139741054942080)>||Mon Nov 18 08:41:40 2024
'小明'

可以看到,前后两次打印间隔 2s。

可以使用 asyncio.create_task() 函数用来并发运行作为 asyncio asyncio.Task 的多个协程。

import time
import asyncio

async def delay(seconds):
    print(f"开始休眠 {seconds} 秒||{time.ctime()}")
    await asyncio.sleep(seconds)
    print(f"休眠完成||{time.ctime()}")
    return seconds

直接执行 await delay(3),那么在处理其他事情前需要至少等待 3 秒:

res = await delay(3)
print(f"其他事情||{time.ctime()}")
开始休眠 3 秒||Mon Nov 18 08:41:40 2024
休眠完成||Mon Nov 18 08:41:43 2024
其他事情||Mon Nov 18 08:41:43 2024
async def main():
    print(f"开始任务: {time.ctime()}")
    # 将 delay(3) 包装成任务,注:包装完之后直接就丢到事件循环里面运行了
    # 因此这里会立即返回,而返回值是 asyncio.Task 对象
    sleep_for_three = asyncio.create_task(delay(3))
    print(f"其他事情||{time.ctime()}")
    print(f"sleep_for_three: {sleep_for_three.__class__}||{time.ctime()}")
    # 至于协程究竟有没有运行完毕,我们可以通过 Task 对象来查看
    # 当协程运行完毕或者报错,都看做是运行完毕了,那么调用 Task 对象的 done 方法会返回 True
    # 否则返回 False,由于代码是立即执行,还没有到 3 秒钟,因此打印结果为 False
    print(f"协程(任务)是否执行完毕: {sleep_for_three.done()}||{time.ctime()}")
    # 这里则保证必须等到 Task 对象里面的协程运行完毕后,才能往下执行
    result = await sleep_for_three
    print(f"协程(任务)是否执行完毕: {sleep_for_three.done()}||{time.ctime()}")
    print(f"返回值: {result}")

await main()
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
开始任务: Mon Nov 18 08:41:43 2024
其他事情||Mon Nov 18 08:41:43 2024
sleep_for_three: <class '_asyncio.Task'>||Mon Nov 18 08:41:43 2024
协程(任务)是否执行完毕: False||Mon Nov 18 08:41:43 2024
开始休眠 3 秒||Mon Nov 18 08:41:43 2024
休眠完成||Mon Nov 18 08:41:46 2024
协程(任务)是否执行完毕: True||Mon Nov 18 08:41:46 2024
返回值: 3

可以执行多任务调度:

async def main():
    print(f"开始任务: {time.ctime()}")
    sleep_for_three = asyncio.create_task(delay(3))
    sleep_again = asyncio.create_task(delay(3))
    sleep_once_more = asyncio.create_task(delay(3))
    await sleep_for_three
    print(f"结束 sleep_for_three 任务: {time.ctime()}")
    await sleep_again
    print(f"结束 sleep_again 任务: {time.ctime()}")
    await sleep_once_more
    print(f"结束全部任务: {time.ctime()}")

await main()
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
开始任务: Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
休眠完成||Mon Nov 18 08:41:52 2024
休眠完成||Mon Nov 18 08:41:52 2024
休眠完成||Mon Nov 18 08:41:52 2024
结束 sleep_for_three 任务: Mon Nov 18 08:41:52 2024
结束 sleep_again 任务: Mon Nov 18 08:41:52 2024
结束全部任务: Mon Nov 18 08:41:52 2024

在上面的代码中启动了三个任务,每个任务需要 3 秒才能完成。但由于对 create_task 的每次调用都会立即返回,因此会立即到达 await sleep_for_three 语句,并且三个任务都丢到了事件循环,开启执行。由于 asyncio.sleep() 属于 IO,因此会进行切换,所以三个任务是并发执行的,这也意味着整个程序会在 3 秒钟左右完成,而不是 9 秒钟。

取消任务#

取消任务很简单,每个任务对象都有名为 asyncio.Task.cancel() 的方法,可以在想要停止任务时调用它。取消任务将导致该任务在执行 await 时引发 CancelledError,然后再根据需要处理它

import asyncio

async def delay(seconds):
    print(f"开始休眠 {seconds} 秒")
    await asyncio.sleep(seconds)
    print(f"休眠完成")
    return seconds

async def main():
    long_task = asyncio.create_task(delay(10))
    seconds_elapsed = 0

    while not long_task.done():
        print(f"检测到任务尚未完成,一秒钟之后继续检测 || {time.ctime()}")
        await asyncio.sleep(1)
        seconds_elapsed += 1
        # 时间超过 5 秒,取消任务
        if seconds_elapsed == 5:
            long_task.cancel()

    try:
        # 等待 long_task 完成,显然执行到这里的时候,任务已经被取消
        # 不管是 await 一个已经取消的任务,还是 await 的时候任务被取消
        # 都会引发 asyncio.CancelledError
        await long_task
    except asyncio.CancelledError:
        print("任务被取消")

await main()
await asyncio.sleep(9) # 保证 ipykernel 单元格线程耗尽
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:55 2024
开始休眠 10 秒
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:56 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:57 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:58 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:59 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:42:00 2024
任务被取消

小技巧

关于取消任务需要注意的是,CancelledError 只能从 await 语句抛出。这意味着如果在任务在执行普通 Python 代码时被取消,那么该代码将一直运行,直到触发下一个 await 语句(如果存在),才能引发 CancelledError

async def delay(seconds):
    print(f"开始休眠 {seconds} 秒")
    await asyncio.sleep(seconds)
    print(f"休眠完成")
    return seconds

async def main():
    long_task = asyncio.create_task(delay(3))
    # 立刻取消
    long_task.cancel()
    # 但 CancelledError 只有在 await 取消的协程时才会触发
    # 所以下面的语句会正常执行
    print(f"我会正常执行|| {time.ctime()}")
    print("Hello World")
    print(list(range(10)))
    await asyncio.sleep(5)
    try:
        # 引发 CancelledError
        await long_task
    except asyncio.CancelledError:
        print(f"任务被取消|| {time.ctime()}")

await main()
await asyncio.sleep(14) # 保证 ipykernel 单元格线程耗尽
我会正常执行|| Mon Nov 18 08:42:10 2024
Hello World
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
任务被取消|| Mon Nov 18 08:42:15 2024

设置超时并使用 asyncio.wait_for() 执行任务取消#

asyncio.wait_for() 函数接收协程或任务对象,以及以秒为单位的超时时间。如果任务完成所需的时间超过了设定的超时时间,则会引发 TimeoutException,任务将自动取消。

import asyncio

async def delay(seconds):
    print(f"开始休眠 {seconds} 秒|| {time.ctime()}")
    await asyncio.sleep(seconds)
    print(f"休眠完成|| {time.ctime()}")
    return seconds

async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        result = await asyncio.wait_for(delay_task, 1)
        print("返回值:", result)
    except asyncio.TimeoutError:
        print(f"超时啦|| {time.ctime()}")
        # delay_task.cancelled() 用于判断任务是否被取消
        # 任务被取消:返回 True,没有被取消:返回 False
        print(f"{time.ctime()}||任务是否被取消:", delay_task.cancelled())

await main()
await asyncio.sleep(1) # 保证 ipykernel 单元格线程耗尽
开始休眠 2 秒|| Mon Nov 18 08:42:29 2024
超时啦|| Mon Nov 18 08:42:30 2024
Mon Nov 18 08:42:30 2024||任务是否被取消: True

某任务花费的时间比预期的要长,但即便超过了规定的超时时间,也不取消该任务。为此,可使用 asyncio.shield() 函数包装任务,这个函数将防止传入的协程被取消,会给予它屏蔽,将取消请求将忽略掉。

import asyncio
import time

async def delay(seconds):
    print(f"开始休眠 {seconds} 秒")
    await asyncio.sleep(seconds)
    print(f"休眠完成")
    return seconds

async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        # 通过 asyncio.shield 将 delay_task 保护起来
        result = await asyncio.wait_for(asyncio.shield(delay_task), 1)
        print(f"{time.ctime()}: 返回值:", result)
    except asyncio.TimeoutError:
        print(f"{time.ctime()}: 超时啦")
        # 如果超时依旧会引发 TimeoutError,但和之前不同的是
        # 此时任务不会被取消了,因为 asyncio.shield 会将取消请求忽略掉
        print(f"{time.ctime()}: 任务是否被取消:", delay_task.cancelled())
        # 从出现超时的地方,继续执行,并等待它完成
        result = await delay_task
        print(f"{time.ctime()}: 返回值:", result)

await main()
await asyncio.sleep(2) # 保证 ipykernel 单元格线程耗尽
开始休眠 2 秒
Mon Nov 18 08:42:32 2024: 超时啦
Mon Nov 18 08:42:32 2024: 任务是否被取消: False
休眠完成
Mon Nov 18 08:42:33 2024: 返回值: 2

任务、协程、future 和 awaitable#

任务、协程、Future 和 awaitable 是异步编程中的基本概念,它们在定义、状态以及执行方式等方面存在区别。以下是具体分析:

  1. 协程

    • 定义:协程(Coroutine)是一种比线程更轻量级的并发执行单元,通过 async def 关键字定义。

    • 状态:协程对象本身没有状态,但当它被封装进 asyncio.Task 后,会继承 asyncio.Task 的状态。

    • 执行方式:协程不能直接运行,需要通过 awaitasyncio.run() 来启动其执行。

  2. Awaitable

    • 定义:如果一个对象可以在 await 表达式中使用,那么它就是可等待对象(Awaitable)。

    • 状态:Awaitable 本身不直接表示状态,但它可以是 asyncio.Futureasyncio.Task,从而间接具有相应的状态。

    • 执行方式:Awaitable 通过 await 表达式来等待其完成,并获取结果。

  3. 任务

    • 定义:任务(asyncio.Task)是一个类,用于表示异步操作的执行单元。

    • 状态:任务有四种状态:Pending(待处理)、Running(运行中)、Done(已完成)和 Cancelled(已取消)。

    • 执行方式:任务通过 asyncio.create_task() 函数创建,并加入到事件循环中等待被调度执行。

  4. Future

    • 定义asyncio.Future 表示将来可能完成的操作结果。

    • 状态asyncio.Future 有三种主要状态:PENDING(挂起)、CANCELLED(取消)和 FINISHED(完成)。

    • 执行方式asyncio.Future 通常由底层库或框架创建和使用,不需要用户直接创建。