Build a data pipeline

Learn how to use Prefect to build robust and efficient data pipelines.

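In this tutorial, you will learn how to turn this flow into a robust and efficient data pipeline. The real world is messy, and Prefect is designed to handle that mess: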

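  • Your API requests might fail.

  • Your API requests might run too slowly.

  • Your API requests might run too fast, so you get rate limited.

  • You waste time and money running the same tasks over and over.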

Rather than solving these problems in the business logic itself, use Prefect's built-in features to handle them.

from pathlib import Path

temp_dir = Path(".temp")
# Create the temporary directory (exist_ok=True makes this a no-op if it already exists)
temp_dir.mkdir(exist_ok=True)

Retry on failure

The first improvement you can make is to add retries to your flow. Whenever an HTTP request fails, you can retry it a few times before giving up.

import httpx
from prefect import task

@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()

Full example of the flow with retries:

%%file {temp_dir}/retry_on_failure.py
import httpx

from prefect import flow, task # Prefect flow and task decorators


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/retry_on_failure.py
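
To make the effect of `retries=3` concrete, here is a minimal plain-Python sketch of a retry wrapper. It is not Prefect's implementation: `with_retries`, `flaky_fetch`, and the simulated failure are hypothetical names made up for illustration, and the sketch assumes that three retries means one initial attempt plus up to three more.

```python
import functools


def with_retries(retries: int):
    """Hypothetical decorator: rerun a function up to `retries` extra times."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # one initial attempt plus the retries
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of retries: surface the last error
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3)
def flaky_fetch() -> dict:
    """Stand-in for an HTTP call that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return {"stargazers_count": 123}


result = flaky_fetch()
print(result, "after", calls["count"], "attempts")
```

The caller never sees the two simulated failures. This is the same reason `raise_for_status()` matters in the task above: the wrapper can only retry when the failure is raised as an exception.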

Run slow tasks concurrently

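If individual API requests are slow, you can speed them up in aggregate by making several requests at once. When you call the `submit` method on a task, the task is handed to a task runner for execution and the call returns immediately.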

from prefect import flow

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")

Full example of the concurrent flow:

%%file {temp_dir}/slow_tasks.py
import httpx

from prefect import flow, task # Prefect flow and task decorators


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/slow_tasks.py
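
The submit-then-collect pattern used in the flow resembles the futures pattern in Python's standard library. The sketch below is an analogy only and does not use Prefect: `slow_fetch` is a hypothetical stand-in that sleeps instead of calling the GitHub API, and its return values are made up.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_fetch(repo: str) -> dict:
    """Stand-in for a slow API request (no network access)."""
    time.sleep(0.2)
    return {"repo": repo, "stargazers_count": len(repo)}  # made-up value


repos = ["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns a future right away, so the three calls overlap
    futures = [pool.submit(slow_fetch, repo) for repo in repos]
    # result() blocks until that particular call has finished
    results = [future.result() for future in futures]
elapsed = time.perf_counter() - start

print(f"{len(results)} requests finished in {elapsed:.2f}s")
```

Run one after another, the three calls would take at least 0.6 seconds. Because they overlap, the total is typically close to the duration of a single call.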

Avoid getting rate limited

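One consequence of running tasks concurrently is that you are more likely to hit the rate limits of whatever API you are using. To avoid this, use Prefect to set a global concurrency limit.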

# GitHub allows 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
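
As a quick check of where the decay value comes from, the arithmetic can be worked through in a few lines. The variable names are illustrative only; the one input is the 60-requests-per-hour figure from the comment above.

```python
requests_per_hour = 60        # GitHub's unauthenticated limit, as stated above
seconds_per_hour = 60 * 60

# How many slots should free up each second to average 60 requests per hour
decay_per_second = requests_per_hour / seconds_per_hour

# Equivalent view: the average spacing between requests
seconds_per_request = seconds_per_hour / requests_per_hour

print(f"{decay_per_second:.4f} slots per second")        # 0.0167 slots per second
print(f"{seconds_per_request:.0f} seconds per request")  # 60 seconds per request
```

The `0.016` passed to the command is this value rounded down, which keeps the average rate slightly under GitHub's limit rather than slightly over it.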

Now you can use this global concurrency limit in your code.

from prefect import flow
from prefect.concurrency.sync import rate_limit

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

        # ...

Full example of the rate-limited flow:

%%file {temp_dir}/rate_limit.py
import httpx

from prefect import flow, task # Prefect flow and task decorators
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

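    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")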


@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/rate_limit.py
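
One way to reason about a limit with a fixed number of slots that free up at a steady rate is the classic token bucket. The sketch below is a generic, in-process token bucket driven by a simulated clock. It is an assumption-laden illustration, not how Prefect enforces `rate_limit`; `TokenBucket`, `wait_time`, and the small capacity and refill numbers are all hypothetical.

```python
class TokenBucket:
    """Hypothetical token bucket driven by an explicit clock value."""

    def __init__(self, capacity: int, refill_per_second: float):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = float(capacity)  # start with a full bucket
        self.last = 0.0                # time of the last accounting update

    def wait_time(self, now: float) -> float:
        """Take one token at time `now`; return how long the caller must wait first."""
        # Refill for the time elapsed since the last update, capped at capacity
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return 0.0
        # Not enough tokens: wait until one whole token has accumulated, then spend it
        delay = (1 - self.tokens) / self.refill_per_second
        self.tokens = 0.0
        self.last = now + delay
        return delay


# Two slots, refilled at one slot every two seconds
bucket = TokenBucket(capacity=2, refill_per_second=0.5)
now = 0.0
waits = []
for _ in range(4):
    delay = bucket.wait_time(now)
    waits.append(delay)
    now += delay  # simulate sleeping instead of calling time.sleep()

print(waits)  # [0.0, 0.0, 2.0, 2.0]
```

The first two requests go through immediately because the bucket starts full. After that, each request has to wait for a slot to free up, so the long-run rate settles at the refill rate.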

Cache the results of a task

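For efficiency, you can skip tasks that have already run. For example, if you don't want to fetch the star count for a given repository more than once per day, you can cache that result for a day.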

from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
    

Full example of the flow with caching:

%%file {temp_dir}/cache_task.py
from datetime import timedelta
import httpx

from prefect import flow, task # Prefect flow and task decorators
from prefect.cache_policies import INPUTS


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/cache_task.py
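
The idea behind caching on inputs with an expiration can be sketched in plain Python. This is a simplified in-memory illustration, not Prefect's implementation: `ExpiringCache`, `fake_fetch_stats`, and the hard-coded timestamps are hypothetical. Unlike this sketch, a cache that is reused by a later run of the script has to store its entries somewhere outside the process.

```python
from datetime import datetime, timedelta


class ExpiringCache:
    """Hypothetical in-memory cache keyed on a function's inputs."""

    def __init__(self, expiration: timedelta):
        self.expiration = expiration
        self.entries = {}  # maps input arguments -> (stored_at, value)

    def call(self, func, *args, now: datetime):
        entry = self.entries.get(args)
        if entry is not None and now - entry[0] < self.expiration:
            return entry[1]  # fresh hit: skip the function entirely
        value = func(*args)  # miss or expired: run and store the result
        self.entries[args] = (now, value)
        return value


fetch_log = []  # records every time the function really runs


def fake_fetch_stats(github_repo: str) -> dict:
    """Stand-in for the real request, with a made-up star count."""
    fetch_log.append(github_repo)
    return {"repo": github_repo, "stargazers_count": 100}


cache = ExpiringCache(expiration=timedelta(days=1))
start = datetime(2024, 1, 1, 9, 0)

cache.call(fake_fetch_stats, "PrefectHQ/prefect", now=start)                       # miss: runs
cache.call(fake_fetch_stats, "PrefectHQ/prefect", now=start + timedelta(hours=1))  # hit: skipped
cache.call(fake_fetch_stats, "pydantic/pydantic", now=start + timedelta(hours=1))  # new input: runs
cache.call(fake_fetch_stats, "PrefectHQ/prefect", now=start + timedelta(days=2))   # expired: runs

print(len(fetch_log))  # 3
```

Four calls produce only three real fetches: the second call reuses the first result because the input is identical and the entry is less than a day old.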

Run your improved flow

With all of these improvements applied, your flow looks like this:

%%file {temp_dir}/my_data_pipeline.py
from datetime import timedelta
import httpx

from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    # Task 1: Make HTTP requests concurrently while respecting concurrency limits
    repo_stats = []
    for repo in github_repos:
        rate_limit("github-api")
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/my_data_pipeline.py

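Run your flow twice: the first run executes the tasks and caches the results, and the second run retrieves the results from the cache.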

# Run the tasks and cache the results
python my_data_pipeline.py

# Retrieve the cached results
python my_data_pipeline.py

The terminal output from the second flow run should look like this:

09:08:12.265 | INFO    | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
09:08:12.266 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/541864e8-12f7-4890-9397-b2ed361f6b20
09:08:12.322 | INFO    | Task run 'fetch_stats-0c9' - Finished in state Cached(type=COMPLETED)
09:08:12.359 | INFO    | Task run 'fetch_stats-e89' - Finished in state Cached(type=COMPLETED)
09:08:12.360 | INFO    | Task run 'get_stars-b51' - Finished in state Completed()
09:08:12.361 | INFO    | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17320 stars
09:08:12.372 | INFO    | Task run 'fetch_stats-8ef' - Finished in state Cached(type=COMPLETED)
09:08:12.374 | INFO    | Task run 'get_stars-08d' - Finished in state Completed()
09:08:12.374 | INFO    | Flow run 'laughing-nightingale' - pydantic/pydantic: 186319 stars
09:08:12.387 | INFO    | Task run 'get_stars-2af' - Finished in state Completed()
09:08:12.387 | INFO    | Flow run 'laughing-nightingale' - huggingface/transformers: 134849 stars
09:08:12.404 | INFO    | Flow run 'laughing-nightingale' - Finished in state Completed()