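Extract data from websites#
Use Prefect to fetch and analyze large amounts of data
Now you'll learn how to handle data dependencies and ingest large amounts of data by building a GitHub issue analysis pipeline.
Working with web data in the real world brings additional challenges:
API requests can fail, or return missing or malformed data.
You need to make multiple dependent API calls.
You need to ingest data without knowing in advance how much is available.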
from pathlib import Path

temp_dir = Path(".temp")
# Create the temporary directory if it does not already exist
temp_dir.mkdir(exist_ok=True)
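Set up error handling#
Handle errors gracefully by raising and catching them. For example, if the API does not return a 2xx status code, raise an exception and log the error.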
import httpx
from prefect import task


@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> list[dict] | None:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100},
        )
        response.raise_for_status()  # Raise an exception if the response is not a 2xx status code
        return response.json()  # The issues endpoint returns a JSON array of issue objects
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None
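The task above returns `None` when a request fails, but a successful response can still contain missing or malformed entries. Below is a minimal sketch of one way to filter a page before analysis. It assumes that each usable issue is a dict with an integer `number` and a `state` of `open` or `closed`. The helper name `clean_issues` is hypothetical and not part of the tutorial.

```python
from typing import Any


def clean_issues(payload: Any) -> list[dict]:
    """Return only well-formed issue records from a decoded JSON payload.

    Hypothetical helper: the required fields below are an assumption
    about what later analysis needs, not a rule taken from the tutorial.
    """
    if not isinstance(payload, list):
        # None (a failed fetch) or an error object instead of a list of issues
        return []

    cleaned = []
    for item in payload:
        if not isinstance(item, dict):
            continue  # skip malformed entries
        if not isinstance(item.get("number"), int):
            continue  # skip entries without a usable issue number
        if item.get("state") not in ("open", "closed"):
            continue  # skip entries with a missing or unexpected state
        cleaned.append(item)
    return cleaned
```

Filtering right after the fetch keeps later steps from having to guard against every possible shape of bad data.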
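Ingest large amounts of data#
Use pagination to fetch large amounts of data, and run tasks concurrently to analyze the data efficiently: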
from typing import List

from prefect import flow


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    all_issues = []

    for repo in repos:
        for page in range(1, 3):  # Get first 2 pages
            issues = fetch_page_of_issues(repo, page)
            if not issues:
                break
            all_issues.extend(issues)

    # Run issue analysis tasks concurrently.
    # analyze_issue is assumed to be a task defined elsewhere.
    # Keep the returned futures so the results can be collected below.
    issue_details = [
        analyze_issue.submit(issue)  # Submit each task to a task runner
        for issue in all_issues
    ]

    # Wait for all analysis tasks to complete
    for detail in issue_details:
        result = detail.result()  # Block until the task has completed
        print(f"Analyzed issue #{result['number']}")
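The flow above stops after two pages. When the total number of pages is unknown, a common pattern is to keep requesting pages until one comes back empty or a safety cap is reached. Below is a minimal sketch in plain Python. It assumes a `fetch_page` callable that behaves like `fetch_page_of_issues` (it returns a list, or `None` on error). The names `fetch_all_pages` and `max_pages` are hypothetical.

```python
from typing import Callable, Optional


def fetch_all_pages(
    fetch_page: Callable[[int], Optional[list]],
    max_pages: int = 50,
) -> list:
    """Collect items page by page until an empty or failed page is returned."""
    items: list = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page)
        if not batch:
            # None (error) or [] (no more data): stop paginating
            break
        items.extend(batch)
    return items


# Stand-in data source with three pages, used here instead of a real API call
fake_pages = {1: ["a", "b"], 2: ["c"], 3: []}
collected = fetch_all_pages(lambda page: fake_pages.get(page))
print(collected)  # prints ['a', 'b', 'c']
```

Inside a flow, `fetch_page` could be something like `lambda page: fetch_page_of_issues(repo, page)`. The cap guards against looping forever if an API never returns an empty page.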
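Structure your code with nested flows and tasks#
Nested flows and tasks help you distribute work more effectively and make debugging easier.
Use nested flows for more complex operations that involve multiple steps.
Use tasks for simpler, atomic operations.
Here's an example of how to use nested flows and tasks: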
from typing import List

from prefect import flow, task


@flow
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)

        # Calculate metrics
        resolution_rate = calculate_resolution_rate(issues)
        # ...


@flow
def fetch_repo_issues(repo: str):
    """Nested flow: Fetch all data for a single repository"""
    # ...


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Task: Calculate the percentage of closed issues"""
    # ...
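The body of `calculate_resolution_rate` is elided above. One plausible reading of its docstring is sketched below as a plain function, without the `@task` decorator, so that it runs on its own. It assumes each issue dict carries the `state` field (`"open"` or `"closed"`) that the GitHub API returns. The name `resolution_rate` and the choice to return zero for an empty list are assumptions made for this sketch.

```python
from typing import List


def resolution_rate(issues: List[dict]) -> float:
    """Percentage of issues whose state is "closed" (0.0 for an empty list)."""
    if not issues:
        return 0.0  # avoid dividing by zero when no issues were fetched
    closed = sum(1 for issue in issues if issue.get("state") == "closed")
    return closed / len(issues) * 100


sample = [
    {"state": "closed"},
    {"state": "open"},
    {"state": "closed"},
    {"state": "open"},
    {"state": "open"},
]
print(f"Resolution rate: {resolution_rate(sample):.1f}%")  # prints "Resolution rate: 40.0%"
```

Keeping a calculation like this as a small, atomic unit is what makes it a natural fit for a task rather than a nested flow.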
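Put it all together#
Here is the complete flow that combines all of these components. It also adds retries, caching, and rate limiting to make the workflow more robust.
Before running this code, make sure you have set up a rate limit for the GitHub API: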
# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
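The numbers in that command follow from simple arithmetic. The sketch below only reproduces the calculation. It does not call Prefect, the variable names are illustrative, and it assumes that slot decay means occupied slots are released at the given rate.

```python
requests_per_hour = 60  # GitHub's unauthenticated limit, as stated above
seconds_per_hour = 60 * 60

# Average rate at which request slots should become available again
exact_rate = requests_per_hour / seconds_per_hour
print(f"{exact_rate:.4f} slots per second")  # prints "0.0167 slots per second"

# The command uses 0.016, slightly below the exact value, so a full
# set of 60 slots takes a little over an hour to free up again
configured_rate = 0.016
refill_seconds = requests_per_hour / configured_rate
print(f"{refill_seconds / 60:.1f} minutes to free 60 slots")  # prints "62.5 minutes to free 60 slots"
```

Rounding the decay rate down rather than up keeps the workflow slightly under GitHub's limit instead of slightly over it.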
Run the analysis:
python repo_analysis.py
The output should look similar to this:
10:59:13.933 | INFO | prefect.engine - Created flow run 'robust-kangaroo' for flow 'analyze-repo-health'
10:59:13.934 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/abdf7f46-6d59-4857-99cd-9e265cadc4a7
10:59:13.954 | INFO | Flow run 'robust-kangaroo' - Analyzing PrefectHQ/prefect...
...
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.4 hours
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 40.0%
10:59:27.632 | INFO | Flow run 'robust-kangaroo' - Analyzing pydantic/pydantic...
...
10:59:40.990 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.0 hours
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Analyzing huggingface/transformers...
...
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Average response time: 1.1 hours
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:54.240 | INFO | Flow run 'robust-kangaroo' - Finished in state Completed()