Future#

Future 包含你希望在未来某个时间点获得、但目前还不存在的值。通常,当创建 Future 时,它没有任何值,因为它还不存在。在这种状态下,它被认为是不完整的、未解决的或根本没有完成的。然后一旦你得到结果,就可以设置 future 的值,这将完成 future。那时,我们可以认为它已经完成,并可从 future 中提取结果。

import asyncio

future = asyncio.Future()
print(future)
print(future.__class__)
print(f"future 是否完成: {future.done()}")
<Future pending>
<class '_asyncio.Future'>
future 是否完成: False

set_result() 可以设置值:

future.set_result("道可道,非恒到。")
print(f"future 是否完成: {future.done()}")
print(future)
print(f"future 的返回值: {future.result()}")
future 是否完成: True
<Future finished result='道可道,非恒到。'>
future 的返回值: 道可道,非恒到。

必须在调用 set_result()(设置结果)之后才能调用 result()(获取结果),并且 set_result() 只能调用一次,但 result() 可以调用多次。

如果想在 future 中设置异常,可调用 set_exception()

class Future:
    # Future 实例有以下三个属性非常重要
    # _state: 运行状态,有三种,分别是 PENDING(正在运行)、CANCELLED(已取消)、FINISHED(已完成)
    # _result: future 完成之后的设置的结果
    # _exception: future 报错时设置的异常 
    
    def cancel(self):
        # cancel 方法,负责取消一个 future
        # 并且该方法有返回值,取消成功返回 True,取消失败返回 False
        self.__log_traceback = False
        # 检测状态是否为 PENDING,不是 PENDING,说明 future 已经运行完毕或取消了
        # 那么返回 False 表示取消失败,但对于 future 而言则无影响
        if self._state != _PENDING:
            return False
        # 如果状态是 PENDING,那么将其改为 CANCELLED
        self._state = _CANCELLED
        self.__schedule_callbacks()
        return True

    def cancelled(self):
        # 判断 future 是否被取消,那么检测它的状态是否为 CANCELLED 即可
        return self._state == _CANCELLED

    def done(self):
        # 判断 future 是否已经完成,那么检测它的状态是否不是 PENDING 即可
        # 注意:CANCELLED 和 FINISHED 都表示已完成
        return self._state != _PENDING

    def result(self):
        # 调用 result 方法相当于获取 future 设置的结果
        # 但如果它的状态为 CANCELLED,表示取消了,那么抛出 CancelledError
        # 正如同你 await 一个已取消的任务一样,因为 await 会阻塞任务并拿到它的执行结果
        # 如果任务已取消,同样抛出 CancelledError
        # 所以 future 和 task 是很相似的,因为 Task 本身就是 Future 的子类
        # 至于 Future 和 Task 具体的区别,我们一会儿再说
        if self._state == _CANCELLED:
            raise exceptions.CancelledError
        # 如果状态不是 FINISHED(说明还没有设置结果),那么抛出 asyncio.InvalidStateError 异常
        # 所以我们不能在 set_result 之前调用 result
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        # 走到这里说明状态为 FINISHED
        # 但不管是正常执行、还是出现异常,都会将状态标记为 FINISHED
        # 如果是出现异常,那么调用 result 会将异常抛出来
        if self._exception is not None:
            raise self._exception
        # 否则返回设置的结果
        return self._result

    def exception(self):
        # 无论是正常执行结束,还是出现异常,future 的状态都是已完成
        # 如果是正常执行结束,那么 self._result 就是结果,self._exception 为 None
        # 如果是出现异常,那么 self._result 为 None,self._exception 就是异常对象本身
        
        # 因此调用 result 和 exception 都要求 future 的状态为 FINISHED
        # 如果为 CANCELLED,那么同样抛出 CancelledError
        if self._state == _CANCELLED:
            raise exceptions.CancelledError
        # 如果为 PENDING,那么抛出 asyncio.InvalidStateError 异常
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        self.__log_traceback = False
        # 返回异常本身
        # 因此如果你不确定 future 到底是正常执行结束,还是抛了异常
        # 那么可以先调用 future.exception(),如果为 None,说明正常执行,再通过 future.result() 获取结果
        # 如果 future.exception() 不为 None,那么拿到的就是异常本身
        return self._exception

    def set_result(self, result):
        # 当 future 正常执行结束时,会通过 set_result 设置结果
        # 显然在设置结果的时候,future 的状态应该为 PENDING 
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        # 然后设置 self._result,当程序调用 future.result() 时会返回 self._result
        self._result = result
        # 并将状态标记为 FINISHED,表示一个任务从 PENDING 变成了 FINISHED
        # 所以我们不能对一个已完成的 future 再次调用 set_result
        # 因为第二次调用 set_result 的时候,状态已经不是 PENDING 了
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        # 和 set_result 类似,都表示任务从 PENDING 变成 FINISHED
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        # 但 exception 必须是异常,且不能是 StopIteration 异常
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        # 将 self._exception 设置为 exception
        # 调用 future.exception() 的时候,会返回 self._exception
        self._exception = exception
        # 将状态标记为已完成
        self._state = _FINISHED
        self.__schedule_callbacks()
        self.__log_traceback = True
import asyncio

future = asyncio.Future()
# future 是否已完成
print(future.done())  # False
print(future._state != "PENDING")  # False
print(future._state)  # PENDING

# 获取结果
try:
    future.result()
except asyncio.InvalidStateError:
    print("future 尚未完成,不能获取结果")
# 但是我们可以通过 future._result 去获取(不推荐)
# 显然拿到的是 None
print(future._result)  # None
print(future._exception)  # None

future.set_result("我是返回值")
print(future.done())  # True
print(future._state)  # FINISHED
print(future.result() == future._result == "我是返回值")  # True
False
False
PENDING
future 尚未完成,不能获取结果
None
None
True
FINISHED
True

future 也可以用在 await 表达式中,如果对 future 执行 await 操作,那么会处于阻塞,直到 future 有可供使用的值。这和 await 任务是类似的,当任务里面的协程 return 之后会解除阻塞,并拿到返回值。而 await future,那么当 future 有了值,await 同样会拿到它,并解除阻塞。

import asyncio
import time

async def set_future_value(future):
    await asyncio.sleep(1)
    future.set_result("Hello World")

def make_request():
    future = asyncio.Future()
    # 创建任务来异步设置 future 的值
    asyncio.create_task(set_future_value(future))
    return future

async def main():
    # 注意这里的 make_request,它是普通的函数,如果在外部直接调用肯定是会报错的
    # 因为没有事件循环,在执行 set_future_value 时会报错
    # 但如果在协程里面调用是没问题的,因为协程运行时,事件循环已经启动了
    # 此时在 make_request 里面,会启动任务
    future = make_request()
    print(f"future 是否完成: {future.done()}||{time.ctime()}")
    # 阻塞等待,直到 future 有值,什么时候有值呢?
    # 显然是当协程 set_future_value 里面执行完 future.set_result 的时候
    value = await future  # 暂停 main(),直到 future 的值被设置完成
    print(f"future 是否完成: {future.done()}||{time.ctime()}")
    print(value)
await main()
future 是否完成: False||Mon Nov 18 08:42:43 2024
future 是否完成: True||Mon Nov 18 08:42:44 2024
Hello World

在代码中我们定义了一个函数 make request,该函数里面创建了一个 future 和一个任务,该任务将在 1 秒后异步设置 future 的结果。然后在主函数中调用 make_request,当调用它时,将立即得到一个没有结果的 future。然后 await future 会让主协程陷入等待,并将执行权交出去。一旦当 future 有值了,那么再恢复 main() 协程,拿到返回值进行处理。

但在 asyncio 中,你应该很少主动创建 future。更多时候,你将遇到一些返回 future 的异步 API,并可能需要使用基于回调的代码。举个例子:

import asyncio

def callback(future):
    print(f"future 已完成,值为 {future.result()}")

async def main():
    future = asyncio.Future()
    # 绑定回调,当 future 有值时会自动触发回调的执行
    future.add_done_callback(callback)
    future.set_result("666")

await main()
future 已完成,值为 666

future 和任务之间有很密切的关系,事实上任务直接继承自 future,准确来说是 Task 继承自 Future。future 可以被认为代表了我们暂时不会拥有的值,而一个任务可以被认为是一个协程和一个 future 的组合。创建一个任务时,我们正在创建一个空的 future,并运行协程。然后当协程运行得到结果或出现异常时,我们将设置 future 的结果或异常。

所以 “await 任务” 什么时候结束,显然是当协程执行完毕并将返回值设置在 future 里面的时候。如果直接 await future,那么需要我们手动调用 future.set_result;如果 await 任务,那么当协程执行完毕时会自动调用 future.set_result(执行出错则自动调用 future.set_exception),因为任务是基于协程包装得到的,它等价于一个协程加上一个 future。

但不管 await 后面跟的是任务还是 future,本质上都是等到 future 里面有值之后,通过 future.result() 拿到里面的值。

所以当 await 任务的时候,如果任务执行出错了,那么会怎么样呢?首先出错了,那么任务里面的 future 会调用 set_exception 设置异常。而前面在看 future 源码的时候,我们知道:如果没有出现异常,那么调用 result 返回结果,调用 exception 会返回 None;如果出现异常,那么调用 exception 会返回异常,调用 result 会将异常抛出来。而 await 任务,本质上就是在调用内部 future 的 result 方法,显然如果任务执行出错,那么会将出错时产生的异常抛出来。

再来看看协程,任务、future、协程,三者都可以跟在 await 关键字后面,那么它们有没有什么共同之处呢?

很简单,它们之间的共同点是 awaitable 抽象基类,这个类定义了一个抽象的魔法函数 __await__,任何实现了 __await__ 方法的对象都可以在 await 表达式中使用。协程直接继承自 awaitable,future 也是如,而任务则是对 future 进行了扩展。

from functools import wraps
import time
from typing import Callable, Any

def async_timed(func: Callable) -> Callable:
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        print(f"协程 {func.__name__} 开始执行")
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            end = time.perf_counter()
            total = end - start
            print(f"协程 {func.__name__}{total} 秒执行完毕")
    return wrapper
@async_timed
async def delay(seconds):
    print(f"协程开始休眠 {seconds} 秒")
    await asyncio.sleep(seconds)
    print(f"{seconds} 秒后,协程结束休眠")
    return seconds

@async_timed
async def main():
    task_one = asyncio.create_task(delay(2))
    task_two = asyncio.create_task(delay(3))

    await task_one
    await task_two
    
await main()
协程 main 开始执行
协程 delay 开始执行
协程开始休眠 2 秒
协程 delay 开始执行
协程开始休眠 3 秒
2 秒后,协程结束休眠
协程 delay 用 2.002588799999785 秒执行完毕
3 秒后,协程结束休眠
协程 delay 用 3.001828549000038 秒执行完毕
协程 main 用 3.001952099999926 秒执行完毕

可以看到,两个 delay 调用分别需要大约 2 秒和 3 秒内才能完成,总共加起来是 5 秒。但是主协程只花了 3 秒就完成了,原因就是在等待期间使用了并发。

协程和任务的陷阱#

虽然通过将协程包装成任务来并发执行,可以获得一些性能改进,但有些场景下却得不到提升。

  • 第一个场景:代码是 CPU 密集;

  • 第二个场景:代码虽然是 IO 密集,但 IO 是阻塞 IO,而不是非阻塞 IO;

运行 CPU 密集型代码#

当有好几个执行大量计算的函数时,你获取会想到包装成任务并发执行。从概念上讲,这是一个好主意,但请记住 asyncio 使用的是单线程并发型,这意味着仍然受到单线程和全局解释器锁的限制。为证明这一点,让我们尝试同时运行多个 CPU 密集型函数。

import asyncio

@async_timed
async def cpu_bound_work():
    counter = 0
    for i in range(100000000):
        counter += 1
    return counter

@async_timed
async def main():
    task_one = asyncio.create_task(cpu_bound_work())
    task_two = asyncio.create_task(cpu_bound_work())

    await task_one
    await task_two
await main()
协程 main 开始执行
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.2937944230000085 秒执行完毕
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.311536330000081 秒执行完毕
协程 main 用 10.605689610999889 秒执行完毕

尽管创建了两个任务,代码仍然是串行执行。首先运行任务 1,然后运行任务 2,这意味着总运行时间将是对 cpu_bound_work 的两次调用的总和。

如果里面再包含 IO 密集呢?

@async_timed
async def cpu_bound_work():
    counter = 0
    for i in range(100000000):
        counter += 1
    return counter

@async_timed
async def main():
    task_one = asyncio.create_task(cpu_bound_work())
    task_two = asyncio.create_task(cpu_bound_work())
    task_three = asyncio.create_task(asyncio.sleep(4))
    await task_one
    await task_two
    await task_three
await main()
协程 main 开始执行
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.4167867360001765 秒执行完毕
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.475029692999897 秒执行完毕
协程 main 用 14.89641930699986 秒执行完毕

可以看到 task_three 没有并发执行,而是等到 task_one 和 task_two 执行完之后才开始执行,因为总耗时用了 7 秒多。我们说当调用 create_task 时,协程就被扔到事件循环当中运行了,但 asyncio 本质上是一个单线程,对于 CPU 密集型是不存在切换的。只有在遇见 IO(并且是非阻塞 IO)的时候,才会切换,但 cpu_bound_task 里面没有阻塞。

如果我们将任务的顺序换一下:

@async_timed
async def cpu_bound_work():
    counter = 0
    for i in range(100000000):
        counter += 1
    return counter

@async_timed
async def main():
    task_three = asyncio.create_task(asyncio.sleep(4))
    task_one = asyncio.create_task(cpu_bound_work())
    task_two = asyncio.create_task(cpu_bound_work())
    await task_one
    await task_two
    await task_three
await main()
协程 main 开始执行
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.3206455939998705 秒执行完毕
协程 cpu_bound_work 开始执行
协程 cpu_bound_work 用 5.490479651999976 秒执行完毕
协程 main 用 10.811568152000063 秒执行完毕

此时总耗时就是 4 秒了,创建 task_three 的时候,依旧会将协程丢到事件循环里面运行。但由于出现了阻塞,所以会将控制权交出去,事件循环能够继续运行主协程,因此总耗时是 4 秒。

总之对于 CPU 密集型任务,如果还想放在协程里面,那么应该和进程池搭配使用,后续再聊。

运行阻塞 API#

在协程中使用阻塞 IO 密集型操作,会产生和 CPU 密集型操作相同的问题,因为这些 API 会阻塞主线程。所以在协程中运行阻塞 API 调用时,会阻塞事件循环线程本身,这意味其它的任何协程或任务都将暂停。阻塞 API 调用的示例包括使用 requests 发请求或 time.sleep 等,通常执行任何非协程的 IO 操作或执行耗时的 CPU 操作都可视为阻塞。

IO 也分两种:一种是阻塞 IO,比如 requests.get()、time.sleep() 等,这会阻塞整个线程,导致所有任务都得不到执行;另一种是非阻塞 IO,比如协程的 IO 操作,这只会阻塞协程,但线程不阻塞,线程可以执行其它已经准备就绪的任务。

import httpx

@async_timed
async def get_baidu_status():
    return httpx.get("http://www.baidu.com").status_code

@async_timed
async def main():
    task_one = asyncio.create_task(get_baidu_status())
    task_two = asyncio.create_task(get_baidu_status())
    task_three = asyncio.create_task(get_baidu_status())
    await task_one
    await task_two
    await task_three
await main()
协程 main 开始执行
协程 get_baidu_status 开始执行
协程 get_baidu_status 用 0.6948180129998036 秒执行完毕
协程 get_baidu_status 开始执行
协程 get_baidu_status 用 0.6729149400000551 秒执行完毕
协程 get_baidu_status 开始执行
协程 get_baidu_status 用 0.6849317690000589 秒执行完毕
协程 main 用 2.0530796269999882 秒执行完毕

可以看到 main() 协程的耗时,是所有任务的总和。这是因为 httpx 库是阻塞的,这意味着它将阻塞运行它的线程。由于 asyncio 只有一个线程,因此 httpx 库会阻止事件循环,此时阻塞期间,事件循环无法做其他的任何事情。

通常,你现在使用的大多数 API 都是阻塞的,且无法与 asyncio 一起使用。如果想和 asyncio 搭配,那么你需要使用支持协程、并利用非阻塞套接字的库,否则就只能进行阻塞调用了,但这样就没办法和 asyncio 一起使用了。

而对于上面这个例子,我们可以将 httpx 换成 aiohttp 或 requests,它们可以使用非阻塞套接字,并返回协程,从而获得适当的并发性。如果你只能使用同步库,并且还想和 asyncio 搭配使用的话,那么应该要引入线程池,后续再聊。

手动创建和访问事件循环#

到目前为止,我们一直使用简便的 asyncio.run 来运行应用程序,并在幕后创建事件循环。考虑到易用性,这是创建事件循环的首选方法。但某些情况下,我们可能希望执行自定义逻辑来完成与 asyncio.run 不同的任务,例如让任何剩余的任务完成而不是停止它们。

可使用 asyncio.new_event_loop() 方法创建事件循环,这将返回事件循环实例。有了这个实例,便可访问事件循环中的所有低级方法。

import asyncio

async def main():
    await asyncio.sleep(1)

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

上面代码与我们调用 asyncio.run 时发生的情况相似,但不同之处在于不会取消任何剩余的任务,如果想要任何特殊的清理逻辑,可在 finally 子句中完成。

访问事件循环#

有时,我们也需要访问当前正在运行的事件循环,asyncio 公开了允许获取当前事件循环的 asyncio.get_running_loop 函数。例如,让我们看一下事件循环的 call_soon 方法,它将设定一个函数在事件循环的下一次迭代中运行。

import asyncio

def some_func():
    print("我将稍后被调用")

async def main():
    # 协程需要扔到事件循环里面运行,而当协程运行的时候,也可以获取所在的事件循环
    loop = asyncio.get_running_loop()
    loop.call_soon(some_func)
    await asyncio.sleep(1)

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

loop.call_soon 接收一个函数,虽然叫 call_soon,但它接收的函数不会立即运行,而是当事件循环下一次迭代的时候运行。说白了,就是当出现 IO 进行切换的时候运行。

然后在 main() 协程里面,如果我们想获取事件循环,那么通过 get_running_loop 函数。因为 asyncio 是单线程的,因此对于一个线程来说,只会有一个事件循环。而在外部,事件循环已经创建好了,所以在驱动 main() 执行的时候,事件循环肯定是存在的,因此通过 get_running_loop 获取即可。

但如果像下面这样肯定是不行的:

import asyncio

def some_func():
    print("我将稍后被调用")

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(some_func)
    await asyncio.sleep(1)

# 不可以在外部调用
loop = asyncio.get_running_loop()

get_running_loop 是获取当前的事件循环,显然此时事件循环还没有创建出来,所以 get_running_loop 都是放在协程里面调用的。因为协程是靠事件循环驱动的,所以当协程运行的时候,事件循环一定创建好了。

除了 get_running_loop 和 new_event_loop 之外还有 get_event_loop 和 set_event_loop,关于这几个函数的更详细区别,我们就来好好聊一聊,以及深度探讨一下事件循环。

解密事件循环#

asyncio 框架使用事件循环来编排回调函数(callback)和异步任务(asynchronous task),事件循环位于事件循环策略的上下文中。

按照 Go 语言之父的说法,协程是一种轻量级的并发模型,这是从广义上来讲的。如果从狭义上来讲,协程就是一个可以暂停、后续还能从暂停处恢复执行的函数,至于在什么地方暂停,则通过专门语法标记进行确定。而协程不能直接运行,必须由事件循环负责驱动,而事件循环在驱动协程执行之前,会先将协程包装成任务。

任务对象可以跟踪协程的状态,并由相应的事件循环进行实例化,事件循环跟踪当前正在运行的任务,并将空闲协程的 CPU 时间片委托给处于挂起(pending)状态的协程。

asyncio.get_event_loop() 做了什么:

  1. 检查在调用函数时是否有循环在运行;

  2. 如果有,则返回其 pid 与当前进程 pid 匹配的运行循环;

  3. 如果没有,获取存储在 asyncio 模块里的事件循环策略,以一个全局变量的形式存在;

  4. 如果没有设置策略(为 None),则在加锁的情况下以 DefaultEventLoopPolicy 实例化它,需要注意:DefaultEventLoopPolicy 依赖于操作系统,它提供了一个默认的循环实现,称为 get_event_loop。所以通过调用事件循环策略的 get_event_loop 方法,即可创建一个事件循环实例。

注意:事件循环策略的 get_event_loop 方法只在主线程上实例化一个循环并分配给线程局部变量,如果你不在主线程上并且没有通过其它方法实例化正在运行的循环,那么它将引发一个 RuntimeError。所以:

  1. get_event_loop 检查是否存在并返回当前运行的循环

  2. 事件循环策略是全局存储的线程,而循环实例是本地的存储线程

  3. 如果你在主线程上,那么 get_event_loop 将实例化该循环并在策略中本地保存实例线程

  4. 如果你不在主线程上,它将引发一个 RuntimeError

asyncio.get_event_loop 的逻辑很简单,就是检测当前有没有正在运行的事件循环,有就返回,没有就创建一个。而创建事件循环需要先拿到事件循环策略,策略不为空,那么直接调用它的 get_event_loop 方法;策略为空,那么就实例化 DefaultEventLoopPolicy,创建一个策略,整个过程很好理解。

而 get_running_loop 就更简单了,它表示获取当前正在运行的事件循环。

通常来说,get_running_loop 应该放在协程里面调用。因为协程要想执行,需要由事件循环驱动,所以在执行 get_running_loop 的时候,事件循环已经创建好了。

创建新的循环实例#

如果开启子线程,那么在子线程中调用 get_event_loop 是会报错的。

import asyncio
import threading

def create_loop():
    asyncio.get_event_loop()

threading.Thread(target=create_loop).start()
Exception in thread Thread-5 (create_loop):
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
self.run()
  File "/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_7932/1513571962.py", line 5, in create_loop

这是啥原因呢?首先事件循环是通过调用事件循环策略的 get_event_loop 方法创建的,事件循环策略是通过实例化 DefaultEventLoopPolicy 得到的。

不同的系统,DefaultEventLoopPolicy 对应的类不同,会根据操作系统选择一个合适的

# asyncio/unix_events.py
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy

class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    ...

# asyncio/events.py
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):

    _loop_factory = None

    class _Local(threading.local):
        _loop = None
        _set_called = False

    def __init__(self):
        # 注意这里的 self._local 它是线程隔离的
        self._local = self._Local()

    def get_event_loop(self):
        # 调用策略的 get_event_loop 方法创建事件循环,严格意义上讲,应该是获取事件循环
        # 从源码中可以看到创建事件循环其实是通过 new_event_loop 实现的
        # 事件循环创建完毕之后,再通过 set_event_loop 设置在策略当中
        # 而创建循环是有条件的,除了循环不存在之外,还有一个就是当前所在线程必须是主线程
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        
        # 如果不是主线程,那么不会创建循环,然后 self._local 又是线程隔离的
        # 因此 self._local._loop 为 None,于是调用 get_event_loop 报错
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        # 设置事件循环,本质上就是 self._local 的一个属性
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        # 真正用来创建事件循环,创建完了还要通过 set_event_loop 设置进去
        # 不然无法通过 get_event_loop 获取
        return self._loop_factory()

如果我们想创建事件循环,也可以通过 asyncio.new_event_loop(),或者通过 get_event_loop_policy().new_event_loop,两者是一样的。并且在刚才的源码中我们看到,get_event_loop_policy 在调用时,如果发现事件循环策略不为空,那么就不会再创建了(直接返回已存在的策略),否则才会实例化 DefaultEventLoopPolicy。这就说明,不管事件循环有多少个,但是策略只有一个,而这些循环都保存在策略的 _local 属性中。

所以策略的 _local 里面可以有很多事件循环,而通过 get_event_loop 获取事件循环,本质上就是通过 策略._local._loop 的方式获取。而不同的线程会获取不同的 _loop,因为 _local 里面保存了线程 ID 到事件循环的映射,会根据线程 ID 获取对应的事件循环。注意 set_event_loop 的原理也很简单,就是将 new_event_loop 创建好的事件循环赋值给 策略._local._loop

import asyncio
import threading

def create_loop():
    # 获取事件循环策略,如果没创建,那么就实例化 DefaultEventLoopPolicy 创建一个
    # 这个 DefaultEventLoopPolicy 也不是一个具体的类,而是根据操作系统会对应不同的类
    loop_policy = asyncio.get_event_loop_policy()
    # 通过策略的 new_event_loop 方法创建事件循环
    loop = loop_policy.new_event_loop()
    # 但以上两步可以直接合成一步,通过 asyncio.new_event_loop

    # 设置循环,将循环设置在策略的 _local 中,这样才能通过 get_event_loop 获取
    asyncio.set_event_loop(loop)
    loop.close()


threading.Thread(target=create_loop).start()
threading.Thread(target=create_loop).start()
threading.Thread(target=create_loop).start()

以上我们就创建了 3 个事件循环,并保存在了策略的 _local 属性下面。

总结:事件循环策略在整个进程内是单例的,所有的线程共享一个策略;事件循环在所在的线程内是单例的,一个线程内部只会有一个事件循环。所有线程对应的循环均位于策略的 _local 属性中,获取的时候根据线程 ID 区分。

  • 策略的 new_event_loop 方法:创建事件循环;

  • 策略的 set_event_loop 方法:设置事件循环;

  • 策略的 get_event_loop 方法:获取事件循环,会先检测策略的 _local 中是否有当前线程对应的事件循环,有则获取,没有则通过 new_event_loop 创建、set_event_loop 设置,然后返回;

但是 get_event_loop、set_event_loop、new_event_loop 我们一般不会手动通过策略去调用,而是会通过 asyncio 去调用,比如 asyncio.get_event_loop。当然在 asyncio.get_event_loop 内部,也是先通过 get_event_loop_policy() 获取策略,然后调用策略的 get_event_loop 方法,获取线程对应的循环,两者本质是一样的,因为策略是单例的。

所以无论主线程还是子线程,毫无疑问都是可以创建事件循环的。只不过主线程既可以手动调用 new_event_loop 和 set_event_loop 来创建,也可以调用 get_event_loop(当循环不存在时内部自动创建)。但对于子线程而言,只能采用第一种方式,也就是手动创建,如果直接调用 get_event_loop 是会报错的,至于原因,源码中写的很清楚了。

当循环不存在时,必须是主线程才会自动创建,而子线程不会。所以结果就是因为循环为空,导致程序报错。

最佳实践:对于主线程,在外部我们会调用 get_event_loop,在协程内部我们会调用 get_running_loop;如果是子线程,那么在外部则需要 new_event_loop + set_event_loop 来实现。

import asyncio
from asyncio import get_event_loop_policy

# 创建事件循环
loop = asyncio.new_event_loop()
# 设置在策略的 _local 属性中
# 调用 asyncio.get_event_loop 时,会直接返回
# 因为循环存在,就不会再创建了
asyncio.set_event_loop(loop)

print(
    asyncio.get_event_loop() is loop is get_event_loop_policy()._local._loop
)  # True

运行事件循环#

回调函数和协程每次只能在预先设计好、并正在运行的事件循环上被调度。我们需要知道究竟该调用哪个循环 API,以便将事件循环状态机(state machine)转换为运行状态。我们还需要确定正确的位置来调度回调函数和协程。

import asyncio

async def main():
    print("Hello World")

# 获取事件循环直接通过 get_event_loop 即可
# 在没有的时候会自动创建
loop = asyncio.get_event_loop()
# 注:asyncio.create_task 只能在协程里面用
# 而 loop.create_task 在任何地方都可以
# 当然它们返回的都是 Task 对象
loop.create_task(main())
# 注意:此时事件循环虽然创建了,但是还没有运行
# 我们随便驱动一个协程,这样事件循环就运行起来了
# 然后会检测事件循环里面的任务,并驱动它们执行
loop.run_until_complete(asyncio.sleep(1))
"""
Hello World
"""

或者我们也可以显式地启动事件循环:

import asyncio

async def main():
    print("Hello World")

loop = asyncio.get_event_loop()
loop.create_task(main())
try:
    loop.run_forever()
finally:
    loop.close()
"""
Hello World
"""

任务可以先添加到事件循环中,然后调用 loop.run_forever() 启动事件循环,这样之前添加的任务会自动执行。并且这个 run_forever() 将处于阻塞状态,直到我们显式调用 loop.stop() 或出现异常时才会停止。

除了 loop.stop() 之外还有 loop.close(),loop.stop() 之后仍然可以调用 loop.run_* 方法,但 loop.close() 不行,它会直接关闭事件循环。

除了 loop.run_forever,也可以通过 loop.run_until_complete 调度协程来启动事件循环,就像上面的代码那样。并且这么做有一个好处,就是我们不必显式调用 loop.stop(),循环会一直运行直到传递给 run_until_complete 的协程执行结束后停止。

import asyncio

async def main():
    await asyncio.sleep(3)
    print("Hello World")

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_until_complete(asyncio.sleep(1))

此时不会有任何输出,因为当 asyncio.sleep(1) 这个协程结束后,事件循环就直接停止了。

查看事件循环中没有运行完的任务#

任务要被添加到事件循环里面,但如果任务还没有运行完,事件循环就结束了该怎么办?就像上面那样,如何才能查看那些没有运行完的任务呢?

import asyncio

async def main1():
    await asyncio.sleep(1)
    print("我是 main1")

async def main2():
    await asyncio.sleep(2)
    print("我是 main2")

async def main3():
    await asyncio.sleep(3)
    print("我是 main3")

loop = asyncio.get_event_loop()
# 启动三个任务,并丢到事件循环中
# 但事件循环还没有启动,所以任务也不会执行
loop.create_task(main1(), name="main1")  # 创建任务时可以给任务起个名字
loop.create_task(main2(), name="main2")
loop.create_task(main3(), name="main3")

# 当调用 loop.for_ever 时,会启动事件循环,无限运行
# 直到我们调用 loop.stop 或 loop.close 时停止
# 当然也可以通过 loop.run_until_complete 运行一个协程,来启动事件循环
# 但这种方式启动的事件循环,会在 run_until_complete 里面的任务(协程)执行完毕后自动停止
loop.run_until_complete(asyncio.sleep(1.5))
"""
我是 main1
"""

# 所以此时 main1() 一定运行完了,但 main2() 和 main3() 显然没有
# 通过 asyncio.all_tasks(loop) 可以查看当前尚未运行完毕的所有任务
unfinished_tasks = asyncio.all_tasks(loop)
print(unfinished_tasks)
"""
{<Task pending name='main2' coro=<main2() running at .../main.py:8> wait_for=<Future pending..., 
 <Task pending name='main3' coro=<main3() running at .../main.py:12> wait_for=<Future pending...}
"""
# 返回一个集合,显然里面就是 main2 和 main3 两个没有完成的任务
t1 = unfinished_tasks.pop()
t2 = unfinished_tasks.pop()
print(t1.get_name(), t2.get_name())  
"""
main2 main3
"""

# 继续让它完成
loop.run_until_complete(t1)
"""
我是 main2
"""

async def contiune_run():
    await t2

loop.run_until_complete(contiune_run())
"""
我是 main3
"""

个人觉得还是很有趣的,Task 是 Future 的子类,所以我们也可以调用任务的 add_done_callback 方法绑定一个回调,当任务执行完毕时自动触发回调。

asyncio.run() 源码解析#

如果你的设置非常简单,只想运行一个协程直到它完成,那么你可以使用 asyncio.run 这个 API。这个 API 之前一直在用,那么它是怎么实现的呢?

def run(main, *, debug=None):
    
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))
    # 不管当前是否存在事件循环,都会创建一个新的事件循环
    loop = events.new_event_loop()
    try:
        # 并把之前的事件循环替换掉,因为一个线程只会有一个事件循环
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        # 运行指定的协程
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            # 将所有的异步生成器给清理掉
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # 将事件循环替换为 None
            events.set_event_loop(None)
            # 关闭事件循环(不是停止、是关闭)
            loop.close()

所以这里面存在一个问题,就是使用 asyncio.run 之后,就不能再调用 get_event_loop 了。

import asyncio

async def main():
    pass

asyncio.run(main())
loop = asyncio.get_event_loop()
"""
RuntimeError: There is no current event loop in thread 'MainThread'.
"""

策略的 _local 属性里面除了有表示事件循环的 _loop 之外,还有一个 _set_called,它表示该线程是否设置过事件循环。当调用 asyncio.run 的时候,将该字段设置成了 True,然后执行完毕把事件循环设置成 None 了,但 _set_called 却没有设置成 False。因此当我们再调用 get_event_loop 的时候,第一个 if 不满足,于是不会再创建事件循环了;但事件循环又已经被设置为 None 了,于是第二个 if 条件满足,程序报错。

所以总结:asyncio.run 只适合一次性的简单任务。

asyncio.run 本身是可以调用多次的,因为它每次都会创建新的循环。