异步任务#
参考:《asyncio 系列》1. 什么是 asyncio?如何基于单线程实现并发?事件循环又是怎么工作的?
先了解两个概念:
备注
IO密集型是指那些主要受限于输入/输出操作的任务。
基本概述
定义特点:IO密集型程序的特点是需要频繁地与外部设备(如磁盘、网络、键盘、鼠标等)进行数据交换,而相对较少的计算操作。
性能瓶颈:IO密集型任务的性能瓶颈通常在于I/O设备的带宽和响应时间。
典型应用
数据库管理系统:例如SQL查询、数据插入、数据更新等。
Web服务器:例如处理HTTP请求、响应客户端、传输数据等。
文件系统:例如文件读写、文件同步、文件备份等。
实时通信:例如聊天应用、在线游戏、远程桌面等。
优化方法
使用高性能I/O设备:例如更快的磁盘、更大的内存、更高带宽的网络等。
采用I/O多路复用技术:通过使用select、poll、epoll等技术,同时处理多个I/O操作,提高I/O设备的利用率。
采用异步I/O技术:避免阻塞式I/O操作,提高程序的响应性能。
利用缓存和预取技术:减少不必要的I/O操作,提高数据访问速度。
线程选择
多线程策略:对于IO密集型任务,由于CPU在等待I/O操作时处于空闲状态,因此可以通过增加线程数来提高CPU的利用率。
线程数计算公式:一般遵循I/O密集型核心线程数 = CPU核数 / (1-阻塞系数),其中阻塞系数在0到1范围内,一般为0.8~0.9之间。
注意事项
线程数限制:虽然增加线程数可以提高CPU的利用率,但并不是线程数越多越好。过多的线程会导致线程切换和资源竞争的开销增加,反而会降低系统的性能。
具体业务调整:在实际项目中,需要根据具体业务情况和硬件环境来选择合适的线程数。
总的来说,IO密集型任务是计算机系统中一类重要的任务类型,其特点是频繁的I/O操作和相对较低的CPU使用率。通过合理的优化方法和线程选择策略,可以显著提高IO密集型任务的性能和响应能力。
备注
CPU密集型指的是系统的硬盘、内存性能相对CPU要好很多,此时系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,因此CPU负载长期过高。以下是关于CPU密集型的详细介绍:
基本概述
定义特点:CPU密集型任务是指主要消耗CPU资源的任务,通常需要大量的计算和逻辑处理。
性能瓶颈:CPU密集型任务的性能瓶颈在于CPU的处理能力,而不是I/O设备的带宽和响应时间。
典型应用
数值计算:例如科学计算、数据分析等。
图形图像处理:例如视频解码、图像处理等。
机器学习:例如模型训练、推理等。
游戏开发:例如游戏物理模拟、AI计算等。
优化方法
使用多线程或多进程:通过并行化计算,提高CPU的利用率。
优化算法和数据结构:减少不必要的计算和内存访问,提高程序的运行效率。
利用硬件加速:例如使用GPU进行并行计算。
线程选择
多线程策略:对于CPU密集型任务,由于CPU在等待I/O操作时处于空闲状态,因此可以通过增加线程数来提高CPU的利用率。
线程数计算公式:一般遵循CPU密集型核心线程数 = CPU核数 + 1。
总的来说,CPU密集型任务是计算机系统中一类重要的任务类型,其特点是需要大量的CPU资源进行计算和逻辑处理。通过合理的优化方法和线程选择策略,可以显著提高CPU密集型任务的性能和响应能力。
from d2py.utils.file import mkdir
temp_dir = ".temp" # 缓存目录
mkdir(temp_dir, parents=True, exist_ok=True)
比如任务:
import httpx
res = httpx.get("http://www.bing.com") # IO 密集型(web 请求)
items = res.headers.items()
headers = [f"{key}: {val}" for key, val in items] # CPU 密集型(响应处理)
formatted_headers = "\n".join(headers) # CPU 密集型(字符串连接)
with open(f"{temp_dir}/headers.txt", "w", encoding="utf-8") as f:
f.write(formatted_headers) # IO 密集型(磁盘写入)
异步任务允许在执行任务时暂停特定程序的执行,可在后台等待初始任务完成时运行其他代码。这允许同时执行许多任务,从而潜在地加快应用程序的运行速度。
备注
asyncio
使用协同多任务来实现并发性,当应用程序达到可以等待一段时间以返回结果的时间点时,在代码中显式地标记它,并让其它代码执行。一旦标记的任务完成,应用程序就"醒来"并继续执行该任务。这是一种并发形式,因为可同时启动多个任务,但重要的是,这不是并行模式,因为它们不会同时执行代码。
进程
操作系统分配资源的最小单元。
线程
操作系统用来调度 CPU 的最小单元。
进程好比一个房子,而线程是房子里面干活的人,所以一个进程里面可以有多个线程,线程共享进程里面的资源。因此真正用来工作的是线程,进程只负责提供相应的内存空间和资源。
进程
进程是具有其它应用程序无法访问的内存空间的应用程序运行状态,创建 Python 进程的例子:运行简单的 “hello world” 应用程序,或在命令行输入 Python 来启动 REPL(交互式环境)。
多个进程可以在一台机器上运行,如果有一台拥有多核 CPU 的机器,就可以同时执行多个进程。在只有一个核的 CPU 上,仍可通过时间片,同时运行多个应用程序。当操作系统使用时间片时,它会在一段时间后自动切换下一个进程并运行它。确定何时发生此切换的算法因操作系统而异。
线程
线程可以被认为是轻量级进程,此外线程是操作系统可以管理的最小结构,它们不像进程那样有自己的内存空间,相反,它们共享进程的内存。线程与创建它们的进程关联,一个进程总是至少有一个与之关联的线程,通常称为主线程。一个进程还可以创建其他线程,通常称为工作线程或后台线程,这些线程可与主线程同时执行其他工作。线程很像进程,可以在多核 CPU 上并行运行,操作系统也可通过时间片在它们之间切换。当运行一个普通的 Python 应用程序时,会创建一个进程以及一个负责执行具体代码的主线程。
import os
import threading
print(f"进程启动,pid 为 {os.getpid()}")
print(f"该进程内部运行 {threading.active_count()} 个线程")
print(f"当前正在运行的线程是 {threading.current_thread().name}")
进程启动,pid 为 7831
该进程内部运行 7 个线程
当前正在运行的线程是 MainThread
进程还可创建新的线程,这些线程可通过所谓的多线程技术同时完成其他工作,并共享进程的内存。
import threading
def a_task():
# print('任务名称:', __name__)
print('父进程 id:', os.getppid())
print('当前进程 id:', os.getpid())
print(f"当前线程:{threading.current_thread().name}")
thread = threading.Thread(target=a_task, name="task a")
thread.start()
print(f"该进程内部运行 {threading.active_count()} 个线程")
print(f"当前正在运行的线程是 {threading.current_thread().name}")
thread.join()
父进程 id: 2541
当前进程 id: 7831
当前线程:task a
该进程内部运行 7 个线程
当前正在运行的线程是 MainThread
也可以使用多进程的方式执行此任务:
import multiprocessing as mp
# 在 Windows 上必须加上 if __name__ == '__main__'
# 否则多进程乎启动失败
if __name__ == '__main__':
process = mp.Process(target=a_task)
process.start()
process.join()
父进程 id:
7831
当前进程 id:
7849
当前线程:MainThread
下面介绍使用协程的方式实现并发任务。
async def a_task():
print('父进程 id:', os.getppid())
print('当前进程 id:', os.getpid())
print(f"当前线程:{threading.current_thread().name}")
async
把函数 main()
变成 Coroutine
类型,即协程对象。
a_task()
<coroutine object a_task at 0x7f17e8f19080>
import asyncio
f = a_task()
print(f"{f} 为协程 {asyncio.iscoroutine(f)}")
f.close() # 防止资源警告
<coroutine object a_task at 0x7f17e88a31d0> 为协程 True
执行协程,需要在 事件循环 中显式执行它。
可以使用便捷函数 asyncio.run()
运行事件循环并执行协程任务:
import asyncio
asyncio.run(a_task())
在 Jupyter Notebook 环境中,可以直接在单元格使用如下方式执行(因为它本身已经启动了事件循环):
await a_task()
父进程 id: 2541
当前进程 id: 7831
当前线程:MainThread
await
表达式,挂起了协程对象的执行以等待 awaitable
对象。
asyncio
的真正优势是能暂停执行,让事件循环在长时间运行期间,运行其他任务。要暂停执行,使用 await
关键字,await
关键字之后通常会调用协程(更具体地说是被称为 awaitable
的对象,可以是协程或者有 __await__()
方法的对象)。
使用 await
关键字将导致它后面的协程运行,这与直接调用协程不同,因为直接调用只会产生一个协程对象。await
表达式也会暂停它所在的协程,直到等待的协程完成并返回结果。等待的协程完成时,将访问它返回的结果,并唤醒 await
所在的协程。
import asyncio
import time
import threading
async def main(name):
time.time()
print(f"欢迎 {name} 进入 {threading.current_thread()}||{time.asctime()}")
await asyncio.sleep(2) # 阻塞 2 秒,用于模拟长时任务
print(f"再次进入 {threading.current_thread()}||{time.asctime()}")
return name
await main("小明")
欢迎 小明 进入 <_MainThread(MainThread, started 139741054942080)>||Mon Nov 18 08:41:38 2024
再次进入 <_MainThread(MainThread, started 139741054942080)>||Mon Nov 18 08:41:40 2024
'小明'
可以看到,前后两次打印间隔 2s。
可以使用 asyncio.create_task()
函数用来并发运行作为 asyncio
asyncio.Task
的多个协程。
import time
import asyncio
async def delay(seconds):
print(f"开始休眠 {seconds} 秒||{time.ctime()}")
await asyncio.sleep(seconds)
print(f"休眠完成||{time.ctime()}")
return seconds
直接执行 await delay(3)
,那么在处理其他事情前需要至少等待 3 秒:
res = await delay(3)
print(f"其他事情||{time.ctime()}")
开始休眠 3 秒||Mon Nov 18 08:41:40 2024
休眠完成||Mon Nov 18 08:41:43 2024
其他事情||Mon Nov 18 08:41:43 2024
async def main():
print(f"开始任务: {time.ctime()}")
# 将 delay(3) 包装成任务,注:包装完之后直接就丢到事件循环里面运行了
# 因此这里会立即返回,而返回值是 asyncio.Task 对象
sleep_for_three = asyncio.create_task(delay(3))
print(f"其他事情||{time.ctime()}")
print(f"sleep_for_three: {sleep_for_three.__class__}||{time.ctime()}")
# 至于协程究竟有没有运行完毕,我们可以通过 Task 对象来查看
# 当协程运行完毕或者报错,都看做是运行完毕了,那么调用 Task 对象的 done 方法会返回 True
# 否则返回 False,由于代码是立即执行,还没有到 3 秒钟,因此打印结果为 False
print(f"协程(任务)是否执行完毕: {sleep_for_three.done()}||{time.ctime()}")
# 这里则保证必须等到 Task 对象里面的协程运行完毕后,才能往下执行
result = await sleep_for_three
print(f"协程(任务)是否执行完毕: {sleep_for_three.done()}||{time.ctime()}")
print(f"返回值: {result}")
await main()
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
开始任务: Mon Nov 18 08:41:43 2024
其他事情||Mon Nov 18 08:41:43 2024
sleep_for_three: <class '_asyncio.Task'>||Mon Nov 18 08:41:43 2024
协程(任务)是否执行完毕: False||Mon Nov 18 08:41:43 2024
开始休眠 3 秒||Mon Nov 18 08:41:43 2024
休眠完成||Mon Nov 18 08:41:46 2024
协程(任务)是否执行完毕: True||Mon Nov 18 08:41:46 2024
返回值: 3
可以执行多任务调度:
async def main():
print(f"开始任务: {time.ctime()}")
sleep_for_three = asyncio.create_task(delay(3))
sleep_again = asyncio.create_task(delay(3))
sleep_once_more = asyncio.create_task(delay(3))
await sleep_for_three
print(f"结束 sleep_for_three 任务: {time.ctime()}")
await sleep_again
print(f"结束 sleep_again 任务: {time.ctime()}")
await sleep_once_more
print(f"结束全部任务: {time.ctime()}")
await main()
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
开始任务: Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
开始休眠 3 秒||Mon Nov 18 08:41:49 2024
休眠完成||Mon Nov 18 08:41:52 2024
休眠完成||Mon Nov 18 08:41:52 2024
休眠完成||Mon Nov 18 08:41:52 2024
结束 sleep_for_three 任务: Mon Nov 18 08:41:52 2024
结束 sleep_again 任务: Mon Nov 18 08:41:52 2024
结束全部任务: Mon Nov 18 08:41:52 2024
在上面的代码中启动了三个任务,每个任务需要 3 秒才能完成。但由于对 create_task
的每次调用都会立即返回,因此会立即到达 await sleep_for_three
语句,并且三个任务都丢到了事件循环,开启执行。由于 asyncio.sleep()
属于 IO,因此会进行切换,所以三个任务是并发执行的,这也意味着整个程序会在 3 秒钟左右完成,而不是 9 秒钟。
取消任务#
取消任务很简单,每个任务对象都有名为 asyncio.Task.cancel()
的方法,可以在想要停止任务时调用它。取消任务将导致该任务在执行 await
时引发 CancelledError
,然后再根据需要处理它
import asyncio
async def delay(seconds):
print(f"开始休眠 {seconds} 秒")
await asyncio.sleep(seconds)
print(f"休眠完成")
return seconds
async def main():
long_task = asyncio.create_task(delay(10))
seconds_elapsed = 0
while not long_task.done():
print(f"检测到任务尚未完成,一秒钟之后继续检测 || {time.ctime()}")
await asyncio.sleep(1)
seconds_elapsed += 1
# 时间超过 5 秒,取消任务
if seconds_elapsed == 5:
long_task.cancel()
try:
# 等待 long_task 完成,显然执行到这里的时候,任务已经被取消
# 不管是 await 一个已经取消的任务,还是 await 的时候任务被取消
# 都会引发 asyncio.CancelledError
await long_task
except asyncio.CancelledError:
print("任务被取消")
await main()
await asyncio.sleep(9) # 保证 ipykernel 单元格线程耗尽
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:55 2024
开始休眠 10 秒
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:56 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:57 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:58 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:41:59 2024
检测到任务尚未完成,一秒钟之后继续检测 || Mon Nov 18 08:42:00 2024
任务被取消
小技巧
关于取消任务需要注意的是,CancelledError
只能从 await
语句抛出。这意味着如果在任务在执行普通 Python 代码时被取消,那么该代码将一直运行,直到触发下一个 await
语句(如果存在),才能引发 CancelledError
。
async def delay(seconds):
print(f"开始休眠 {seconds} 秒")
await asyncio.sleep(seconds)
print(f"休眠完成")
return seconds
async def main():
long_task = asyncio.create_task(delay(3))
# 立刻取消
long_task.cancel()
# 但 CancelledError 只有在 await 取消的协程时才会触发
# 所以下面的语句会正常执行
print(f"我会正常执行|| {time.ctime()}")
print("Hello World")
print(list(range(10)))
await asyncio.sleep(5)
try:
# 引发 CancelledError
await long_task
except asyncio.CancelledError:
print(f"任务被取消|| {time.ctime()}")
await main()
await asyncio.sleep(14) # 保证 ipykernel 单元格线程耗尽
我会正常执行|| Mon Nov 18 08:42:10 2024
Hello World
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
任务被取消|| Mon Nov 18 08:42:15 2024
设置超时并使用 asyncio.wait_for()
执行任务取消#
asyncio.wait_for()
函数接收协程或任务对象,以及以秒为单位的超时时间。如果任务完成所需的时间超过了设定的超时时间,则会引发 TimeoutException
,任务将自动取消。
import asyncio
async def delay(seconds):
print(f"开始休眠 {seconds} 秒|| {time.ctime()}")
await asyncio.sleep(seconds)
print(f"休眠完成|| {time.ctime()}")
return seconds
async def main():
delay_task = asyncio.create_task(delay(2))
try:
result = await asyncio.wait_for(delay_task, 1)
print("返回值:", result)
except asyncio.TimeoutError:
print(f"超时啦|| {time.ctime()}")
# delay_task.cancelled() 用于判断任务是否被取消
# 任务被取消:返回 True,没有被取消:返回 False
print(f"{time.ctime()}||任务是否被取消:", delay_task.cancelled())
await main()
await asyncio.sleep(1) # 保证 ipykernel 单元格线程耗尽
开始休眠 2 秒|| Mon Nov 18 08:42:29 2024
超时啦|| Mon Nov 18 08:42:30 2024
Mon Nov 18 08:42:30 2024||任务是否被取消: True
某任务花费的时间比预期的要长,但即便超过了规定的超时时间,也不取消该任务。为此,可使用 asyncio.shield()
函数包装任务,这个函数将防止传入的协程被取消,会给予它屏蔽,将取消请求将忽略掉。
import asyncio
import time
async def delay(seconds):
print(f"开始休眠 {seconds} 秒")
await asyncio.sleep(seconds)
print(f"休眠完成")
return seconds
async def main():
delay_task = asyncio.create_task(delay(2))
try:
# 通过 asyncio.shield 将 delay_task 保护起来
result = await asyncio.wait_for(asyncio.shield(delay_task), 1)
print(f"{time.ctime()}: 返回值:", result)
except asyncio.TimeoutError:
print(f"{time.ctime()}: 超时啦")
# 如果超时依旧会引发 TimeoutError,但和之前不同的是
# 此时任务不会被取消了,因为 asyncio.shield 会将取消请求忽略掉
print(f"{time.ctime()}: 任务是否被取消:", delay_task.cancelled())
# 从出现超时的地方,继续执行,并等待它完成
result = await delay_task
print(f"{time.ctime()}: 返回值:", result)
await main()
await asyncio.sleep(2) # 保证 ipykernel 单元格线程耗尽
开始休眠 2 秒
Mon Nov 18 08:42:32 2024: 超时啦
Mon Nov 18 08:42:32 2024: 任务是否被取消: False
休眠完成
Mon Nov 18 08:42:33 2024: 返回值: 2
任务、协程、future 和 awaitable#
任务、协程、Future 和 awaitable
是异步编程中的基本概念,它们在定义、状态以及执行方式等方面存在区别。以下是具体分析:
协程
定义:协程(Coroutine)是一种比线程更轻量级的并发执行单元,通过
async def
关键字定义。状态:协程对象本身没有状态,但当它被封装进
asyncio.Task
后,会继承asyncio.Task
的状态。执行方式:协程不能直接运行,需要通过
await
或asyncio.run()
来启动其执行。
Awaitable
定义:如果一个对象可以在
await
表达式中使用,那么它就是可等待对象(Awaitable)。状态:Awaitable 本身不直接表示状态,但它可以是
asyncio.Future
或asyncio.Task
,从而间接具有相应的状态。执行方式:Awaitable 通过
await
表达式来等待其完成,并获取结果。
任务
定义:任务(
asyncio.Task
)是一个类,用于表示异步操作的执行单元。状态:任务有四种状态:Pending(待处理)、Running(运行中)、Done(已完成)和 Cancelled(已取消)。
执行方式:任务通过
asyncio.create_task()
函数创建,并加入到事件循环中等待被调度执行。
Future
定义:
asyncio.Future
表示将来可能完成的操作结果。状态:
asyncio.Future
有三种主要状态:PENDING(挂起)、CANCELLED(取消)和 FINISHED(完成)。执行方式:
asyncio.Future
通常由底层库或框架创建和使用,不需要用户直接创建。