任务#
在线程池中执行代码#
awaitable loop.run_in_executor(executor, func, *args)
安排在指定的执行器
executor
中调用func(*args)
。
import asyncio
import functools
import concurrent.futures
import time
# 定义用于阻塞 IO 操作的函数
def block_io_operation(seconds):
import time
time.sleep(seconds)
return f"IO 操作已完成,共阻塞了 {seconds} 秒"
# 使用 asyncio 创建异步协程
async def async_operation():
print(f"开始异步操作||{time.asctime()}")
loop = asyncio.get_running_loop()
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 在线程池中运行阻塞IO操作
task_call = functools.partial(block_io_operation, 2)
result = await loop.run_in_executor(executor, task_call)
print(f"{result}||{time.asctime()}")
# 在当前线程中运行异步协程
# asyncio.run(async_operation())
await async_operation()
开始异步操作||Mon Nov 11 00:12:49 2024
IO 操作已完成,共阻塞了 2 秒||Mon Nov 11 00:12:51 2024
#
跨线程调度#
构建 GUI 的框架通常会运行自己的事件循环,而事件循环会阻塞主线程,这意味着任何长时间运行的操作都可能导致用户界面挂起。此外,这个 UI 事件循环也会阻止创建异步事件循环。
下面我们来学习如何使用多线程执行多个事件循环。
asyncio
专门实现 Concurrency and Multithreading(多线程和并发)的函数:
loop.call_soon_threadsafe(callback, *args)
: 很底层的 API 接口,一般很少使用,本文也暂时不做讨论。asyncio.run_coroutine_threadsafe(coroutine,loop)
asyncio.run_coroutine_threadsafe(coro, loop)
向指定事件循环提交协程。(线程安全)
coro
表示需要异步执行的协程函数loop
表示在新线程中创建的事件循环
返回
concurrent.futures.Future
以等待来自其他 OS 线程的结果。
asyncio.run_coroutine_threadsafe()
函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:
import asyncio
import time
import threading
#需要执行的耗时异步任务
async def func(num):
print(f'准备调用 {func}, 大约耗时 {num} {time.asctime()}')
await asyncio.sleep(num)
print(f'耗时{num}之后, {func} 函数运行结束 {time.asctime()}')
#定义专门创建事件循环 loop 的函数,在另一个线程中启动它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
#定义main函数
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)
new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用),在 start_loop 里面启动它
t = threading.Thread(target=start_loop, args=(new_loop,)) #通过当前线程开启新的线程去启动事件循环
t.start()
asyncio.run_coroutine_threadsafe(coroutine1, new_loop) #这几个是关键,代表在新线程中事件循环不断“游走”执行
asyncio.run_coroutine_threadsafe(coroutine2, new_loop)
asyncio.run_coroutine_threadsafe(coroutine3, new_loop)
for i in "iloveu":
print(f"{i} {time.asctime()}")
main()
await asyncio.sleep(6) # 保证 ipykernel 单元格线程耗尽
准备调用 <function func at 0x7f9864640900>, 大约耗时 3 Mon Nov 11 00:12:51 2024i Mon Nov 11 00:12:51 2024
l Mon Nov 11 00:12:51 2024
o Mon Nov 11 00:12:51 2024
v Mon Nov 11 00:12:51 2024
e Mon Nov 11 00:12:51 2024
u Mon Nov 11 00:12:51 2024
准备调用 <function func at 0x7f9864640900>, 大约耗时 2 Mon Nov 11 00:12:51 2024
准备调用 <function func at 0x7f9864640900>, 大约耗时 1 Mon Nov 11 00:12:51 2024
耗时1之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:52 2024
耗时2之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:53 2024
耗时3之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:54 2024
亦可跨线程调度协程的返回值:
import asyncio
import time
import threading
# 构建 coroutine
async def coro(num):
print(f'准备 {time.asctime()}')
res = await asyncio.sleep(1, result=num*2)
print(f'函数运行结束 {time.asctime()}')
return res
#定义专门创建事件循环 loop 的函数,在另一个线程中启动它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# 为给定的 loop 提交 coro
new_loop = asyncio.new_event_loop()
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
t = threading.Thread(target=start_loop, args=(new_loop,)) #通过当前线程开启新的线程去启动事件循环
t.start()
# 等待结果,可选的超时参数
assert future.result(timeout=None) == 6
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:12:57 2024
函数运行结束 Mon Nov 11 00:12:58 2024
可以提供异常处理:
timeout = 2
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
result = future.result(timeout)
except TimeoutError:
future.cancel()
print("协程执行超时,任务已被取消...")
except Exception as exc:
print(f'协程引发了异常: {exc!r}')
else:
print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:01 2024
函数运行结束 Mon Nov 11 00:13:02 2024
协程返回: 6
如果在协程内产生了异常,将会通知返回的 Future
对象。它也可被用来取消事件循环中的任务:
timeout = 1
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
result = future.result(timeout)
except TimeoutError:
future.cancel()
print("协程执行超时,任务已被取消...")
except Exception as exc:
print(f'协程引发了异常: {exc!r}')
else:
print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:05 2024
协程执行超时,任务已被取消...
timeout = 2
future = asyncio.run_coroutine_threadsafe(coro(coro), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
result = future.result(timeout)
except TimeoutError:
future.cancel()
print("协程执行超时,任务已被取消...")
except Exception as exc:
print(f'协程引发了异常: {exc!r}')
else:
print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:09 2024
协程引发了异常: TypeError("unsupported operand type(s) for *: 'function' and 'int'")
简化跨线程调度#
# import functools
from contextlib import ContextDecorator
from dataclasses import dataclass
from typing import Coroutine
import asyncio
from asyncio.events import AbstractEventLoop
import threading
import time
@dataclass
class Scheduler(ContextDecorator):
coro: Coroutine
loop: AbstractEventLoop
def __post_init__(self):
# 初始化
self.thread = None
self.future = None
def __call__(self):
if self.thread is None or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.start_loop,
daemon=True, # 保证设置子线程为守护线程,当主线程结束时,守护线程会自动结束
) #通过当前线程开启新的线程去启动事件循环
self.thread.start()
# 在新线程中事件循环不断“游走”执行
self.future = asyncio.run_coroutine_threadsafe(self.coro, self.loop)
def start_loop(self):
"""定义专门创建事件循环 loop 的函数,在另一个线程中启动它"""
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def __enter__(self):
print(f'启动:{self.coro}@{self.loop}')
return self
def __exit__(self, *exc):
print(f'离开:{self.coro}@{self.loop}')
return False
async def async_task(num):
print(f'准备调用, {time.asctime()}')
res = await asyncio.sleep(1, result=num*2)
print(f'函数运行结束 {time.asctime()}')
return res
new_loop = asyncio.new_event_loop()
scheduler = Scheduler(async_task(3), new_loop)
scheduler()
scheduler.future.result()
await asyncio.sleep(1) # 保证 ipykernel 单元格线程耗尽
准备调用, Mon Nov 11 00:13:12 2024
函数运行结束 Mon Nov 11 00:13:13 2024
for num in range(5):
new_loop = asyncio.new_event_loop()
scheduler = Scheduler(async_task(num), new_loop)
with scheduler:
scheduler()
print(f"结果:{scheduler.future.result()}")
await asyncio.sleep(5) # 保证 ipykernel 单元格线程耗尽
启动:<coroutine object async_task at 0x7f986467c120>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c120>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:14 2024
函数运行结束 Mon Nov 11 00:13:15 2024
结果:0
启动:<coroutine object async_task at 0x7f986467c4a0>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c4a0>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:15 2024
函数运行结束 Mon Nov 11 00:13:16 2024
结果:2
启动:<coroutine object async_task at 0x7f986467c820>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c820>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:16 2024
函数运行结束 Mon Nov 11 00:13:17 2024
结果:4
启动:<coroutine object async_task at 0x7f986467cba0>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467cba0>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:17 2024
函数运行结束 Mon Nov 11 00:13:18 2024
结果:6
启动:<coroutine object async_task at 0x7f986467cf20>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467cf20>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:18 2024
函数运行结束 Mon Nov 11 00:13:19 2024
结果:8
在 tkinter
中调度异步任务#
from tkinter import ttk, Tk, StringVar
import time
class Window(Tk):
def __init__(self, **kw):
super().__init__(**kw)
self.download_button = ttk.Button(self, text="开始下载", command=self.start_download_thread)
self.result_label = ttk.Label(self, text="")
self.text_var = StringVar(self)
self.entry = ttk.Entry(self, textvariable=self.text_var)
self.label = ttk.Label(self, textvariable=self.text_var)
self.text_var.trace_add("write", self.update_text())
self.download_button.grid(row=0, column=0)
self.result_label.grid(row=1, column=0)
self.entry.grid(row=0, column=1)
self.label.grid(row=1, column=1)
def start_download_thread(self):
new_loop = asyncio.new_event_loop()
scheduler = Scheduler(self.download(), new_loop)
scheduler()
async def download(self):
# 模拟下载任务,这里可以替换为实际的下载操作
for i in range(1, 6):
self.result_label.config(text=f"下载中... ({i}/5)")
self.update() # 更新主界面以显示下载进度
time.sleep(2) # 模拟下载延迟
self.result_label.config(text="下载完成")
def update_text(self, *args):
def wrap(*args):
self.text_var.set(self.entry.get())
return wrap
root = Window()
root.title("多线程示例")
root.mainloop()