AsyncFileSystem
简介#
fsspec
支持在某些实现上进行异步操作。这允许在批量操作(如同时获取多个文件的内容)中进行并发调用,即使是普通代码也可以这样做,并且可以在不阻塞的情况下直接在异步代码中使用 fsspec
。异步实现派生自 fsspec.asyn.AsyncFileSystem
类。可以使用类属性 async_impl
来测试一个实现是否为异步的。
AsyncFileSystem
包含了 AbstractFileSystem
中方法的 async def
协程版本。按照约定,这些方法都以 _
为前缀,表示它们不应在普通代码中直接调用,只有在你知道自己在做什么时候才调用。在大多数情况下,代码是相同的或稍作修改,将同步调用替换为对异步函数的 await
调用。
fsspec
内置的唯一异步实现是 HTTPFileSystem
。
同步 API#
AbstractFileSystem
的方法在普通代码中可用并可被调用。它们会调用并等待相应的异步函数。工作在单独的线程中进行,所以如果同时有许多 fsspec 操作在多个线程中启动,它们仍然都会在同一个专用于 IO 的线程上进行处理。
大多数用户不应该意识到他们的代码是异步运行的。
请注意,同步函数使用 sync_wrapper
包装,它会复制 AbstractFileSystem
中的文档字符串,除非在实现中明确给出。
示例:
fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3]) # fetches data concurrently
协程批处理#
各种创建许多协程并将其传递给事件循环进行处理的方法可以进行批处理:一次性提交一定数量的协程,并等待它们完成之后再启动更多。这对于解决本地打开文件限制(可能小于100)和避免堆溢出很重要。
fsspec.asyn._run_coros_in_chunks()
控制这个过程,但从用户的角度来看,有三种方式可以影响它。按优先级递增的顺序:
全局变量
fsspec.asyn._DEFAULT_BATCH_SIZE
和fsspec.asyn._NOFILES_DEFAULT_BATCH_SIZE
(分别用于涉及本地文件或不涉及的调用)配置键
"gather_batch_size"
和"nofiles_gather_batch_size"
异步文件系统的批处理方法接受的
batch_size
关键字。
在 Async 中使用#
可以使用 asynchronous=True
创建文件系统实例。这意味着实例化发生在协程内部,因此可以使用 await
直接调用各种 async
方法,就像在异步代码中一样。
请注意,由于 __init__
是一个阻塞函数,任何创建异步资源的操作都将被推迟。通常需要显式地等待一个协程来创建它们。由于垃圾回收也发生在阻塞代码中,你可能也希望显式地等待资源析构器。示例:
async def work_coroutine():
fs = fsspec.filesystem("http", asynchronous=True)
session = await fs.set_session() # creates client
out = await fs._cat([url1, url2, url3]) # fetches data concurrently
await session.close() # explicit destructor
asyncio.run(work_coroutine())
自带事件循环#
对于非异步情况,fsspec
通常会在特定线程上创建一个 asyncio
事件循环。然而,调用应用程序可能更希望 IO 进程运行在一个已经存在并正在运行的事件循环上(在另一个线程中)。这个循环需要符合 asyncio
规范,但不一定是一个 asyncio.events.AbstractEventLoop
。示例:
loop = ... # however a loop was made, running on another thread
fs = fsspec.filesystem("http", loop=loop)
out = fs.cat([url1, url2, url3]) # fetches data concurrently
实现新的后端#
异步文件系统应该派生自 AsyncFileSystem
,并实现其中的 async def _*
协程。这些函数将自动生成同步版本(如果名称在 async_methods
列表中),或者可以使用 sync_wrapper
直接创建。
class MyFileSystem(AsyncFileSystem):
async def _my_method(self):
...
my_method = sync_wrapper(_my_method)
这些函数不能调用本身是同步的方法或函数,而应该 await
其他协程。调用不需要同步的方法是可以的,例如 _strip_protocol
。
请注意,__init__
不能是异步的,所以可能需要使用同步函数来分配异步资源,但只有在 asynchronous=False
的情况下。如果是 True
,你可能需要要求调用者等待一个创建这些资源的协程。同样,任何析构函数(例如 __del__
)都将在常规代码中运行,可能在循环已停止/关闭之后。
要调用同步,你需要传递关联的事件循环,它将作为属性 .loop
可用。