Extract data from websites
Use Prefect to fetch and analyze large amounts of data.
In this tutorial, you'll learn how to handle data dependencies and ingest large amounts of data by building a GitHub issue analysis pipeline.
Working with web data in the real world brings extra challenges:
- API requests can fail or return missing or malformed data.
- You need to make multiple dependent API calls.
- You need to ingest data when you don't know in advance how much is available.
from pathlib import Path
temp_dir = Path(".temp")
# Create a temporary directory for this tutorial's files
if not temp_dir.exists():
temp_dir.mkdir(exist_ok=True)
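The %%file cells below write each example into this directory; IPython expands the {temp_dir} placeholder in each cell header to the path defined above.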
Set up error handling
Raise and catch errors to handle them gracefully. For example, if you don't receive a 2xx response code from the API, raise an exception and log the error.
import httpx
from prefect import task
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> list[dict] | None:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status() # Raise an exception if the response is not a 2xx status code
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
%%file {temp_dir}/error_handling.py
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
fetch_page_of_issues(repo)
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status() # Raise an exception if the response is not a 2xx status code
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"this-repo-does-not-exist/404" # This repo will trigger an error
])
Writing .temp/error_handling.py
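Run the file to see the error handling in action. The request for the nonexistent repository fails with a 404; the task logs the error and returns None instead of raising, so the flow run still completes:
python .temp/error_handling.py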
Ingest large amounts of data
Use pagination to fetch large amounts of data, and run tasks concurrently to analyze the data efficiently:
from typing import List
from prefect import flow
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
all_issues = []
for repo in repos:
for page in range(1, 3): # Get first 2 pages
issues = fetch_page_of_issues(repo, page)
if not issues:
break
all_issues.extend(issues)
    # Run issue analysis tasks concurrently and collect the futures
    issue_details = []
    for issue in all_issues:
        issue_details.append(analyze_issue.submit(issue))  # Submit each task to a task runner

    # Wait for all analysis tasks to complete
    for detail in issue_details:
        result = detail.result()  # Block until the task has completed
        print(f"Analyzed issue #{result['number']}")
Full code:
%%file {temp_dir}/large_data.py
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
fetch_repo_issues(repo)
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
    all_issues = []
    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number']) # Submit each task to a task runner
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
return issue_data
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Writing .temp/large_data.py
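These examples cap pagination at two pages to stay under GitHub's unauthenticated rate limits. When you don't know in advance how much data is available, you can instead keep requesting pages until the API returns an empty list. A minimal sketch, reusing the fetch_page_of_issues task defined above (call it from inside a flow):
def fetch_all_issues(repo: str) -> list:
    """Fetch pages of issues until the API returns no more."""
    all_issues = []
    page = 1
    while True:
        issues = fetch_page_of_issues(repo, page)
        if not issues:  # an error occurred or there are no more pages
            break
        all_issues.extend(issues)
        page += 1
    return all_issues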
Structure your code with dependent nested flows and tasks
Nested flows and tasks help you distribute work more efficiently and make debugging easier.
- Use nested flows for more complex operations that involve multiple steps.
- Use tasks for simpler, atomic operations.
Here's an example of how to use nested flows and tasks:
from typing import List
from prefect import flow, task
@flow
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
resolution_rate = calculate_resolution_rate(issues)
# ...
@flow
def fetch_repo_issues(repo: str):
"""Nested flow: Fetch all data for a single repository"""
# ...
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Task: Calculate the percentage of closed issues"""
# ...
Full code:
%%file {temp_dir}/structure_code.py
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
resolution_rate = calculate_resolution_rate(issues)
print(f"Resolution rate: {resolution_rate:.1f}%")
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
    all_issues = []
    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number'])
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
return issue_data
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Calculate the percentage of closed issues"""
if not issues:
return 0
closed = sum(1 for issue in issues if issue['state'] == 'closed')
return (closed / len(issues)) * 100
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Writing .temp/structure_code.py
Putting it all together
Here's the complete flow, combining all of these components. It also adds retries, caching, and rate limiting to make the workflow more robust.
%%file {temp_dir}/repo_analysis.py
from datetime import timedelta, datetime
from statistics import mean
from typing import List, Optional
import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.concurrency.sync import rate_limit
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
avg_response_time = calculate_response_times(issues)
resolution_rate = calculate_resolution_rate(issues)
print(f"Average response time: {avg_response_time:.1f} hours")
print(f"Resolution rate: {resolution_rate:.1f}%")
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
    all_issues = []
    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number'])
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
"""Fetch a page of issues for a GitHub repository"""
rate_limit("github-api")
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
rate_limit("github-api")
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
# Fetch comments for the issue
comments = fetch_comments(issue_data['comments_url'])
issue_data['comments_data'] = comments
return issue_data
@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_comments(comments_url: str) -> List[dict]:
"""Fetch comments for an issue"""
rate_limit("github-api")
try:
response = httpx.get(comments_url)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching comments: {e}")
return []
@task
def calculate_response_times(issues: List[dict]) -> float:
"""Calculate average time to first response for issues"""
response_times = []
for issue in issues:
comments_data = issue.get('comments_data', [])
if comments_data: # If there are comments
created = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
first_comment = datetime.fromisoformat(
comments_data[0]['created_at'].replace('Z', '+00:00')
)
response_time = (first_comment - created).total_seconds() / 3600
response_times.append(response_time)
return mean(response_times) if response_times else 0
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Calculate the percentage of closed issues"""
if not issues:
return 0
closed = sum(1 for issue in issues if issue['state'] == 'closed')
return (closed / len(issues)) * 100
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Writing .temp/repo_analysis.py
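A note on the caching arguments: with cache_key_fn=task_input_hash and cache_expiration=timedelta(hours=1), a task call whose inputs match an earlier call from the past hour returns the cached result instead of re-executing. A minimal sketch of the pattern in isolation (fetch_text is an illustrative task, not part of the pipeline):
from datetime import timedelta

import httpx
from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_text(url: str) -> str:
    """A second call with the same url within an hour returns the cached result."""
    return httpx.get(url).text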
Before running this code, make sure you've created a rate limit for the GitHub API:
# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
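To confirm the limit was created, you can inspect it (assuming your Prefect version includes the gcl inspect subcommand alongside gcl create):
prefect gcl inspect github-api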
Run the analysis:
python .temp/repo_analysis.py
The output should look something like this:
10:59:13.933 | INFO | prefect.engine - Created flow run 'robust-kangaroo' for flow 'analyze-repo-health'
10:59:13.934 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/abdf7f46-6d59-4857-99cd-9e265cadc4a7
10:59:13.954 | INFO | Flow run 'robust-kangaroo' - Analyzing PrefectHQ/prefect...
...
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.4 hours
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 40.0%
10:59:27.632 | INFO | Flow run 'robust-kangaroo' - Analyzing pydantic/pydantic...
...
10:59:40.990 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.0 hours
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Analyzing huggingface/transformers...
...
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Average response time: 1.1 hours
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:54.240 | INFO | Flow run 'robust-kangaroo' - Finished in state Completed()