编写并运行流#

学习定义和运行流的基础知识。

from pathlib import Path

temp_dir = Path(".temp")
# 创建临时目录
if not temp_dir.exists():
    temp_dir.mkdir(exist_ok=True)

流(flow)是工作流(workflow)逻辑的容器。

流被定义为 Python 函数。它们可以接受输入,执行任务,并返回结果。

通过添加 @flow 装饰器,将任何 Python 函数变成 Prefect 流程:

from prefect import flow


@flow
def my_flow() -> str:
    return "Hello, world!"

if __name__ == "__main__":
    print(my_flow())

当函数转变为流程时,它获得了以下能力:

  • 自动跟踪有关流运行的元数据,例如运行时间和最终状态。

  • 记录流进入的每个状态。这使得您可以观察并针对流执行中的每次转换采取行动。

  • 输入参数可以作为工作流参数进行类型验证。

  • 在失败时可以进行重试,并可配置延迟和重试限制。

  • 可以实施超时机制以防止意外的长时间运行的工作流。

  • 流可以被部署,从而暴露出用于远程交互的 API。

流程通过名称来唯一标识。可以为该流程提供 name 参数值:

from prefect import flow

@flow(name="My Flow")
def my_flow() -> str:
    return "Hello, world!"

如果没有提供 name,Prefect 将使用流函数的名称。

运行流#

流运行是指流的一次执行。

通过调用其函数名称来创建流运行,就像调用普通的 Python 函数一样。

您还可以通过以下方式创建流运行:

  • 使用外部调度器(如 cron)来调用流函数

  • 在 Prefect Cloud 或自托管的 Prefect 服务器中触发该流程的部署

  • 通过调度、Prefect 用户界面或 Prefect API 启动部署的流运行

无论您如何运行您的流,Prefect 都会监控流运行,捕捉其状态以实现可观测性。您可以记录关于流运行的各种元数据,用于监控、故障排除和审计目的。

以下示例使用 HTTPX 客户端库来获取有关 Prefect 存储库 main 分支 的统计数据。

%%file {temp_dir}/repo_info.py
import httpx
from prefect import flow


@flow
def get_repo_info():
    url = "https://api.github.com/repos/PrefectHQ/prefect"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print("PrefectHQ/prefect repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info()
Writing .temp/repo_info.py

执行此脚本后,将得到以下输出:

12:47:42.792 | INFO | prefect.engine - Created flow run 'ludicrous-warthog' for flow 'get-repo-info'
PrefectHQ/prefect repository statistics 🤓:
Stars 🌠 : 12146
Forks 🍴 : 1245
12:47:45.008 | INFO | Flow run 'ludicrous-warthog' - Finished in state Completed()

指定流参数#

正如任何 Python 函数一样,你可以向流传递参数,包括位置参数和关键字参数。这些在流函数上定义的参数被称为参数。它们由 Prefect 编排引擎存储在流运行对象上。

Prefect 利用提供的任何类型提示自动对输入进行类型转换。类型提示提供了一种简单的方式来在你的流参数上强制执行类型检查,并且可以使用 Pydantic 进行自定义。Prefect 支持任何 Pydantic 模型作为流参数的类型提示。

例如,要自动将参数转换为 datetime 对象:

%%file {temp_dir}/what_day_is_it.py
from datetime import (
    datetime,
    timezone,
)
from prefect import flow


@flow
def what_day_is_it(date: datetime | None = None):
    if date is None:
        date = datetime.now(timezone.utc)
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")


if __name__ == "__main__":
    what_day_is_it("2021-01-01T02:00:19.180906")
Writing .temp/what_day_is_it.py

当你运行这个流时,你将看到以下输出:

It was Friday on 2021-01-01T02:00:19.180906

您也可以传递 BaseModel 参数的 dict 表示,并且它将被强制转换。

%%file {temp_dir}/dict_custom.py
from prefect import flow
from pydantic import BaseModel


class Model(BaseModel):
    a: int
    b: str


@flow
def flow_that_validates_parameters(model: Model): ...

if __name__ == "__main__":
    flow_that_validates_parameters(
        model={"a": "WRONG", "b": "fine"}
    )
Writing .temp/dict_custom.py

这个流运行将因以下错误而失败:

Flow run received invalid parameters:
 - model.a: Input should be a valid integer, unable to parse string as an integer

请注意,您可以通过部署使用 API 为流提供参数值。发送到 API 的流运行参数在可能的情况下会被强制转换为适当的类型。

Prefect API 需要关键字参数

使用 Prefect API 创建流程运行时,您在覆盖默认参数时必须指定参数名。传递的值不能是位置参数。

在执行流之前,会先对参数进行验证。如果部署的流程运行接收到无效参数,它将从待定状态直接转为失败状态,而不会进入运行状态。

备注

流量运行参数的大小不得超过512kb。

编排工作流#

在 Prefect 工作流中,流可以调用任务,这是编排工作中最细微的单元。

from prefect import flow, task


@task
def print_hello(name):
    print(f"Hello {name}!")


@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)

单个流函数可以包含你所有工作流的代码。然而,如果你将所有的工作流逻辑都放在一个流函数中,并且任何一行代码出现故障,整个流程就会失败,并且必须从头开始重新尝试。你的工作流越细致划分,它们从失败中恢复的能力就越强,你也就能更容易地找到并修复问题。

Perfect 任务非常适合使用分布式计算框架(如 Dask 或 Ray)进行并行或分布式执行。

嵌套流#

除了在流中调用任务外,流还可以调用其他流。当一个流函数被另一个流程调用时,就会创建嵌套的流运行。当一个流调用另一个流时,调用流的运行被称为“父”运行,而被调用的流运行则被称为“子”运行。

在用户界面中,每个子流运行都与其父流链接在一起,并且可以单独观察。

对于大多数目的而言,嵌套的流运行表现得就像非嵌套的流运行一样。在后端中,嵌套的流运行有完整的表示,就好像它是单独被调用的一样。嵌套的流运行与普通的流运行不同之处在于,它们会将任何传递的任务未来结果解析为数据。这使得数据可以从父流程运行轻松地传递给嵌套的流运行。

当嵌套流开始运行时,它会为其包含的任何任务创建新的任务执行器。当嵌套流完成时,这个任务执行器就会关闭。嵌套流会阻塞父级流的执行,直到其完成。然而,异步嵌套流可以与 AnyIO 任务组asyncio.gather() 并发运行。

嵌套运行之间的关系是通过在父级流中的特殊任务运行来记录的,它代表了子流运行。代表子流运行的任务运行的 state_details 字段包括 child_flow_run_id。嵌套流运行的 state_details 字段包括 parent_task_run_id

你可以在同一个文件中定义多个流。无论是本地运行还是通过部署运行,你都必须指明哪个流是流运行的入口点。

警告

在不取消其父流运行的情况下,无法取消嵌套的流运行。如果您需要能够独立于其父流运行来取消嵌套的流运行,建议将其单独部署,并使用 run_deployment 方法启动它。

还可以在单独的模块中定义流或任务,并将它们导入以供使用

from prefect import flow, task


@flow(name="Nestedflow")
def my_nested_flow(msg):
    print(f"Nestedflow says: {msg}")

这里有父流程,它导入并使用了 my_nested_flow 作为嵌套的子流程。

%%file {temp_dir}/hello.py

from prefect import flow, task
from myproject.flows import my_nested_flow


@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg


@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)
    my_nested_flow(message)


if __name__=="__main__":
    hello_world("Marvin")
Writing .temp/hello.py

运行 hello_world() 流会创建如下的流执行:

08:24:06.617 | INFO    | prefect.engine - Created flow run 'sage-mongoose' for flow 'Hello Flow'
08:24:06.620 | INFO    | prefect.engine - View at https://app.prefect.cloud/...
08:24:07.113 | INFO    | Task run 'Print Hello-0' - Created task run 'Print Hello-0' for task 'Print Hello'
Hello Marvin!
08:24:07.445 | INFO    | Task run 'Print Hello-0' - Finished in state Completed()
08:24:07.825 | INFO    | Flow run 'sage-mongoose' - Created subflow run 'powerful-capybara' for flow 'Nestedflow'
08:24:07.826 | INFO    | prefect.engine - View at https://app.prefect.cloud/...
Nestedflow says: Hello Marvin!
08:24:08.165 | INFO    | Flow run 'powerful-capybara' - Finished in state Completed()
08:24:08.296 | INFO    | Flow run 'sage-mongoose' - Finished in state Completed()

以下情况,你可能需要定义嵌套流而不是单独调用任务:

  • 可观测性:嵌套流,如同任何其他流运行一样,在 Prefect UI 和 Prefect Cloud 中具有一流的可观测性。你会在运行仪表板中看到嵌套流的状态,而无需深入特定流运行中的任务。请参阅最终状态确定,以了解如何在流中使用任务状态的示例。

  • 条件流程:如果你有一组仅在特定条件下运行的任务,你可以将它们分组在嵌套流程中,并根据条件运行嵌套流程,而不是单独运行每个任务。

  • 参数化:流程对参数化提供了一流的支持,通过简单地向运行它们的嵌套流程传递不同的参数,可以轻松地在不同的使用案例中运行相同的任务组。

  • 任务运行器:嵌套流程允许你指定用于流程中任务的任务运行器。例如,为了优化 Dask 的某些任务的并行执行,将它们分组在使用 Dask 任务运行器的嵌套流程中。你可以为每个嵌套流程使用不同的任务运行器。

支持的功能#

通过添加 @flow 装饰器,几乎所有标准 Python 函数都可以转变为 Prefect 流。默认情况下,流在主线程中执行,以便于进行原生 Python 调试和性能分析。

如上例所示,流默认以同步方式运行。

异步函数#

Prefect 也支持异步执行。生成的流程是协程,可以按照 async Python 的标准规则进行等待或并发运行。例如:

%%file {temp_dir}/async_flow.py
import asyncio
from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")


@flow
async def async_flow():
    print("Hello, I'm an async flow")

    # runs immediately
    await print_values([1, 2])

    # runs concurrently
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())
Writing .temp/async_flow.py

类方法#

Prefect 支持同步和异步的类方法,包括实例方法、类方法和静态方法。对于类方法和静态方法,请在 @flow 装饰器上方应用适当的方法装饰器:

%%file {temp_dir}/class_meth.py
from prefect import flow


class MyClass:

    @flow
    def my_instance_method(self):
        pass


    @classmethod
    @flow
    def my_class_method(cls):
        pass


    @staticmethod
    @flow
    def my_static_method():
        pass


MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
Writing .temp/class_meth.py

生成器#

Prefect 支持将同步和异步生成器作为流。只要生成器正在产出值,该流就被视为正在运行中。当生成器耗尽时,流则被认为已完成。由生成器产生的任何值都可以被其他流或任务所消费。

%%file {temp_dir}/gen.py
from prefect import flow


@flow
def generator():
    for i in range(10):
        yield i

@flow
def consumer(x):
    print(x)


for val in generator():
    consumer(val)
Writing .temp/gen.py

生成器在流返回时被消耗

完成流的结果必须是可序列化的,但生成器无法被序列化。因此,如果你从流中返回生成器,该生成器将被完全消耗,并且其产生的值将作为列表返回。如果生成器是无限的或非常大的,这可能导致意外行为或阻塞。

以下是主动(proactive)生成器消耗的例子:

from prefect import flow


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@flow
def f():
    return gen()


f()  # prints 'Generator consumed!'

如果你需要在不消耗生成器的情况下将其返回,可以使用 yield 代替 return。从生成器流中产生的值不被视为最终结果,因此不会面临相同的序列化限制:

from prefect import flow


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@flow
def f():
    yield gen


generator = next(f())
list(generator()) # prints 'Generator consumed!'

流程运行#

一次流运行是指执行一次流。

你可以通过手动调用流函数,或者使用外部调度器(如 cron)来触发流函数,从而创建一次流运行。大多数用户通过在 Prefect Cloud 或 Prefect 服务器上创建部署,然后通过调度、Prefect UI 或 Prefect API 为该部署调度一次流运行。

无论你如何运行流,Prefect API 都会监控这次流运行,并记录用于监控、故障排查和审计的信息。

流设置#

所有流都可以通过向装饰器传递参数来进行配置。流接受以下可选设置:

参数

描述

description

流程的可选字符串描述。如果未提供,则从装饰函数的文档字符串中提取描述。

name

流程的可选名称。如果未提供,则从函数中推断名称。

retries

在流程运行失败时重试的次数(可选)。

retry_delay_seconds

在失败后等待重新尝试流程的时间(以秒为单位)(仅当 retries 不为零时适用)。

flow_run_name

用于区分此流程运行的可选名称;该名称可以作为包含流程参数变量的字符串模板提供;也可以提供一个返回字符串的函数来提供此名称。

task_runner

用于在流程内执行任务时使用的任务运行器(可选)。如果在提交任务时未提供,并且使用了 .submit() 方法,则使用 ThreadPoolTaskRunner

timeout_seconds

指示流程最大运行时长的可选秒数。如果流程超过此时间限制,则标记为失败。流程执行可能会继续直到调用下一个任务。

validate_parameters

布尔值,指示是否通过 Pydantic 验证传递给流程的参数。默认值为 True

version

流程的可选版本字符串。如果未提供,我们将尝试创建包含包装函数的文件的哈希值作为版本字符串。如果无法找到文件,版本将为 null。

例如,您可以提供 namedescription 参数

from prefect import flow


@flow(
    name="My Flow", description="My flow with a name and description", log_prints=True)
def my_flow():
    print("Hello, I'm a flow")


if __name__ == "__main__":
    my_flow()

如果未提供 description,则使用流函数的文档字符串作为描述。

您可以通过传递 flow_run_name 来区分不同的流运行。 该参数接受字符串,可以包含对流参数的模板引用。 名称使用 Python 的标准字符串格式化语法进行格式化:

import datetime
from prefect import flow


@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime.datetime):
    pass


# creates a flow run called 'marvin-on-Thursday'
if __name__ == "__main__":
    my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

这个设置同样接受函数,该函数返回字符串作为流运行的名称

import datetime
from prefect import flow


def generate_flow_run_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-nice-day"


@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str):
    pass


# creates a flow run named 'Thursday-is-a-nice-day'
if __name__ == "__main__":
    my_flow(name="marvin")

如果您需要获取关于流的信息,请使用 prefect.runtime 模块。例如:

from prefect import flow
from prefect.runtime import flow_run


def generate_flow_run_name():
    flow_name = flow_run.flow_name

    parameters = flow_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-with-{name}-and-{limit}"


@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str, limit: int = 100):
    pass


# creates a flow run named 'my-flow-with-marvin-and-100'
if __name__ == "__main__":
    my_flow(name="marvin")

请注意,validate_parameters 函数会检查输入值是否符合函数上标注的类型。 在可能的情况下,值会被强制转换为正确的类型。 例如,如果一个参数被定义为 x: int 并且传递了字符串 “5”,它将被解析为 5。 如果设置为 False,则不会对流程参数执行验证。

最终状态的确定#

状态是记录特定任务运行或流运行状况的档案。

流程的最终状态由其返回值决定。以下规则适用:

  • 如果在流函数中直接抛出异常,则该流运行将被标记为失败(FAILED)。

  • 如果流返回手动创建的状态,它将被用作最终流运行的状态。这允许手动确定最终状态。

  • 如果流返回可迭代的状态集合,其中任何失败(FAILED)状态的存在都会导致运行被标记为失败。

在没有错误的情况下,任何流程的返回都将被标记为“已完成”。

警告

如果您通过编程方式操控状态,您可以创建一些情况,在这些情况下流中的任务可能会失败,但不会引发整个流运行的失败。例如:

from prefect import flow, task 


@task 
def add_one(x):
    return x + 1


@flow 
def my_flow():
    # avoided raising an exception via `return_state=True`
    state = add_one("1", return_state=True)
    assert state.is_failed()

    # the flow function returns successfully!
    return

如果从流程函数中返回了state,则运行将被标记为 FAILED

返回未来值#

如果流返回一个或多个 future,那么最终状态将根据底层状态来确定。

from prefect import flow, task


@task
def always_fails_task():
    raise ValueError("I fail successfully")


@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"


@flow
def always_succeeds_flow():
    x = always_fails_task.submit().result(raise_on_failure=False)
    y = always_succeeds_task.submit(wait_for=[x])
    return y


if __name__ == "__main__":
    always_succeeds_flow()

这个流运行以“完成”的最终状态结束,因为流返回了成功任务的未来值:

18:35:24.965 | INFO    | prefect.engine - Created flow run 'whispering-guan' for flow 'always-succeeds-flow'
18:35:25.204 | INFO    | Flow run 'whispering-guan' - Created task run 'always_fails_task-96e4be14-0' for task 'always_fails_task'
18:35:25.205 | INFO    | Flow run 'whispering-guan' - Submitted task run 'always_fails_task-96e4be14-0' for execution.
18:35:25.232 | ERROR   | Task run 'always_fails_task-96e4be14-0' - Encountered exception during execution:
Traceback (most recent call last):
  ...
ValueError: I fail successfully
18:35:25.265 | ERROR   | Task run 'always_fails_task-96e4be14-0' - Finished in state Failed('Task run encountered an exception.')
18:35:25.289 | INFO    | Flow run 'whispering-guan' - Created task run 'always_succeeds_task-9c27db32-0' for task 'always_succeeds_task'
18:35:25.289 | INFO    | Flow run 'whispering-guan' - Submitted task run 'always_succeeds_task-9c27db32-0' for execution.
I'm fail safe!
18:35:25.335 | INFO    | Task run 'always_succeeds_task-9c27db32-0' - Finished in state Completed()
18:35:25.362 | INFO    | Flow run 'whispering-guan' - Finished in state Completed('All states completed.')

返回多个状态或者未来#

如果一个流程返回了多个状态或未来状态的混合,最终的状态将通过解析所有未来状态为确定状态,然后判断这些状态中是否有未 COMPLETED 的状态来确定。

from prefect import task, flow


@task
def always_fails_task():
    raise ValueError("I am bad task")


@task
def always_succeeds_task():
    return "foo"


@flow
def always_succeeds_flow():
    return "bar"


@flow
def always_fails_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    z = always_succeeds_flow()
    return x, y, z

执行 always_fails_flow 失败,原因在于返回的三个 future 中有一个失败了。请注意,每个返回的 future 的状态都包含在流程运行输出中:

...
20:57:52.438 | INFO    | Flow run 'unbiased-firefly' - Finished in state Completed()
20:57:52.811 | ERROR   | Flow run 'impartial-gorilla' - Finished in state Failed('1/3 states failed.')
Failed(message='1/3 states failed.', type=FAILED, result=(Failed(message='Task run encountered an exception.', type=FAILED, result=ValueError('I am bad task'), task_run_id=5fd4c697-7c4c-440d-8ebc-dd9c5bbf2245), Completed(message=None, type=COMPLETED, result='foo', task_run_id=df9b6256-f8ac-457c-ba69-0638ac9b9367), Completed(message=None, type=COMPLETED, result='bar', task_run_id=cfdbf4f1-dccd-4816-8d0f-128750017d0c)), flow_run_id=6d2ec094-001a-4cb0-a24e-d2051db6318d)

如果返回多个状态,它们必须被包含在 setlisttuple 中。

运行手动状态#

如果流程返回手动创建的状态,那么最终状态将根据返回值来确定。

from prefect import task, flow
from prefect.states import Completed, Failed


@task
def always_fails_task():
    raise ValueError("I fail successfully")


@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"


@flow
def always_succeeds_flow():
    x = always_fails_task.submit()
    y = always_succeeds_task.submit()
    if y.result() == "success":
        return Completed(message="I am happy with this result")
    else:
        return Failed(message="How did this happen!?")


if __name__ == "__main__":
    always_succeeds_flow()

执行该流将得到以下结果:

...
ValueError: I fail successfully
07:29:34.754 | INFO    | Task run 'always_succeeds_task-0' - Created task run 'always_succeeds_task-0' for task 'always_succeeds_task'
07:29:34.848 | ERROR   | Task run 'always_fails_task-0' - Finished in state Failed('Task run encountered an exception ValueError: I fail successfully')
I'm fail safe!
07:29:35.086 | INFO    | Task run 'always_succeeds_task-0' - Finished in state Completed()
07:29:35.225 | INFO    | Flow run 'hidden-butterfly' - Finished in state Completed('I am happy with this result')

如果流运行返回任何其他对象,则该运行被记录为 COMPLETED

自定义命名状态#

您还可以创建自定义命名状态,以在流运行状态中提供更细致的分类。

例如,可以创建 Skipped 状态,以指示某个流程运行已被跳过。

from prefect import flow
from prefect.states import Completed

@flow
def my_flow(work_to_do: bool):
    if not work_to_do:
        return Completed(message="No work to do 💤", name="Skipped")
    else:
        return Completed(message="Work was done 💪")


if __name__ == "__main__":
    my_flow(work_to_do=False)

运行结果:

15:26:49.644 | INFO    | Flow run 'liberal-zebra' - Finished in state Skipped('No work to do', type=COMPLETED)

重试#

在工作流中可能会遇到意外错误。例如,GitHub API 可能暂时不可用或受到速率限制。

Prefect 可以在失败时自动重试流程运行。

要启用重试功能,请将整数传递给流程的 retries 参数。如果流程运行失败,Prefect 将尝试最多重试 retries 次。

如果流运行在最后一次重试时失败,Prefect 会将最终的流程运行状态记录为失败。

可选地,可以通过向 retry_delay_seconds 传递整数来指定每次重试尝试之间等待的秒数。

查看事务处理部分,可以使您的流程更加健壮,并在需要时回滚操作。