流程调度#
使用 Prefect 调度并运行流程。
在本教程中,您将学习如何将该流程从本地机器上移出,并使用 Prefect 调度运行。
from pathlib import Path
temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
temp_dir.mkdir(exist_ok=True)
发布代码到远程仓库#
首先,你需要将本地机器上的代码推送到远程仓库。比如:
https://github.com/prefecthq/demos.git
创建工作池#
在本地运行流程是很好的开始,但大多数使用案例都需要远程执行环境。工作池是部署流程到远程基础设施最常见的接口。
将你的流程部署到自托管的Prefect服务器实例上,使用进程工作池(Process work pool)。所有提交给这个工作池的流程运行都将在本地子进程中进行(对于在远程基础设施上运行的其他类型的工作池,其创建机制类似)。
创建
Process
工作池:
prefect work-pool create --type process my-work-pool
请确认工作池是否存在
prefect work-pool ls
启动 worker 以轮询工作池:
prefect worker start --pool my-work-pool
使用托管工作池将您的流程部署到 Prefect Cloud。
创建受管理的工作池:
prefect work-pool create my-work-pool --type prefect:managed
请在用户界面的工作池页面上查看您的新工作池。
备注
也可以从其他工作池类型中进行选择。
部署并调度流程#
部署用于确定流程何时、在何地以及如何运行。通过部署,流程被提升为具有自己 API 的远程可配置实体。要设置流程按照预定计划运行,您需要创建部署。
创建部署代码:
%%file {temp_dir}/create_deployment.py
from prefect import flow
# Source for the code to deploy (here, a GitHub repo)
SOURCE_REPO="https://github.com/prefecthq/demos.git"
if __name__ == "__main__":
flow.from_source(
source=SOURCE_REPO,
entrypoint="my_gh_workflow.py:repo_info", # Specific flow to run
).deploy(
name="my-first-deployment",
work_pool_name="my-work-pool", # Work pool target
cron="* * * * *", # Cron schedule (every minute)
)
Writing .temp/create_deployment.py
小技巧
您可以将流程代码存储在几乎任何 Prefect 能够访问的位置。有关更多详细信息,请查看“流程代码存储位置”部分。
执行脚本以创建部署环境:
python create_deployment.py
请检查日志以确保您的部署已成功创建
Successfully created/updated all deployments!
______________________________________________________
| Deployments |
______________________________________________________
| Name | Status | Details |
______________________________________________________
| repo-info/my-first-deployment | applied | |
______________________________________________________
调度部署的运行
prefect deployment run 'repo-info/my-first-deployment'
很快能在用户界面的 Flow Run
页面上看到流程运行图和日志。同时,日志也会实时传输到终端。