在 asyncio 中调用命令行程序#
Python 编写的应用程序需要 Python 运行环境,然而,并不是所有想要交互的组件都是用 Python 编写的。可能已经使用了 C++ 、Go、Rust 或其他语言编写的应用程序,这些语言提供了更好的运行时特征,或者可以提供很优秀的实现方式,而无需重新实现。可能还希望使用操作系统提供的命令行实用工具,例如 grep 用于搜索大型文件,curl 用于发出 HTTP 请求等。
import asyncio
from asyncio.subprocess import Process
async def main():
process: Process = await asyncio.create_subprocess_exec("ls", "-l")
print(f"进程的 pid: {process.pid}")
# 等待子进程执行完毕,并返回状态码
status_code = await process.wait()
print(f"status code: {status_code}")
await main()
进程的 pid: 1544250
total 12
-rw-rw-r-- 1 ai ai 1831 2月 24 14:47 asyncio-subprocess.ipynb
-rw-rw-r-- 1 ai ai 53 2月 24 14:43 index.md
-rw-rw-r-- 1 ai ai 2631 2月 24 14:38 switch.ipynb
status code: 0
控制标准输出:
import asyncio
from asyncio.subprocess import Process, PIPE
from asyncio.streams import StreamReader
async def main():
process: Process = await asyncio.create_subprocess_exec("ls", "-la", stdout=PIPE)
print(f"进程的 pid: {process.pid}")
await process.wait()
# 当子进程执行完毕时,拿到它的 stdout 属性
stdout: StreamReader = process.stdout
# 读取输出内容,如果子进程没有执行完毕,那么 await stdout.read() 会阻塞
content = (await stdout.read()).decode("utf-8")
print(content[: 100])
await main()
进程的 pid: 1544747
total 12
drwxrwxr-x 2 ai ai 90 2月 24 14:43 .
drwxrwxrwx 6 ai ai 100 2月 24 13:39 ..
-rw-rw-r-- 1
与进程交互#
from pathlib import Path
temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
temp_dir.mkdir(exist_ok=True)
%%file {temp_dir}/write_data.py
import sys
[sys.stdout.buffer.write(b'!Hello there!!\n') for _ in range(1000000)]
sys.stdout.flush()
Writing .temp/write_data.py
from asyncio.subprocess import Process, PIPE
async def main():
process: Process = await asyncio.create_subprocess_exec(
"python", f"{temp_dir}/write_data.py", stdout=PIPE, stderr=PIPE)
print(f"进程的 pid: {process.pid}")
# 同样会阻塞,直到进程完成
stdout, stderr = await process.communicate()
print(f"状态码: {process.returncode}")
await main()
进程的 pid: 1549378
状态码: 0