coroutine

coroutine#

# from _collections_abc import coroutine
from abc import ABCMeta, abstractmethod
from _collections_abc import _check_methods, GenericAlias, Awaitable, Coroutine

## coroutine ##
async def _coro(): pass
_coro = _coro()
coroutine = type(_coro) # 协程
_coro.close()  # Prevent ResourceWarning
del _coro
class Coroutine(Awaitable):

    __slots__ = ()

    @abstractmethod
    def send(self, value):
        """将 `value` 发送到协程中。
        返回下一个生成的值,或者引发 StopIteration 异常。
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """
        在协程中引发异常。
        返回下一个生成的值或引发 StopIteration 异常。
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """在协程内引发 GeneratorExit 异常。
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("coroutine ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented


Coroutine.register(coroutine) # 将 coroutine 函数注册为 Coroutine 类的虚拟子类
coroutine