快速上手 Prefect#

使用 Prefect,以最简单的方式编排和监控您的数据管道。

from pathlib import Path

temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
    temp_dir.mkdir(exist_ok=True)

Prefect 是编排和可观测性平台,它赋予开发者快速构建和扩展工作流的能力。在这个快速开始指南中,您将使用 Prefect 将以下 Python 脚本转换为可部署的工作流。

%%file {temp_dir}/my_script.py
import httpx


def show_stars(github_repos: list[str]):
    """Show the number of stars that GitHub repos have"""

    for repo in github_repos:
        repo_stats = fetch_stats(repo)
        stars = get_stars(repo_stats)
        print(f"{repo}: {stars} stars")


def fetch_stats(github_repo: str):
    """Fetch the statistics for a GitHub repo"""

    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


def get_stars(repo_stats: dict):
    """Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/my_script.py

连接至Prefect API#

连接到 Prefect API:

  1. 启动本地 API 服务器

prefect server start
  1. 浏览器中打开位于 http://localhost:4200 的 Perfect 管理面板。

  1. 请访问 https://app.prefect.cloud/ 并登录或创建免费的 Prefect Cloud 账户。

  2. 从开发环境中登录到 Prefect Cloud:

prefect cloud login

请使用网络浏览器登录,并在打开的浏览器窗口中点击 Authorize 按钮。

您的命令行接口现在通过本地存储的 API 密钥与 Prefect 云账户进行身份验证,该密钥有效期为30天。

如果您在使用基于浏览器的身份验证时遇到任何问题,您可以通过手动创建的 API 密钥来进行身份验证

将您的脚本转换为 Prefect 工作流#

装饰器是将 Python 脚本转换成工作流的最简单方法。

  1. 在脚本的入口点添加 @flow 装饰器。

  2. 在工作流调用的任何方法上添加 @task 装饰器。

这将创建流程和相应的任务。任务在执行前会接收到关于上游依赖项的元数据以及这些依赖项的状态信息。Perfect 在协调这些任务时,会记录下这些依赖关系和状态。

%%file {temp_dir}/my_workflow.py
import httpx

from prefect import flow, task # Prefect flow and task decorators


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/my_workflow.py

备注

@flow 装饰器提供的 log_prints=True 参数,会自动将函数内的所有打印语句转换为 INFO 级别的日志。

执行流程#

你可以像运行 Python 脚本那样运行 Prefect 流程

python my_workflow.py

您的终端输出应该与以下内容相似:

08:21:31.335 | INFO    | prefect.engine - Created flow run 'attentive-kestrel' for flow 'show-stars'
08:21:31.336 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/edf6866f-371d-4e51-a9e3-556a525b1146
08:21:31.731 | INFO    | Task run 'fetch_stats-dce' - Finished in state Completed()
08:21:31.775 | INFO    | Task run 'get_stars-585' - Finished in state Completed()
08:21:31.776 | INFO    | Flow run 'attentive-kestrel' - PrefectHQ/prefect: 17318 stars
08:21:32.089 | INFO    | Task run 'fetch_stats-e16' - Finished in state Completed()
08:21:32.118 | INFO    | Task run 'get_stars-756' - Finished in state Completed()
08:21:32.119 | INFO    | Flow run 'attentive-kestrel' - pydantic/pydantic: 186318 stars
08:21:32.409 | INFO    | Task run 'fetch_stats-b62' - Finished in state Completed()
08:21:32.440 | INFO    | Task run 'get_stars-8ad' - Finished in state Completed()
08:21:32.441 | INFO    | Flow run 'attentive-kestrel' - huggingface/transformers: 134848 stars
08:21:32.469 | INFO    | Flow run 'attentive-kestrel' - Finished in state Completed()

Prefect 能够自动追踪流程运行的状态并记录输出,这些信息可以直接在终端或用户界面中查看。