Extract data from websites#

Use Prefect to fetch and analyze large amounts of data

In this tutorial, you'll learn how to handle data dependencies and ingest large amounts of data by building a GitHub issue analysis pipeline.

Working with web data in the real world presents additional challenges:

  • API requests can fail, or can return missing or malformed data.

  • You need to make multiple dependent API calls.

  • You need to ingest data when you don't know in advance how much data is available.

from pathlib import Path

temp_dir = Path(".temp")
# Create a temporary directory for the generated scripts
temp_dir.mkdir(exist_ok=True)

Set up error handling#

Handle errors gracefully by raising and catching them. For example, if you don't receive a 2xx response code from an API, raise an exception and log the error.

import httpx
from prefect import task

@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> list[dict] | None:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status() # Raise an exception if the response is not a 2xx status code
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None
%%file {temp_dir}/error_handling.py
from typing import List, Optional
import httpx

from prefect import flow, task


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        print(f"Analyzing {repo}...")
        
        # Fetch and analyze all issues
        fetch_page_of_issues(repo)


@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status() # Raise an exception if the response is not a 2xx status code
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None


if __name__ == "__main__":
    analyze_repo_health([
        "PrefectHQ/prefect",
        "this-repo-does-not-exist/404" # This repo will trigger an error
    ])
Writing .temp/error_handling.py
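
The task above swallows errors by returning `None` after logging them. For transient failures (network hiccups, rate-limit blips), Prefect can also retry a task automatically. Here's a minimal sketch using the standard `retries` and `retry_delay_seconds` task options (the combined example at the end of this tutorial uses `retries=3` as well); the task name here is hypothetical:

import httpx
from prefect import task

@task(log_prints=True, retries=3, retry_delay_seconds=10)
def fetch_page_of_issues_with_retries(repo: str, page: int = 1) -> list[dict]:
    """Hypothetical variant: let Prefect retry transient failures automatically"""
    response = httpx.get(
        f"https://api.github.com/repos/{repo}/issues",
        params={"page": page, "state": "all", "per_page": 100}
    )
    response.raise_for_status()  # A non-2xx response raises and triggers a retry
    return response.json()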

Ingest large amounts of data#

Use pagination to fetch large amounts of data, and run tasks concurrently to analyze the data efficiently:

from typing import List

from prefect import flow

@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    all_issues = []

    for repo in repos:
        for page in range(1, 3):  # Get first 2 pages
            issues = fetch_page_of_issues(repo, page)
            if not issues:
                break
            all_issues.extend(issues)
    
    # Run issue analysis tasks concurrently
    issue_details = []
    for issue in all_issues:
        issue_details.append(analyze_issue.submit(issue))  # Submit each task to a task runner

    # Wait for all analysis tasks to complete
    for detail in issue_details:
        result = detail.result()  # Block until the task has completed
        print(f"Analyzed issue #{result['number']}")

Full code:

%%file {temp_dir}/large_data.py
from typing import List, Optional
import httpx

from prefect import flow, task


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        print(f"Analyzing {repo}...")
        
        # Fetch and analyze all issues
        fetch_repo_issues(repo)


@flow
def fetch_repo_issues(repo: str):
    """Fetch all issues for a single repository"""
    all_issues = []

    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)

    issue_details = []
    for issue in all_issues[:5]:  # Limit to 5 issues to avoid hitting rate limits
        issue_details.append(
            fetch_issue_details.submit(repo, issue['number'])  # Submit each task to a task runner
        )
    
    details = []
    for issue in issue_details:
        details.append(issue.result())

    return details


@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None


@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
    """Fetch detailed information about a specific issue"""
    response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
    issue_data = response.json()
    
    return issue_data


if __name__ == "__main__":
    analyze_repo_health([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/large_data.py

Structure your code with dependent nested flows and tasks#

Use nested flows and tasks to help distribute work more efficiently and to aid debugging.

  • Use nested flows for more complex operations that involve multiple steps.

  • Use tasks for simpler, atomic operations.

Here's an example of how to use nested flows and tasks:

from typing import List

from prefect import flow, task


@flow
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)
        
        # Calculate metrics
        resolution_rate = calculate_resolution_rate(issues)
        # ...


@flow
def fetch_repo_issues(repo: str):
    """Nested flow: Fetch all data for a single repository"""

    # ...


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Task: Calculate the percentage of closed issues"""

    # ...

Full code:

%%file {temp_dir}/structure_code.py
from typing import List, Optional
import httpx

from prefect import flow, task


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        print(f"Analyzing {repo}...")
        
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)
        
        # Calculate metrics
        resolution_rate = calculate_resolution_rate(issues)
        
        print(f"Resolution rate: {resolution_rate:.1f}%")


@flow
def fetch_repo_issues(repo: str):
    """Fetch all issues for a single repository"""
    all_issues = []

    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)

    issue_details = []
    for issue in all_issues[:5]:  # Limit to 5 issues to avoid hitting rate limits
        issue_details.append(
            fetch_issue_details.submit(repo, issue['number'])
        )
    
    details = []
    for issue in issue_details:
        details.append(issue.result())

    return details


@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None


@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
    """Fetch detailed information about a specific issue"""
    response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
    issue_data = response.json()
    
    return issue_data


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Calculate the percentage of closed issues"""
    if not issues:
        return 0
    closed = sum(1 for issue in issues if issue['state'] == 'closed')
    return (closed / len(issues)) * 100


if __name__ == "__main__":
    analyze_repo_health([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/structure_code.py

Putting it all together#

Here's the complete flow that combines all of these components, with retries, caching, and rate limiting added to make the workflow more robust.

%%file {temp_dir}/repo_analysis.py

from datetime import timedelta, datetime
from statistics import mean
from typing import List, Optional
import httpx

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        print(f"Analyzing {repo}...")
        
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)
        
        # Calculate metrics
        avg_response_time = calculate_response_times(issues)
        resolution_rate = calculate_resolution_rate(issues)
        
        print(f"Average response time: {avg_response_time:.1f} hours")
        print(f"Resolution rate: {resolution_rate:.1f}%")


@flow
def fetch_repo_issues(repo: str):
    """Fetch all issues for a single repository"""
    all_issues = []

    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)

    issue_details = []
    for issue in all_issues[:5]:  # Limit to 5 issues to avoid hitting rate limits
        issue_details.append(
            fetch_issue_details.submit(repo, issue['number'])
        )
    
    details = []
    for issue in issue_details:
        details.append(issue.result())

    return details


@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    rate_limit("github-api")
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_issue_details(repo: str, issue_number: int) -> dict:
    """Fetch detailed information about a specific issue"""
    rate_limit("github-api")
    response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
    issue_data = response.json()
    
    # Fetch comments for the issue
    comments = fetch_comments(issue_data['comments_url'])
    issue_data['comments_data'] = comments
    
    return issue_data


@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_comments(comments_url: str) -> List[dict]:
    """Fetch comments for an issue"""
    rate_limit("github-api")
    try:
        response = httpx.get(comments_url)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching comments: {e}")
        return []


@task
def calculate_response_times(issues: List[dict]) -> float:
    """Calculate average time to first response for issues"""
    response_times = []
    
    for issue in issues:
        comments_data = issue.get('comments_data', [])
        if comments_data:  # If there are comments
            created = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
            first_comment = datetime.fromisoformat(
                comments_data[0]['created_at'].replace('Z', '+00:00')
            )
            response_time = (first_comment - created).total_seconds() / 3600
            response_times.append(response_time)

    return mean(response_times) if response_times else 0


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Calculate the percentage of closed issues"""
    if not issues:
        return 0
    closed = sum(1 for issue in issues if issue['state'] == 'closed')
    return (closed / len(issues)) * 100


if __name__ == "__main__":
    analyze_repo_health([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
Writing .temp/repo_analysis.py

Before running this code, make sure you've set up a rate limit for the GitHub API:

# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
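
The 60-requests-per-hour limit applies to unauthenticated requests; GitHub grants authenticated requests a much higher allowance (5,000 per hour). If you hit the limit, one option is to send a personal access token with each request. This is a sketch only, assuming a token in a hypothetical GITHUB_TOKEN environment variable (remember to raise the `prefect gcl` limit to match):

import os
import httpx

# Assumption: a GitHub personal access token is stored in GITHUB_TOKEN
token = os.environ["GITHUB_TOKEN"]
response = httpx.get(
    "https://api.github.com/repos/PrefectHQ/prefect/issues",
    headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()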

Run the analysis:

python .temp/repo_analysis.py

The output should look similar to this:

10:59:13.933 | INFO    | prefect.engine - Created flow run 'robust-kangaroo' for flow 'analyze-repo-health'
10:59:13.934 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/abdf7f46-6d59-4857-99cd-9e265cadc4a7
10:59:13.954 | INFO    | Flow run 'robust-kangaroo' - Analyzing PrefectHQ/prefect...
...
10:59:27.631 | INFO    | Flow run 'robust-kangaroo' - Average response time: 0.4 hours
10:59:27.631 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 40.0%
10:59:27.632 | INFO    | Flow run 'robust-kangaroo' - Analyzing pydantic/pydantic...
...
10:59:40.990 | INFO    | Flow run 'robust-kangaroo' - Average response time: 0.0 hours
10:59:40.991 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:40.991 | INFO    | Flow run 'robust-kangaroo' - Analyzing huggingface/transformers...
...
10:59:54.225 | INFO    | Flow run 'robust-kangaroo' - Average response time: 1.1 hours
10:59:54.225 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:54.240 | INFO    | Flow run 'robust-kangaroo' - Finished in state Completed()