在后台运行任务#

查看使用Prefect任务和任务工作者的示例。

Prefect任务帮助您快速执行小型、独立的工作单元。延迟的Prefect任务使用Prefect任务工作者在后台进程中运行。使用延迟任务可以将工作从应用程序前台移出,并在多个进程或机器上分布并发执行。

例如,如果您有一个Web应用程序,延迟任务允许您卸载诸如发送电子邮件、处理图像或将数据插入数据库之类的流程。

使用延迟任务#

Prefect任务是Python函数,可以立即运行或延迟到后台执行。

通过向Python函数添加 @task 装饰器来定义任务,并使用 delay 方法在后台运行该任务。

如果您安排任务在后台执行,您可以在单独的进程或容器中运行任务工作者来执行任务。这个过程类似于Celery工作者或 arq工作者。

定义任务#

在Python函数上添加@task装饰器以定义 Prefect 任务。

from prefect import task

@task
def my_background_task(name: str):
    # Task logic here
    print(f"Hello, {name}!")

调用任务#

你可以直接调用任务来立即运行它,或者你可以使用Task.delay将任务推迟到后台执行。

备注

你可以在工作流程中向诸如Ray或Dask这样的_任务执行器_提交任务,在Prefect中这被称为 flow。然而,本指南专注于在工作流之外延迟任务执行。例如,在Web应用程序内通过调用my_task.delay()

无论你如何运行任务,Prefect都会利用你的任务配置来管理和控制任务的执行。 以下示例展示了调用任务和使用delay的两种方法:

# Import the previously-defined task
from myproject.tasks import my_background_task

# Run the task immediately
my_background_task("Joaquim")

# Schedule the task for execution outside of this process
my_background_task.delay("Agrajag")

使用任务工作者执行延迟任务#

要在单独的进程或容器中运行任务,请启动一个任务工作者。

任务工作者持续从Prefect的API接收执行延迟任务的指令,执行这些任务,并将结果报告回API。

备注

任务工作者仅运行延迟任务,不运行你作为常规Python函数直接调用的任务。

通过将任务传递给prefect.task_worker.serve()方法来运行任务工作者:

from prefect import task
from prefect.task_worker import serve


@task
def my_background_task(name: str):
    # Task logic here
    print(f"Hello, {name}!")


if __name__ == "__main__":
    # NOTE: The serve() function accepts multiple tasks. The Task worker 
    # will listen for scheduled task runs for all tasks passed in.
    serve(my_background_task)

任务工作者开始监听预定的任务。如果任务在任务工作者启动之前已经排定, 它将开始处理这些任务。

您也可以使用辅助CLI命令 prefect task serve 来启动任务工作者:

prefect task serve my_task.py:my_background_task

探索Prefect中的延迟任务和任务工作者#

以下是使用Prefect的延迟任务和任务工作者的一些示例。

您将:

  • 通过调用在前台运行 Prefect 任务

  • 启动任务工作者并延迟任务,使它们在后台运行

  • 创建基本的 FastAPI 应用程序,当您访问端点时,该应用程序会延迟任务

  • 在两个模仿真实用例的例子中使用 Docker

一个例子使用带有多个微服务的 FastAPI 服务器,模拟新用户注册工作流程。 另一个例子使用 Flask 服务器和 Marvin 从 CLI 向 LLM 提问并获取答案。

设置#

第一步:激活虚拟环境#

本例使用了conda,但任何虚拟环境管理器均可适用。

conda deactivate
conda create -n python-tasks python=3.12
conda activate python-tasks

第二步:安装Python依赖项#

pip install -U prefect marvin fastapi==0.107

第三步:连接到Prefect云或自托管的Prefect服务器#

这些示例可以使用Prefect云或自托管的Prefect服务器。

你必须设置PREFECT_API_URL以将任务发送给任务工作者。

如果你使用的是带有SQLite支持数据库(默认数据库)的Prefect服务器,请使用以下命令将此值保存到你的活动Prefect配置文件中:

prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

如果使用Prefect云,将PREFECT_API_URL设置为Prefect云API URL,并添加你的API密钥

使用docker的示例(示例4和5)默认使用Prefect服务器。你可以通过更改PREFECT_API_URL并在docker-compose.yaml中为你的API键添加一个变量来切换到Prefect云。或者,通过设置PREFECT_API_DATABASE_CONNECTION_URL使用由PostgreSQL数据库支持的Prefect服务器。

如果选择使用Prefect服务器而不是Prefect云,请运行以下命令启动你的服务器:

prefect server start 

步骤4:克隆仓库(可选)#

克隆仓库以获取示例的代码文件:

git clone https://github.com/PrefectHQ/prefect-background-task-examples.git

进入目录:

cd prefect-background-task-examples

示例1:通过调用在前台运行Prefect任务#

@task装饰器添加到任何Python函数上以定义 Prefect 任务。

步骤1:创建带有任务装饰的函数的文件#

创建文件,并将以下代码保存在其中,或者运行basic-examples目录中现有的文件。

from prefect import task 

@task(log_prints=True)
def greet(name: str = "Marvin"):
    print(f"Hello, {name}!")

if __name__ == "__main__":
    greet()

第二步:在终端运行脚本#

python greeter.py

你应该能在终端看到任务正在运行。这个任务是在前台执行的,意味着它不会延迟执行。

可选操作#

你可以在用户界面中查看任务运行情况。 如果你使用的是自托管的Prefect服务器实例,你也可以在数据库中查看任务运行记录。

如果你想检查SQLite数据库,请使用你喜欢的界面。 DB Browser for SQLite 的使用方法如下所述。

如需下载,请点击这里。安装并打开它。

点击“连接”。然后导航到你的SQLite数据库文件。默认情况下,它位于~/.prefect目录中。

进入task_run表以查看所有任务运行情况。 向下滚动以查看你最近的一次任务运行或对其进行筛选。

如有需要,点击刷新按钮以获取更新。

from pathlib import Path

temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
    temp_dir.mkdir(exist_ok=True)

示例2:启动任务工作者并在后台运行延迟任务#

要在单独的进程或容器中运行任务,启动一个任务工作者,类似于运行Celery工作者或arq工作者的方式。

任务工作者不断地从Prefect的API接收要执行的定时任务,执行它们,并将结果报告回API。

通过将任务传递给prefect.task_worker.serve()方法来运行任务工作者。

步骤1:在文件中定义任务和任务工作者#

%%file {temp_dir}/task_worker.py
from prefect import task
from prefect.task_worker import serve

@task
def my_background_task(name: str):
    print(f"Hello, {name}!")

if __name__ == "__main__":
    serve(my_background_task)
Writing .temp/task_worker.py

步骤2:通过在终端运行脚本启动任务工作者#

python task_worker.py

任务工作者正在等待执行 my_background_task 任务。

步骤3:创建文件并保存以下代码:#

%%file {temp_dir}/task_scheduler.py
from task_worker import my_background_task

if __name__ == "__main__":
    my_background_task.delay("Agrajag")
Writing .temp/task_scheduler.py

步骤4:打开另一个终端并运行脚本#

python task_scheduler.py

该代码从 delay 方法返回 “future” 对象。你可以使用这个对象来等待任务完成,调用 wait() 方法,并通过 result() 方法检索其结果。你也可以查看任务运行的 UUID 和其他关于任务运行的信息。

步骤5:在 UI 中查看任务运行#

使用任务运行的UUID在UI中查看任务运行。URL将如下所示:

http://127.0.0.1:4200/task-runs/task-run/my_task_run_uuid_goes_here

请将你的 UUID 替换到 URL 的末尾。

步骤6:使用多个任务工作者并行运行任务#

启动任务工作者的另一个实例。在另一个终端运行:

python task_worker.py

步骤7:向任务工作者发送多个任务#

修改 task_scheduler.py 文件以向任务工作者发送具有不同输入的多个任务:

from task_worker import my_background_task

if __name__ == "__main__":
    my_background_task.delay("Ford")
    my_background_task.delay("Prefect")
    my_background_task.delay("Slartibartfast")

运行文件以查看工作如何在两个任务工作者之间分配。

步骤8:使用 control + c 关闭任务工作者#

本指南向您展示了如何将任务发送到多个在后台运行的Prefect任务工作者。 这使您能够通过WebSockets观察这些任务并行且非常快速地执行,无需轮询。

请参阅延迟任务GitHub仓库中的其他示例。