任务#

在线程池中执行代码#

awaitable loop.run_in_executor(executor, func, *args)

安排在指定的执行器 executor 中调用 func(*args)

import asyncio
import functools
import concurrent.futures
import time

# 定义用于阻塞 IO 操作的函数
def block_io_operation(seconds):
    import time
    time.sleep(seconds)
    return f"IO 操作已完成,共阻塞了 {seconds} 秒"

# 使用 asyncio 创建异步协程
async def async_operation():
    print(f"开始异步操作||{time.asctime()}")
    loop = asyncio.get_running_loop()
    # 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 在线程池中运行阻塞IO操作
        task_call = functools.partial(block_io_operation, 2)
        result = await loop.run_in_executor(executor, task_call)
    print(f"{result}||{time.asctime()}")

# 在当前线程中运行异步协程
# asyncio.run(async_operation())
await async_operation()
开始异步操作||Mon Nov 11 00:12:49 2024
IO 操作已完成,共阻塞了 2 秒||Mon Nov 11 00:12:51 2024

#

跨线程调度#

构建 GUI 的框架通常会运行自己的事件循环,而事件循环会阻塞主线程,这意味着任何长时间运行的操作都可能导致用户界面挂起。此外,这个 UI 事件循环也会阻止创建异步事件循环。

下面我们来学习如何使用多线程执行多个事件循环。

asyncio 专门实现 Concurrency and Multithreading(多线程和并发)的函数:

  1. loop.call_soon_threadsafe(callback, *args): 很底层的 API 接口,一般很少使用,本文也暂时不做讨论。

  2. asyncio.run_coroutine_threadsafe(coroutine,loop)

asyncio.run_coroutine_threadsafe(coro, loop)

向指定事件循环提交协程。(线程安全)

  • coro 表示需要异步执行的协程函数

  • loop 表示在新线程中创建的事件循环

返回 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

asyncio.run_coroutine_threadsafe() 函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:

import asyncio 
import time
import threading

#需要执行的耗时异步任务
async def func(num):
    print(f'准备调用 {func}, 大约耗时 {num} {time.asctime()}')
    await asyncio.sleep(num)
    print(f'耗时{num}之后, {func} 函数运行结束 {time.asctime()}')

#定义专门创建事件循环 loop 的函数,在另一个线程中启动它
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

#定义main函数
def main():
    coroutine1 = func(3)
    coroutine2 = func(2)
    coroutine3 = func(1)
 
    new_loop = asyncio.new_event_loop()                        #在当前线程下创建时间循环,(未启用),在 start_loop 里面启动它
    t = threading.Thread(target=start_loop, args=(new_loop,))   #通过当前线程开启新的线程去启动事件循环
    t.start()
 
    asyncio.run_coroutine_threadsafe(coroutine1, new_loop)  #这几个是关键,代表在新线程中事件循环不断“游走”执行
    asyncio.run_coroutine_threadsafe(coroutine2, new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3, new_loop)
 
    for i in "iloveu":
        print(f"{i}    {time.asctime()}")
 
main()
await asyncio.sleep(6) # 保证 ipykernel 单元格线程耗尽
准备调用 <function func at 0x7f9864640900>, 大约耗时 3 Mon Nov 11 00:12:51 2024i    Mon Nov 11 00:12:51 2024
l    Mon Nov 11 00:12:51 2024
o    Mon Nov 11 00:12:51 2024
v    Mon Nov 11 00:12:51 2024
e    Mon Nov 11 00:12:51 2024
u    Mon Nov 11 00:12:51 2024

准备调用 <function func at 0x7f9864640900>, 大约耗时 2 Mon Nov 11 00:12:51 2024
准备调用 <function func at 0x7f9864640900>, 大约耗时 1 Mon Nov 11 00:12:51 2024
耗时1之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:52 2024
耗时2之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:53 2024
耗时3之后, <function func at 0x7f9864640900> 函数运行结束 Mon Nov 11 00:12:54 2024

亦可跨线程调度协程的返回值:

import asyncio 
import time
import threading

# 构建 coroutine
async def coro(num):
    print(f'准备 {time.asctime()}')
    res = await asyncio.sleep(1, result=num*2)
    print(f'函数运行结束 {time.asctime()}')
    return res

#定义专门创建事件循环 loop 的函数,在另一个线程中启动它
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

# 为给定的 loop 提交 coro
new_loop = asyncio.new_event_loop()
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
t = threading.Thread(target=start_loop, args=(new_loop,))   #通过当前线程开启新的线程去启动事件循环
t.start()
# 等待结果,可选的超时参数
assert future.result(timeout=None) == 6
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:12:57 2024
函数运行结束 Mon Nov 11 00:12:58 2024

可以提供异常处理:

timeout = 2
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
    result = future.result(timeout)
except TimeoutError:
    future.cancel()
    print("协程执行超时,任务已被取消...")
except Exception as exc:
    print(f'协程引发了异常: {exc!r}')
else:
    print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:01 2024
函数运行结束 Mon Nov 11 00:13:02 2024
协程返回: 6

如果在协程内产生了异常,将会通知返回的 Future 对象。它也可被用来取消事件循环中的任务:

timeout = 1
future = asyncio.run_coroutine_threadsafe(coro(3), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
    result = future.result(timeout)
except TimeoutError:
    future.cancel()
    print("协程执行超时,任务已被取消...")
except Exception as exc:
    print(f'协程引发了异常: {exc!r}')
else:
    print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:05 2024
协程执行超时,任务已被取消...
timeout = 2
future = asyncio.run_coroutine_threadsafe(coro(coro), new_loop) # 在当前线程下创建时间循环,(未启用)
try:
    result = future.result(timeout)
except TimeoutError:
    future.cancel()
    print("协程执行超时,任务已被取消...")
except Exception as exc:
    print(f'协程引发了异常: {exc!r}')
else:
    print(f'协程返回: {result!r}')
await asyncio.sleep(3) # 保证 ipykernel 单元格线程耗尽
准备 Mon Nov 11 00:13:09 2024
协程引发了异常: TypeError("unsupported operand type(s) for *: 'function' and 'int'")

简化跨线程调度#

# import functools
from contextlib import ContextDecorator
from dataclasses import dataclass
from typing import Coroutine
import asyncio
from asyncio.events import AbstractEventLoop
import threading
import time

@dataclass
class Scheduler(ContextDecorator):
    coro: Coroutine
    loop: AbstractEventLoop

    def __post_init__(self):
        # 初始化
        self.thread = None
        self.future = None

    def __call__(self):
        if self.thread is None or not self.thread.is_alive():
            self.thread = threading.Thread(
                target=self.start_loop, 
                daemon=True, # 保证设置子线程为守护线程,当主线程结束时,守护线程会自动结束
            )   #通过当前线程开启新的线程去启动事件循环
            self.thread.start()
            # 在新线程中事件循环不断“游走”执行
            self.future = asyncio.run_coroutine_threadsafe(self.coro, self.loop)

    def start_loop(self):
        """定义专门创建事件循环 loop 的函数,在另一个线程中启动它"""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def __enter__(self):
        print(f'启动:{self.coro}@{self.loop}')
        return self

    def __exit__(self, *exc):
        print(f'离开:{self.coro}@{self.loop}')
        return False
async def async_task(num):
    print(f'准备调用, {time.asctime()}')
    res = await asyncio.sleep(1, result=num*2)
    print(f'函数运行结束 {time.asctime()}')
    return res

new_loop = asyncio.new_event_loop()
scheduler = Scheduler(async_task(3), new_loop)
scheduler()
scheduler.future.result()
await asyncio.sleep(1) # 保证 ipykernel 单元格线程耗尽
准备调用, Mon Nov 11 00:13:12 2024
函数运行结束 Mon Nov 11 00:13:13 2024
for num in range(5):
    new_loop = asyncio.new_event_loop()
    scheduler = Scheduler(async_task(num), new_loop)
    with scheduler:
        scheduler()
    print(f"结果:{scheduler.future.result()}")
await asyncio.sleep(5) # 保证 ipykernel 单元格线程耗尽
启动:<coroutine object async_task at 0x7f986467c120>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c120>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:14 2024
函数运行结束 Mon Nov 11 00:13:15 2024
结果:0
启动:<coroutine object async_task at 0x7f986467c4a0>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c4a0>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:15 2024
函数运行结束 Mon Nov 11 00:13:16 2024
结果:2
启动:<coroutine object async_task at 0x7f986467c820>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467c820>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:16 2024
函数运行结束 Mon Nov 11 00:13:17 2024
结果:4
启动:<coroutine object async_task at 0x7f986467cba0>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467cba0>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:17 2024
函数运行结束 Mon Nov 11 00:13:18 2024
结果:6
启动:<coroutine object async_task at 0x7f986467cf20>@<_UnixSelectorEventLoop running=False closed=False debug=False>
离开:<coroutine object async_task at 0x7f986467cf20>@<_UnixSelectorEventLoop running=True closed=False debug=False>
准备调用, Mon Nov 11 00:13:18 2024
函数运行结束 Mon Nov 11 00:13:19 2024
结果:8

tkinter 中调度异步任务#

from tkinter import ttk, Tk, StringVar
import time

class Window(Tk):
    def __init__(self, **kw):
        super().__init__(**kw)
        self.download_button = ttk.Button(self, text="开始下载", command=self.start_download_thread)
        self.result_label = ttk.Label(self, text="")
        self.text_var = StringVar(self)
        self.entry = ttk.Entry(self, textvariable=self.text_var)
        self.label = ttk.Label(self, textvariable=self.text_var)
        self.text_var.trace_add("write", self.update_text())
        self.download_button.grid(row=0, column=0)
        self.result_label.grid(row=1, column=0)
        self.entry.grid(row=0, column=1)
        self.label.grid(row=1, column=1)
        
    def start_download_thread(self):
        new_loop = asyncio.new_event_loop()
        scheduler = Scheduler(self.download() new_loop)
        scheduler()

    async def download(self):
        # 模拟下载任务,这里可以替换为实际的下载操作
        for i in range(1, 6):
            self.result_label.config(text=f"下载中... ({i}/5)")
            self.update()  # 更新主界面以显示下载进度
            time.sleep(2)  # 模拟下载延迟
        self.result_label.config(text="下载完成")

    def update_text(self, *args):
        def wrap(*args):
            self.text_var.set(self.entry.get())
        return wrap

root = Window()
root.title("多线程示例")
root.mainloop()