
学习如何使用 Prefect 构建健壮且高效的数据管道。

在本教程中，您将学习如何把一个统计 GitHub 仓库星标数的简单流程（flow）转变为健壮且高效的数据管道。现实世界是混乱的，而 Prefect 正是为应对这种混乱而设计的。

  • 您的 API 请求可能会失败。

  • 您的 API 请求运行得太慢。

  • 您的 API 请求运行得太快,导致被限流。

  • 您在重复运行相同的任务上浪费了时间和金钱。

与其在业务逻辑中自行解决这些问题，不如使用 Prefect 的内置功能来处理它们。

from pathlib import Path

temp_dir = Path(".temp")
# Create the scratch directory that the %%file cells below write their scripts into.
# exist_ok=True makes this safe to re-run and avoids a separate exists() check.
temp_dir.mkdir(parents=True, exist_ok=True)


您可以做出的第一个改进是为流程加入重试机制：每当 HTTP 请求失败时，在放弃之前自动重试几次。只需为任务设置 `retries` 参数（例如 `@task(retries=3)`），并在请求失败时抛出异常（例如调用 `raise_for_status()`），Prefect 就会自动重新运行该任务。

import httpx
from prefect import task

@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    Any exception raised here, including the HTTPStatusError from a non-2xx
    response, fails the task run, and Prefect re-runs it up to 3 times.

    Returns the decoded JSON body of the GitHub repos API response.
    """

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()


Hide code cell content
%%file {temp_dir}/retry_on_failure.py
import httpx

from prefect import flow, task # Prefect flow and task decorators

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    Repos are handled one after another: each repo is fetched (with the
    task's retries) and its star count printed before the next one starts.
    log_prints=True sends the print() output to the flow run logs.
    """

    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")

@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    A non-2xx response raises, which fails the run and lets Prefect retry
    it up to 3 times. Returns the decoded JSON response body.
    """

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()

@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics

    repo_stats is the JSON dict returned by fetch_stats; a payload without a
    'stargazers_count' key raises KeyError.
    """

    return repo_stats['stargazers_count']

# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Writing .temp/retry_on_failure.py


如果单个 API 请求响应缓慢，您可以通过同时发起多个请求来缩短总体耗时。对任务调用 `submit` 方法时，该任务会被提交给任务运行器（task runner）执行，并立即返回一个 future；之后调用其 `result()` 方法会阻塞，直到该任务完成。

from prefect import flow

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    All fetches are submitted first so they can run concurrently; results
    are collected afterwards in the original repo order.
    """

    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


Hide code cell content
%%file {temp_dir}/slow_tasks.py
import httpx

from prefect import flow, task # Prefect flow and task decorators

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    All fetches are submitted first so they can run concurrently; results
    are collected afterwards in the original repo order.
    """

    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")

@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    Must be a Prefect task because show_stars calls fetch_stats.submit().
    A non-2xx response raises httpx.HTTPStatusError instead of handing an
    error payload on to get_stars.
    """

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()
    return api_response.json()

@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics

    repo_stats is the JSON dict produced by fetch_stats; a payload without a
    'stargazers_count' key raises KeyError.
    """

    return repo_stats['stargazers_count']

# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Writing .temp/slow_tasks.py


并发运行任务的一个后果是，您更有可能触发所调用 API 的速率限制。为避免这种情况，可以用 Prefect 创建一个全局并发限制，并在每次提交请求前调用 `rate_limit` 来遵守该限制。

# GitHub allows 60 unauthenticated requests per hour (60 / 3600 ≈ 0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016


from prefect import flow
from prefect.concurrency.sync import rate_limit

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    Excerpt: only the rate-limited submission loop is shown; the rest of
    the flow is elided.
    """

    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop: wait until the
        # "github-api" global concurrency limit grants a slot
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

        # ...


Hide code cell content
%%file {temp_dir}/rate_limit.py
import httpx

from prefect import flow, task # Prefect flow and task decorators
from prefect.concurrency.sync import rate_limit

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    Each submission first waits on the "github-api" global concurrency
    limit. Star counts are read from each repo's own future. Passing the
    whole repo_stats list to get_stars, as before, would fail, because
    get_stars indexes a single stats dict.
    """

    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

    for entry in repo_stats:
        # Call Task 2 on this repo's resolved statistics
        stars = get_stars(entry['task'].result()) # Block until the task has completed

        # Print the result
        print(f"{entry['repo']}: {stars} stars")

@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    Must be a Prefect task because show_stars calls fetch_stats.submit().
    A non-2xx response (for example once the API rate limit is exceeded)
    raises httpx.HTTPStatusError instead of returning an error payload.
    """

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()
    return api_response.json()

@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics

    repo_stats is the JSON dict produced by fetch_stats; a payload without a
    'stargazers_count' key raises KeyError.
    """

    return repo_stats['stargazers_count']

# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Writing .temp/rate_limit.py



from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

# INPUTS cache policy: a later call with the same github_repo can reuse the stored
# result instead of re-running the task, until it expires after timedelta(days=1).
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    Excerpt: only the caching decorator is shown; the body is elided.
    """
    # ... (request body elided in this excerpt)


Hide code cell content
%%file {temp_dir}/cache_task.py
from datetime import timedelta
import httpx

from prefect import flow, task # Prefect flow and task decorators
from prefect.cache_policies import INPUTS

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    Repos are handled sequentially; fetch_stats results may be served from
    its one-day input-keyed cache on repeat runs.
    """

    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")

@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    A non-2xx response raises httpx.HTTPStatusError. This matters here:
    otherwise an error payload (e.g. a rate-limit message) would complete
    successfully and be cached for a whole day as if it were valid stats.
    """

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()
    return api_response.json()

@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics

    repo_stats is the JSON dict produced by fetch_stats; a payload without a
    'stargazers_count' key raises KeyError.
    """

    return repo_stats['stargazers_count']

# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Writing .temp/cache_task.py



%%file {temp_dir}/my_data_pipeline.py
from datetime import timedelta
import httpx

from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have

    Submits one fetch per repo, concurrently, with each submission waiting
    on the "github-api" global concurrency limit. It then prints each repo's
    star count in the original order.
    """

    # Task 1: Make HTTP requests concurrently while respecting concurrency limits
    repo_stats = []
    for repo in github_repos:
        # Wait until the "github-api" limit grants a slot before submitting
        rate_limit("github-api")
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")

@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo

    Failed attempts are retried up to three times. Successful results are
    cached by input and expire after one day.
    """

    url = f"https://api.github.com/repos/{github_repo}"
    response = httpx.get(url)
    # Raising on a non-2xx status fails this attempt, which triggers a retry
    response.raise_for_status()
    return response.json()

@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics

    repo_stats is the JSON dict produced by fetch_stats; a payload without a
    'stargazers_count' key raises KeyError.
    """

    return repo_stats['stargazers_count']

# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Writing .temp/my_data_pipeline.py


# Run the tasks and cache the results (fetch_stats results expire after one day)
# NOTE(review): the %%file cell saved the script as .temp/my_data_pipeline.py, so these
# commands assume the shell is inside .temp — confirm, or invoke that path instead.
python my_data_pipeline.py

# Retrieve the cached results: on a second run within the cache window, fetch_stats
# finishes in the Cached(type=COMPLETED) state instead of calling the API again.
python my_data_pipeline.py


09:08:12.265 | INFO    | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
09:08:12.266 | INFO    | prefect.engine - View at
09:08:12.322 | INFO    | Task run 'fetch_stats-0c9' - Finished in state Cached(type=COMPLETED)
09:08:12.359 | INFO    | Task run 'fetch_stats-e89' - Finished in state Cached(type=COMPLETED)
09:08:12.360 | INFO    | Task run 'get_stars-b51' - Finished in state Completed()
09:08:12.361 | INFO    | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17320 stars
09:08:12.372 | INFO    | Task run 'fetch_stats-8ef' - Finished in state Cached(type=COMPLETED)
09:08:12.374 | INFO    | Task run 'get_stars-08d' - Finished in state Completed()
09:08:12.374 | INFO    | Flow run 'laughing-nightingale' - pydantic/pydantic: 186319 stars
09:08:12.387 | INFO    | Task run 'get_stars-2af' - Finished in state Completed()
09:08:12.387 | INFO    | Flow run 'laughing-nightingale' - huggingface/transformers: 134849 stars
09:08:12.404 | INFO    | Flow run 'laughing-nightingale' - Finished in state Completed()