在后台运行任务#
查看使用Prefect任务和任务工作者的示例。
Prefect任务帮助您快速执行小型、独立的工作单元。延迟的Prefect任务使用Prefect任务工作者在后台进程中运行。使用延迟任务可以将工作从应用程序前台移出,并在多个进程或机器上分布并发执行。
例如,如果您有一个Web应用程序,延迟任务允许您卸载诸如发送电子邮件、处理图像或将数据插入数据库之类的流程。
使用延迟任务#
Prefect任务是Python函数,可以立即运行或延迟到后台执行。
通过向Python函数添加 @task
装饰器来定义任务,并使用 delay
方法在后台运行该任务。
如果您安排任务在后台执行,您可以在单独的进程或容器中运行任务工作者来执行任务。这个过程类似于Celery工作者或 arq工作者。
定义任务#
在Python函数上添加@task
装饰器以定义 Prefect 任务。
from prefect import task
@task
def my_background_task(name: str):
# Task logic here
print(f"Hello, {name}!")
调用任务#
你可以直接调用任务来立即运行它,或者你可以使用Task.delay
将任务推迟到后台执行。
备注
你可以在工作流程中向诸如Ray或Dask这样的_任务执行器_提交任务,在Prefect中这被称为 flow。然而,本指南专注于在工作流之外延迟任务执行。例如,在Web应用程序内通过调用my_task.delay()
。
无论你如何运行任务,Prefect都会利用你的任务配置来管理和控制任务的执行。
以下示例展示了调用任务和使用delay
的两种方法:
# Import the previously-defined task
from myproject.tasks import my_background_task
# Run the task immediately
my_background_task("Joaquim")
# Schedule the task for execution outside of this process
my_background_task.delay("Agrajag")
使用任务工作者执行延迟任务#
要在单独的进程或容器中运行任务,请启动一个任务工作者。
任务工作者持续从Prefect的API接收执行延迟任务的指令,执行这些任务,并将结果报告回API。
备注
任务工作者仅运行延迟任务,不运行你作为常规Python函数直接调用的任务。
通过将任务传递给prefect.task_worker.serve()
方法来运行任务工作者:
from prefect import task
from prefect.task_worker import serve
@task
def my_background_task(name: str):
# Task logic here
print(f"Hello, {name}!")
if __name__ == "__main__":
# NOTE: The serve() function accepts multiple tasks. The Task worker
# will listen for scheduled task runs for all tasks passed in.
serve(my_background_task)
任务工作者开始监听预定的任务。如果任务在任务工作者启动之前已经排定, 它将开始处理这些任务。
您也可以使用辅助CLI命令 prefect task serve
来启动任务工作者:
prefect task serve my_task.py:my_background_task
探索Prefect中的延迟任务和任务工作者#
以下是使用Prefect的延迟任务和任务工作者的一些示例。
您将:
通过调用在前台运行 Prefect 任务
启动任务工作者并延迟任务,使它们在后台运行
创建基本的 FastAPI 应用程序,当您访问端点时,该应用程序会延迟任务
在两个模仿真实用例的例子中使用 Docker
一个例子使用带有多个微服务的 FastAPI 服务器,模拟新用户注册工作流程。 另一个例子使用 Flask 服务器和 Marvin 从 CLI 向 LLM 提问并获取答案。
设置#
第一步:激活虚拟环境#
本例使用了conda,但任何虚拟环境管理器均可适用。
conda deactivate
conda create -n python-tasks python=3.12
conda activate python-tasks
第二步:安装Python依赖项#
pip install -U prefect marvin fastapi==0.107
第三步:连接到Prefect云或自托管的Prefect服务器#
这些示例可以使用Prefect云或自托管的Prefect服务器。
你必须设置PREFECT_API_URL
以将任务发送给任务工作者。
如果你使用的是带有SQLite支持数据库(默认数据库)的Prefect服务器,请使用以下命令将此值保存到你的活动Prefect配置文件中:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
如果使用Prefect云,将PREFECT_API_URL
设置为Prefect云API URL,并添加你的API密钥。
使用docker的示例(示例4和5)默认使用Prefect服务器。你可以通过更改PREFECT_API_URL
并在docker-compose.yaml
中为你的API键添加一个变量来切换到Prefect云。或者,通过设置PREFECT_API_DATABASE_CONNECTION_URL
使用由PostgreSQL数据库支持的Prefect服务器。
如果选择使用Prefect服务器而不是Prefect云,请运行以下命令启动你的服务器:
prefect server start
步骤4:克隆仓库(可选)#
克隆仓库以获取示例的代码文件:
git clone https://github.com/PrefectHQ/prefect-background-task-examples.git
进入目录:
cd prefect-background-task-examples
示例1:通过调用在前台运行Prefect任务#
将@task
装饰器添加到任何Python函数上以定义 Prefect 任务。
步骤1:创建带有任务装饰的函数的文件#
创建文件,并将以下代码保存在其中,或者运行basic-examples目录中现有的文件。
from prefect import task
@task(log_prints=True)
def greet(name: str = "Marvin"):
print(f"Hello, {name}!")
if __name__ == "__main__":
greet()
第二步:在终端运行脚本#
python greeter.py
你应该能在终端看到任务正在运行。这个任务是在前台执行的,意味着它不会延迟执行。
可选操作#
你可以在用户界面中查看任务运行情况。 如果你使用的是自托管的Prefect服务器实例,你也可以在数据库中查看任务运行记录。
如果你想检查SQLite数据库,请使用你喜欢的界面。 DB Browser for SQLite 的使用方法如下所述。
如需下载,请点击这里。安装并打开它。
点击“连接”。然后导航到你的SQLite数据库文件。默认情况下,它位于~/.prefect
目录中。
进入task_run
表以查看所有任务运行情况。
向下滚动以查看你最近的一次任务运行或对其进行筛选。
如有需要,点击刷新按钮以获取更新。
from pathlib import Path
temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
temp_dir.mkdir(exist_ok=True)
示例2:启动任务工作者并在后台运行延迟任务#
要在单独的进程或容器中运行任务,启动一个任务工作者,类似于运行Celery工作者或arq工作者的方式。
任务工作者不断地从Prefect的API接收要执行的定时任务,执行它们,并将结果报告回API。
通过将任务传递给prefect.task_worker.serve()
方法来运行任务工作者。
步骤1:在文件中定义任务和任务工作者#
%%file {temp_dir}/task_worker.py
from prefect import task
from prefect.task_worker import serve
@task
def my_background_task(name: str):
print(f"Hello, {name}!")
if __name__ == "__main__":
serve(my_background_task)
Writing .temp/task_worker.py
步骤2:通过在终端运行脚本启动任务工作者#
python task_worker.py
任务工作者正在等待执行 my_background_task
任务。
步骤3:创建文件并保存以下代码:#
%%file {temp_dir}/task_scheduler.py
from task_worker import my_background_task
if __name__ == "__main__":
my_background_task.delay("Agrajag")
Writing .temp/task_scheduler.py
步骤4:打开另一个终端并运行脚本#
python task_scheduler.py
该代码从 delay
方法返回 “future” 对象。你可以使用这个对象来等待任务完成,调用 wait()
方法,并通过 result()
方法检索其结果。你也可以查看任务运行的 UUID 和其他关于任务运行的信息。
步骤5:在 UI 中查看任务运行#
使用任务运行的UUID在UI中查看任务运行。URL将如下所示:
http://127.0.0.1:4200/task-runs/task-run/my_task_run_uuid_goes_here
请将你的 UUID 替换到 URL 的末尾。
步骤6:使用多个任务工作者并行运行任务#
启动任务工作者的另一个实例。在另一个终端运行:
python task_worker.py
步骤7:向任务工作者发送多个任务#
修改 task_scheduler.py
文件以向任务工作者发送具有不同输入的多个任务:
from task_worker import my_background_task
if __name__ == "__main__":
my_background_task.delay("Ford")
my_background_task.delay("Prefect")
my_background_task.delay("Slartibartfast")
运行文件以查看工作如何在两个任务工作者之间分配。
步骤8:使用 control + c 关闭任务工作者#
本指南向您展示了如何将任务发送到多个在后台运行的Prefect任务工作者。 这使您能够通过WebSockets观察这些任务并行且非常快速地执行,无需轮询。
请参阅延迟任务GitHub仓库中的其他示例。