编写和运行任务#

学习编写任务的基础知识。

在 Prefect 工作流中,完美的任务是独立的工作单元。你可以通过给任何 Python 函数添加 @task 装饰器来将其转变为任务。任务可以:

  • 接受输入,执行工作,并返回输出

  • 跨调用缓存它们的执行结果

  • 将工作流逻辑封装成可重用单元,跨越不同流程

  • 在运行前接收关于上游任务依赖及其状态的元数据

  • 使用自动日志记录捕获运行时详情、标签和最终状态

  • 并发执行

  • 在同一文件中定义或从模块中导入

  • 可以从工作流或其他任务中调用

工作流和任务共享一些通用特性:

  • 它们可以使用各自的装饰器进行定义,该装饰器接受配置设置(见所有任务设置工作流设置

  • 它们可以有一个名称、描述和标签,用于组织和记录

  • 它们提供重试、超时和其他钩子来处理失败和完成事件

这里有包含单一任务的简单流示例:

import httpx
from prefect import flow, task
from typing import Optional


@task
def get_url(url: str, params: Optional[dict[str, any]] = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    repo_stats = get_url(url)
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forks 🍴 : {repo_stats['forks_count']}")


if __name__ == "__main__":
    get_repo_info()

运行结果:

09:55:55.412 | INFO    | prefect.engine - Created flow run 'great-ammonite' for flow 'get-repo-info'
09:55:55.499 | INFO    | Flow run 'great-ammonite' - Created task run 'get_url-0' for task 'get_url'
09:55:55.500 | INFO    | Flow run 'great-ammonite' - Executing 'get_url-0' immediately...
09:55:55.825 | INFO    | Task run 'get_url-0' - Finished in state Completed()
09:55:55.827 | INFO    | Flow run 'great-ammonite' - PrefectHQ/prefect repository statistics 🤓:
09:55:55.827 | INFO    | Flow run 'great-ammonite' - Stars 🌠 : 12157
09:55:55.827 | INFO    | Flow run 'great-ammonite' - Forks 🍴 : 1251
09:55:55.849 | INFO    | Flow run 'great-ammonite' - Finished in state Completed('All states completed.')

此任务运行也会在用户界面中进行追踪。

任务通过任务键来唯一识别,该任务键是由任务名称、函数的完全限定名以及任何标签组成的哈希值。如果任务没有指定名称,那么名称将从装饰过的函数对象中派生出来。

任务应该有多大?

Prefect 推荐使用“小任务”。每个任务都应该代表工作流中的单一逻辑步骤。这样可以更好地控制任务失败的影响。

虽然你可以将所有代码放在一个任务中,但任何一行代码的失败都会导致整个任务失败,并且必须从头开始重试。通过将代码分割成多个相互依赖的任务,可以避免这种情况。

支持的函数#

几乎任何标准的 Python 函数都可以通过添加 @task 装饰器转变为 Prefect 任务。

Prefect 默认使用客户端的任务运行编排,这显著提高了性能,尤其是对于包含许多任务的工作流。任务创建和状态更新在本地进行,减少了执行期间对 Prefect 服务器的 API 调用。这使得能够高效处理大规模工作流,并在服务器连接不稳定时提高可靠性。

小技巧

任务总是默认在主线程中执行,除非使用特定的任务运行器来在不同的线程、进程或基础设施上执行它们。这样做便于进行Python的原生调试和性能分析。

任务更新以批处理方式记录,导致UI和API查询中的任务状态最终一致。这意味着在查看最新任务状态时可能会稍有延迟,但这样做可以显著提高性能并增加工作流的规模。

同步函数#

最简单的Prefect任务是一个同步的Python函数。这里有打印消息的同步任务示例:

from prefect import task


@task
def print_message():
    print("Hello, I'm a task")


if __name__ == "__main__":
    print_message()

异步函数#

Prefect同样支持Python的异步函数。这些生成的任务是协程,可以等待或并发执行,遵循标准的Python异步行为

from prefect import task
import asyncio


@task
async def print_message():
    await asyncio.sleep(1)
    print("Hello, I'm an async task")


asyncio.run(print_message())

类方法#

Prefect支持将同步和异步方法作为任务,包括实例方法、类方法和静态方法。对于类方法和静态方法,您必须在@task装饰器之前应用相应的方法装饰器:

from prefect import task


class MyClass:

    @task
    def my_instance_method(self):
        pass


    @classmethod
    @task
    def my_class_method(cls):
        pass


    @staticmethod
    @task
    def my_static_method():
        pass


MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()

生成器#

Prefect支持同步和异步生成器作为任务。只要生成器在产出值,该任务就被认为是“运行中”。当生成器耗尽时,任务则被视为“已完成”。生成器产出的任何值都可以被其他任务消费,并且这些任务会自动将生成器任务记录为其父任务。

from prefect import task


@task
def generator():
    for i in range(10):
        yield i


@task
def consumer(x):
    print(x)


for val in generator():
    consumer(val)

生成器函数在从任务返回时会被使用

完成的任务的结果必须是可序列化的,但生成器无法被序列化。 因此,如果你从一个任务中返回一个生成器,这个生成器将被完全消费,并且其产出的值将作为一个列表返回。 如果生成器是无限的或非常大的,这可能导致意外的行为或阻塞。

以下是主动生成器消费的例子:

from prefect import task


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@task
def f():
    return gen()


f()  # prints 'Generator consumed!'

如果你需要返回一个生成器而不希望消耗它,你可以使用 yield 而不是 return。 从生成器任务中产出的值不被视为最终结果,并且不会面临相同的序列化约束。

from prefect import task


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@task
def f():
    yield gen()


generator = next(f())
list(generator) # prints 'Generator consumed!'

并发性#

任务允许并发执行,使您能够异步执行多个任务。 这种并发性可以显著提高工作流的效率和性能。

扩展脚本以通过发出更多请求来计算每个用户的平均开放问题数量:

import httpx
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from typing import Optional


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: Optional[dict[str, any]] = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p]


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = get_open_issues(repo_name, repo_stats["open_issues_count"])
    issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forks 🍴 : {repo_stats['forks_count']}")
    print(f"Average open issues per user 💌 : {issues_per_user:.2f}")


if __name__ == "__main__":
    get_repo_info()

你现在正在获取所需的数据,但请求是顺序执行的。 任务暴露了 submit 方法,该方法可以将执行方式从顺序更改为并发。 在这个例子中,你还需要使用 result 方法来解包返回值列表:

def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url.submit(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p.result()]

日志显示,每个任务都在并行运行

12:45:28.241 | INFO    | prefect.engine - Created flow run 'intrepid-coua' for flow 'get-repo-info'
12:45:28.311 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-0' for task 'get_url'
12:45:28.312 | INFO    | Flow run 'intrepid-coua' - Executing 'get_url-0' immediately...
12:45:28.543 | INFO    | Task run 'get_url-0' - Finished in state Completed()
12:45:28.583 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-1' for task 'get_url'
12:45:28.584 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-1' for execution.
12:45:28.594 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-2' for task 'get_url'
12:45:28.594 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-2' for execution.
12:45:28.609 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-4' for task 'get_url'
12:45:28.610 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-4' for execution.
12:45:28.624 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-5' for task 'get_url'
12:45:28.625 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-5' for execution.
12:45:28.640 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-6' for task 'get_url'
12:45:28.641 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-6' for execution.
12:45:28.708 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-3' for task 'get_url'
12:45:28.708 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-3' for execution.
12:45:29.096 | INFO    | Task run 'get_url-6' - Finished in state Completed()
12:45:29.565 | INFO    | Task run 'get_url-2' - Finished in state Completed()
12:45:29.721 | INFO    | Task run 'get_url-5' - Finished in state Completed()
12:45:29.749 | INFO    | Task run 'get_url-4' - Finished in state Completed()
12:45:29.801 | INFO    | Task run 'get_url-3' - Finished in state Completed()
12:45:29.817 | INFO    | Task run 'get_url-1' - Finished in state Completed()
12:45:29.820 | INFO    | Flow run 'intrepid-coua' - PrefectHQ/prefect repository statistics 🤓:
12:45:29.820 | INFO    | Flow run 'intrepid-coua' - Stars 🌠 : 12159
12:45:29.821 | INFO    | Flow run 'intrepid-coua' - Forks 🍴 : 1251
Average open issues per user 💌 : 2.27
12:45:29.838 | INFO    | Flow run 'intrepid-coua' - Finished in state Completed('All states completed.')

任务配置#

任务允许通过可选参数进行自定义,这些参数可以提供给任务装饰器

参数

描述

name

任务的可选名称。如果未提供,则从函数名称推断名称。

description

任务的可选字符串描述。如果未提供,则从被装饰函数的文档字符串中提取描述。

tags

与该任务运行相关联的可选标签集合。这些标签与任务运行时定义的任何prefect.tags上下文中的标签组合在一起。

timeout_seconds

指示任务最大运行时间的可选秒数。如果任务超过此时间,它将被标记为失败。

cache_key_fn

一个可选的可调用对象,给定任务运行上下文和调用参数,生成一个字符串键。如果键与先前完成的某个状态匹配,则恢复该状态结果而不是再次运行任务。

cache_policy

确定用于生成缓存键的信息的可选策略。可用策略包括INPUTSTASK_SOURCERUN_IDFLOW_PARAMETERSNONE。可以使用+运算符组合。

cache_expiration

指示此任务的缓存状态可恢复的时间量的可选量;如果未提供,缓存状态将永不过期。

retries

在任务运行失败时重试的次数的可选次数。

retry_delay_seconds

在任务失败后等待重试的可选秒数。仅当retries不为零时适用。

log_prints

一个可选的布尔值,指示是否记录打印语句。

有关所有可能选项,请参阅Python SDK文档

例如,为任务提供可选的 namedescription 参数

@task(name="hello-task", description="This task says hello.")
def my_task():
    print("Hello, I'm a task")

为了区分此任务的运行,请提供 task_run_name。Python的标准字符串格式化语法适用。

import datetime
from prefect import flow, task


@task(name="My Example Task", 
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass


@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

if __name__ == "__main__":
    my_flow()

此外,此设置还接受返回字符串的函数,用于指定任务运行的名称。

import datetime
from prefect import flow, task


def generate_task_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-lovely-day"


@task(name="My Example Task",
      description="An example task for the docs.",
      task_run_name=generate_task_name)
def my_task(name):
    pass


@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")


if __name__ == "__main__":
    my_flow()  

如果您需要访问有关任务的信息,请使用prefect.runtime模块。例如:

from prefect import flow
from prefect.runtime import flow_run, task_run


def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"


@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass


@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name="marvin")

标签#

标签是可选的字符串标记,它们允许您除了通过名称或流程之外识别和分组任务。标签有助于:

您可以将标签作为关键字参数在任务装饰器上指定。

@task(name="hello-task", tags=["test"])
def my_task():
    print("Hello, I'm a task")

或者,当任务被调用时指定标签,而不是在其定义中通过tags上下文管理器

from prefect import flow, task
from prefect import tags


@task
def my_task():
    print("Hello, I'm a task")


@flow
def my_flow():
    with tags("test"):
        my_task()


if __name__ == "__main__":
    my_flow()

超时#

任务超时机制旨在防止意外的长时间运行任务。当一个任务的执行时间超过了设定的超时时长,系统会抛出超时异常并将该任务标记为失败。在用户界面上,这样的任务会被清晰地标注为TimedOut。从流程的角度来看,超时的任务与其他失败的任务一样被处理。

通过timeout_seconds关键字参数来指定超时的持续时间。

from prefect import task
import time


@task(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")

重试机制#

Prefect能够在任务运行失败时自动重新尝试。如果一个任务的Python函数抛出异常,则该任务运行被认为失败。

要启用重试功能,请在任务中传递retriesretry_delay_seconds参数。如果任务运行失败,Prefect将最多重试retries次,每次尝试之间等待retry_delay_seconds秒。如果在最后一次重试中任务仍然失败,Prefect会将该任务标记为失败。

当任务被重试时,不会创建一个新的任务运行。相反,会在原始任务运行的状态历史中添加一个新的状态。

重试机制通常在使用外部系统的情境下非常有用,例如发起API请求。下面的例子使用httpx库来进行HTTP请求。

import httpx
from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)
    
    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()
    
    return response.json()
    

@flow
def get_data_flow():
    get_data_task()


if __name__ == "__main__":
    get_data_flow()

在这个任务中,如果对脆弱API的HTTP请求接收到任何非2xx(如200、201等)的状态码,Prefect将会最多重试两次该任务,每次重试之间等待五秒钟。

自定义重试行为#

retry_delay_seconds选项接受一个整数列表以实现自定义的重试行为。下面的任务将在下次尝试开始前分别等待逐渐增加的间隔时间,即1秒、10秒和100秒:

from prefect import task


@task(retries=3, retry_delay_seconds=[1, 10, 100])
def some_task_with_manual_backoff_retries():
   (rest of code follows)

retry_condition_fn 参数接受返回布尔值的可调用对象。 如果该可调用对象返回True,则任务将被重试。 如果返回False,则任务不会重试。 该可调用对象接受三个参数:任务本身、任务运行情况以及任务运行的状态。 以下任务将在HTTP状态码不是401或404时重试:

import httpx
from prefect import flow, task


def retry_handler(task, task_run, state) -> bool:
    """Custom retry handler that specifies when to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry
        return False
    except:
        # For any other exception, retry
        return True


@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()


@flow
def get_data_flow(url):
    my_api_call_task(url=url)


if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")

另外,你可以传入可调用对象,该对象接受重试次数作为参数并返回列表。 Prefect 包含了 exponential_backoff 工具,它将自动生成与指数退避重试策略相对应的重试延迟列表。 以下流程将在每次重试前等待10秒、20秒,然后是40秒。

from prefect import task
from prefect.tasks import exponential_backoff


@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10))
def some_task_with_exponential_backoff_retries():
   (rest of code follows)

增加“抖动”以避免雷鸣般的群体效应#

您可以在重试延迟时间中添加“抖动”(jitter)。 抖动是在重试周期中加入的随机时间量,这有助于防止“雷鸣般群体效应”的发生,即当许多任务同时重试时,可能会压垮系统。

可以使用retry_jitter_factor选项来为基本延迟增加变化量。 例如,一个具有0.5 retry_jitter_factor的10秒重试延迟将允许最多15秒的延迟。 较大的retry_jitter_factor值可以提供更多的防护措施,以对抗“雷鸣般群体效应”,同时保持平均重试延迟时间恒定。 例如,以下任务在其指数退避中添加了抖动,使得重试延迟将在最大延迟时间20秒、40秒和80秒之间变化。

from prefect import task
from prefect.tasks import exponential_backoff


@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=1,
)


def some_task_with_exponential_backoff_retries():
   (rest of code follows)

全局配置重试行为#

通过设置来全局设定默认的重试次数和重试延迟。这些全局设置不会覆盖任务装饰器中设定的 retriesretry_delay_seconds

prefect config set PREFECT_TASK_DEFAULT_RETRIES=2
prefect config set PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS = [1, 10, 100]