构建数据管道#
学习如何使用 Prefect 构建健壮且高效的数据管道。
在本教程中,您将学习如何将此流程转变为健壮且高效的数据管道。现实世界是混乱的,而 Prefect 旨在应对这种混乱。
您的 API 请求可能会失败。
您的 API 请求运行得太慢。
您的 API 请求运行得太快,导致被限流。
您浪费了时间和金钱重复运行相同的任务。
与其在业务逻辑本身中解决这些问题,不如使用 Prefect 的内置功能来处理它们。
from pathlib import Path
# Scratch directory that the ``%%file`` cells in this tutorial write their
# example scripts into.
temp_dir = Path(".temp")
# ``exist_ok=True`` already makes mkdir a no-op when the directory exists, so a
# separate ``exists()`` pre-check is redundant (and racy between check and
# create). This also fails loudly if ".temp" exists but is not a directory.
temp_dir.mkdir(exist_ok=True)
失败重试#
你可以做出的第一个改进是在流程中加入重试机制。每当 HTTP 请求失败时,你可以在放弃之前尝试重新发送几次。
import httpx
from prefect import task
@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Download the statistics of one GitHub repository.

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response.
    """
    url = f"https://api.github.com/repos/{github_repo}"
    response = httpx.get(url)
    # An error status raises here, which lets the task's ``retries=3`` kick in.
    response.raise_for_status()
    return response.json()
完整示例:
Show code cell content
%%file {temp_dir}/retry_on_failure.py
import httpx
from prefect import flow, task # Prefect flow and task decorators
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    for repo in github_repos:
        # Task 1 feeds straight into Task 2: fetch the stats, then pull out
        # the star count.
        stars = get_stars(fetch_stats(repo))
        print(f"{repo}: {stars} stars")
@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Download the statistics of one GitHub repository.

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response.
    """
    url = f"https://api.github.com/repos/{github_repo}"
    response = httpx.get(url)
    # An error status raises here, which lets the task's ``retries=3`` kick in.
    response.raise_for_status()
    return response.json()
@task
def get_stars(repo_stats: dict):
    """Task 2: Extract the star count from GitHub repo statistics.

    Args:
        repo_stats: Mapping that contains a ``stargazers_count`` entry.

    Returns:
        The value stored under ``stargazers_count``.
    """
    star_count = repo_stats['stargazers_count']
    return star_count
# Script entry point: run the flow for a fixed set of repositories
if __name__ == "__main__":
    repos_to_check = [
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ]
    show_stars(repos_to_check)
Writing .temp/retry_on_failure.py
并行执行慢速任务#
如果单个 API 请求响应缓慢,你可以通过同时发起多个请求来在总体上加快速度。当你对任务调用 submit 方法时,该任务将被提交给任务执行器进行执行。
from prefect import flow
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    All fetches are submitted first so the HTTP requests can run concurrently;
    results are then collected in submission order.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    # Task 1: submit one fetch per repository to the task runner.
    pending = [
        {'repo': name, 'task': fetch_stats.submit(name)}
        for name in github_repos
    ]

    # Task 2: wait for each submitted fetch in turn and report its stars.
    for entry in pending:
        repo_name = entry['repo']
        fetched = entry['task'].result()  # blocks until this task has completed
        stars = get_stars(fetched)
        print(f"{repo_name}: {stars} stars")
完整示例:
Show code cell content
%%file {temp_dir}/slow_tasks.py
import httpx
from prefect import flow, task # Prefect flow and task decorators
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    All fetches are submitted first so the HTTP requests can run concurrently;
    results are then collected in submission order.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    # Task 1: submit one fetch per repository to the task runner.
    pending = [
        {'repo': name, 'task': fetch_stats.submit(name)}
        for name in github_repos
    ]

    # Task 2: wait for each submitted fetch in turn and report its stars.
    for entry in pending:
        repo_name = entry['repo']
        fetched = entry['task'].result()  # blocks until this task has completed
        stars = get_stars(fetched)
        print(f"{repo_name}: {stars} stars")
@task
def fetch_stats(github_repo: str):
    """Task 1: Download the statistics of one GitHub repository.

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response (the HTTP status is
        not checked here).
    """
    response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    return response.json()
@task
def get_stars(repo_stats: dict):
    """Task 2: Extract the star count from GitHub repo statistics.

    Args:
        repo_stats: Mapping that contains a ``stargazers_count`` entry.

    Returns:
        The value stored under ``stargazers_count``.
    """
    star_count = repo_stats['stargazers_count']
    return star_count
# Script entry point: run the flow for a fixed set of repositories
if __name__ == "__main__":
    repos_to_check = [
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ]
    show_stars(repos_to_check)
Writing .temp/slow_tasks.py
避免达到速率限制#
同时运行任务的后果是,你更有可能触及到你所使用的任何 API 的速率限制。为避免这种情况,可以使用 Prefect 设置全局并发限制。
# GitHub allows 60 unauthenticated requests per hour (about 0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
现在,你可以在你的代码中使用这个全局并发限制了。
from prefect import flow
from prefect.concurrency.sync import rate_limit
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    This excerpt only shows the submission loop, which takes a slot from the
    "github-api" limit before each request is submitted.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    repo_stats = []
    for name in github_repos:
        # Apply the "github-api" concurrency limit before every submission
        rate_limit("github-api")

        # Task 1: hand the fetch to the task runner and remember its future
        future = fetch_stats.submit(name)
        repo_stats.append({'repo': name, 'task': future})

    # ...
完整代码:
Show code cell content
%%file {temp_dir}/rate_limit.py
import httpx
from prefect import flow, task # Prefect flow and task decorators
from prefect.concurrency.sync import rate_limit
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have.

    Fetches are submitted one by one, each after taking a slot from the
    "github-api" limit, and the results are reported once all have been
    submitted.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1: submit the fetch and keep the future with its repo name
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo),
        })

    # Call Task 2 once per submitted fetch. ``get_stars`` expects the stats
    # dict of a single repository, so each future must be resolved with
    # ``.result()`` first; passing the whole ``repo_stats`` list would fail.
    for entry in repo_stats:
        repo_name = entry['repo']
        stars = get_stars(entry['task'].result())  # Block until the task has completed

        # Print the result
        print(f"{repo_name}: {stars} stars")
@task
def fetch_stats(github_repo: str):
    """Task 1: Download the statistics of one GitHub repository.

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response (the HTTP status is
        not checked here).
    """
    response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    return response.json()
@task
def get_stars(repo_stats: dict):
    """Task 2: Extract the star count from GitHub repo statistics.

    Args:
        repo_stats: Mapping that contains a ``stargazers_count`` entry.

    Returns:
        The value stored under ``stargazers_count``.
    """
    star_count = repo_stats['stargazers_count']
    return star_count
# Script entry point: run the flow for a fixed set of repositories
if __name__ == "__main__":
    repos_to_check = [
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ]
    show_stars(repos_to_check)
Writing .temp/rate_limit.py
缓存任务结果#
为了提高效率,可以跳过那些已经执行过的任务。例如,如果你不想每天多次获取特定仓库的星标数量,你可以将这个结果缓存一天。
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo.

    The decorator combines ``cache_policy=INPUTS`` with a one-day
    ``cache_expiration`` so a repository's result can be reused for a day
    instead of being fetched again.

    Args:
        github_repo: Repository path in ``owner/name`` form.
    """
    # ... (request logic is elided in this excerpt)
完整代码:
Show code cell content
%%file {temp_dir}/cache_task.py
from datetime import timedelta
import httpx
from prefect import flow, task # Prefect flow and task decorators
from prefect.cache_policies import INPUTS
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    for repo in github_repos:
        # Task 1 feeds straight into Task 2: fetch the stats, then pull out
        # the star count.
        stars = get_stars(fetch_stats(repo))
        print(f"{repo}: {stars} stars")
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo.

    The result is cached per input for one day (``cache_policy=INPUTS``,
    ``cache_expiration=timedelta(days=1)``).

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response.

    Raises:
        httpx.HTTPStatusError: If GitHub answers with an error status.
    """
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    # Fail the task on an error status instead of returning the error payload:
    # a normally-returned error body would otherwise be cached for a whole day
    # and break get_stars on every run until the cache expires.
    api_response.raise_for_status()
    return api_response.json()
@task
def get_stars(repo_stats: dict):
    """Task 2: Extract the star count from GitHub repo statistics.

    Args:
        repo_stats: Mapping that contains a ``stargazers_count`` entry.

    Returns:
        The value stored under ``stargazers_count``.
    """
    star_count = repo_stats['stargazers_count']
    return star_count
# Script entry point: run the flow for a fixed set of repositories
if __name__ == "__main__":
    repos_to_check = [
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ]
    show_stars(repos_to_check)
Writing .temp/cache_task.py
运行改善后的任务#
应用了所有这些改进后,你的流程将呈现如下:
%%file {temp_dir}/my_data_pipeline.py
from datetime import timedelta
import httpx
from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit
@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Print the star count of every repository in ``github_repos``.

    Fetches are submitted one by one, each after taking a slot from the
    "github-api" limit; results are then collected in submission order.

    Args:
        github_repos: Repository paths in ``owner/name`` form.
    """
    # Task 1: submit the HTTP requests while respecting the concurrency limit
    submitted = []
    for name in github_repos:
        rate_limit("github-api")
        future = fetch_stats.submit(name)  # handed to the task runner
        submitted.append({'repo': name, 'task': future})

    # Task 2: wait for each submitted fetch in turn and report its stars
    for entry in submitted:
        repo_name = entry['repo']
        fetched = entry['task'].result()  # blocks until this task has completed
        stars = get_stars(fetched)
        print(f"{repo_name}: {stars} stars")
@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Download the statistics of one GitHub repository.

    The decorator configures three retries plus input-based caching with a
    one-day expiration.

    Args:
        github_repo: Repository path in ``owner/name`` form.

    Returns:
        The decoded JSON body of the GitHub API response.
    """
    url = f"https://api.github.com/repos/{github_repo}"
    response = httpx.get(url)
    # An error status raises here, which lets the task's ``retries=3`` kick in.
    response.raise_for_status()
    return response.json()
@task
def get_stars(repo_stats: dict):
    """Task 2: Extract the star count from GitHub repo statistics.

    Args:
        repo_stats: Mapping that contains a ``stargazers_count`` entry.

    Returns:
        The value stored under ``stargazers_count``.
    """
    star_count = repo_stats['stargazers_count']
    return star_count
# Script entry point: run the flow for a fixed set of repositories
if __name__ == "__main__":
    repos_to_check = [
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ]
    show_stars(repos_to_check)
Writing .temp/my_data_pipeline.py
执行两次流程:第一次运行任务并缓存结果,第二次从缓存中检索结果。
# NOTE: the cell above wrote the script to .temp/my_data_pipeline.py, so run
# these commands from that directory (or adjust the path accordingly).
# First run: execute the tasks and cache the results
python my_data_pipeline.py
# Second run: retrieve the cached results instead of fetching again
python my_data_pipeline.py
第二次流程运行的终端输出应该如下所示:
09:08:12.265 | INFO | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
09:08:12.266 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/541864e8-12f7-4890-9397-b2ed361f6b20
09:08:12.322 | INFO | Task run 'fetch_stats-0c9' - Finished in state Cached(type=COMPLETED)
09:08:12.359 | INFO | Task run 'fetch_stats-e89' - Finished in state Cached(type=COMPLETED)
09:08:12.360 | INFO | Task run 'get_stars-b51' - Finished in state Completed()
09:08:12.361 | INFO | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17320 stars
09:08:12.372 | INFO | Task run 'fetch_stats-8ef' - Finished in state Cached(type=COMPLETED)
09:08:12.374 | INFO | Task run 'get_stars-08d' - Finished in state Completed()
09:08:12.374 | INFO | Flow run 'laughing-nightingale' - pydantic/pydantic: 186319 stars
09:08:12.387 | INFO | Task run 'get_stars-2af' - Finished in state Completed()
09:08:12.387 | INFO | Flow run 'laughing-nightingale' - huggingface/transformers: 134849 stars
09:08:12.404 | INFO | Flow run 'laughing-nightingale' - Finished in state Completed()