编写和运行任务#
学习编写任务的基础知识。
在 Prefect 工作流中,完美的任务是独立的工作单元。你可以通过给任何 Python 函数添加 @task
装饰器来将其转变为任务。任务可以:
接受输入,执行工作,并返回输出
跨调用缓存它们的执行结果
将工作流逻辑封装成可重用单元,跨越不同流程
在运行前接收关于上游任务依赖及其状态的元数据
使用自动日志记录捕获运行时详情、标签和最终状态
并发执行
在同一文件中定义或从模块中导入
可以从工作流或其他任务中调用
工作流和任务共享一些通用特性:
这里有包含单一任务的简单流示例:
import httpx
from prefect import flow, task
from typing import Optional
@task
def get_url(url: str, params: Optional[dict[str, any]] = None):
response = httpx.get(url, params=params)
response.raise_for_status()
return response.json()
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
url = f"https://api.github.com/repos/{repo_name}"
repo_stats = get_url(url)
print(f"{repo_name} repository statistics 🤓:")
print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
print(f"Forks 🍴 : {repo_stats['forks_count']}")
if __name__ == "__main__":
get_repo_info()
运行结果:
09:55:55.412 | INFO | prefect.engine - Created flow run 'great-ammonite' for flow 'get-repo-info'
09:55:55.499 | INFO | Flow run 'great-ammonite' - Created task run 'get_url-0' for task 'get_url'
09:55:55.500 | INFO | Flow run 'great-ammonite' - Executing 'get_url-0' immediately...
09:55:55.825 | INFO | Task run 'get_url-0' - Finished in state Completed()
09:55:55.827 | INFO | Flow run 'great-ammonite' - PrefectHQ/prefect repository statistics 🤓:
09:55:55.827 | INFO | Flow run 'great-ammonite' - Stars 🌠 : 12157
09:55:55.827 | INFO | Flow run 'great-ammonite' - Forks 🍴 : 1251
09:55:55.849 | INFO | Flow run 'great-ammonite' - Finished in state Completed('All states completed.')
此任务运行也会在用户界面中进行追踪。
任务通过任务键来唯一识别,该任务键是由任务名称、函数的完全限定名以及任何标签组成的哈希值。如果任务没有指定名称,那么名称将从装饰过的函数对象中派生出来。
任务应该有多大?
Prefect 推荐使用“小任务”。每个任务都应该代表工作流中的单一逻辑步骤。这样可以更好地控制任务失败的影响。
虽然你可以将所有代码放在一个任务中,但任何一行代码的失败都会导致整个任务失败,并且必须从头开始重试。通过将代码分割成多个相互依赖的任务,可以避免这种情况。
支持的函数#
几乎任何标准的 Python 函数都可以通过添加 @task
装饰器转变为 Prefect 任务。
Prefect 默认使用客户端的任务运行编排,这显著提高了性能,尤其是对于包含许多任务的工作流。任务创建和状态更新在本地进行,减少了执行期间对 Prefect 服务器的 API 调用。这使得能够高效处理大规模工作流,并在服务器连接不稳定时提高可靠性。
小技巧
任务总是默认在主线程中执行,除非使用特定的任务运行器来在不同的线程、进程或基础设施上执行它们。这样做便于进行Python的原生调试和性能分析。
任务更新以批处理方式记录,导致UI和API查询中的任务状态最终一致。这意味着在查看最新任务状态时可能会稍有延迟,但这样做可以显著提高性能并增加工作流的规模。
同步函数#
最简单的Prefect任务是一个同步的Python函数。这里有打印消息的同步任务示例:
from prefect import task
@task
def print_message():
print("Hello, I'm a task")
if __name__ == "__main__":
print_message()
异步函数#
Prefect同样支持Python的异步函数。这些生成的任务是协程,可以等待或并发执行,遵循标准的Python异步行为。
from prefect import task
import asyncio
@task
async def print_message():
await asyncio.sleep(1)
print("Hello, I'm an async task")
asyncio.run(print_message())
类方法#
Prefect支持将同步和异步方法作为任务,包括实例方法、类方法和静态方法。对于类方法和静态方法,您必须在@task
装饰器之前应用相应的方法装饰器:
from prefect import task
class MyClass:
@task
def my_instance_method(self):
pass
@classmethod
@task
def my_class_method(cls):
pass
@staticmethod
@task
def my_static_method():
pass
MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
生成器#
Prefect支持同步和异步生成器作为任务。只要生成器在产出值,该任务就被认为是“运行中”。当生成器耗尽时,任务则被视为“已完成”。生成器产出的任何值都可以被其他任务消费,并且这些任务会自动将生成器任务记录为其父任务。
from prefect import task
@task
def generator():
for i in range(10):
yield i
@task
def consumer(x):
print(x)
for val in generator():
consumer(val)
生成器函数在从任务返回时会被使用
完成的任务的结果必须是可序列化的,但生成器无法被序列化。 因此,如果你从一个任务中返回一个生成器,这个生成器将被完全消费,并且其产出的值将作为一个列表返回。 如果生成器是无限的或非常大的,这可能导致意外的行为或阻塞。
以下是主动生成器消费的例子:
from prefect import task
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@task
def f():
return gen()
f() # prints 'Generator consumed!'
如果你需要返回一个生成器而不希望消耗它,你可以使用 yield
而不是 return
。
从生成器任务中产出的值不被视为最终结果,并且不会面临相同的序列化约束。
from prefect import task
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@task
def f():
yield gen()
generator = next(f())
list(generator) # prints 'Generator consumed!'
并发性#
任务允许并发执行,使您能够异步执行多个任务。 这种并发性可以显著提高工作流的效率和性能。
扩展脚本以通过发出更多请求来计算每个用户的平均开放问题数量:
import httpx
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from typing import Optional
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: Optional[dict[str, any]] = None):
response = httpx.get(url, params=params)
response.raise_for_status()
return response.json()
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
issues = []
pages = range(1, -(open_issues_count // -per_page) + 1)
for page in pages:
issues.append(
get_url(
f"https://api.github.com/repos/{repo_name}/issues",
params={"page": page, "per_page": per_page, "state": "open"},
)
)
return [i for p in issues for i in p]
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
issues = get_open_issues(repo_name, repo_stats["open_issues_count"])
issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
print(f"{repo_name} repository statistics 🤓:")
print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
print(f"Forks 🍴 : {repo_stats['forks_count']}")
print(f"Average open issues per user 💌 : {issues_per_user:.2f}")
if __name__ == "__main__":
get_repo_info()
你现在正在获取所需的数据,但请求是顺序执行的。
任务暴露了 submit
方法,该方法可以将执行方式从顺序更改为并发。
在这个例子中,你还需要使用 result
方法来解包返回值列表:
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
issues = []
pages = range(1, -(open_issues_count // -per_page) + 1)
for page in pages:
issues.append(
get_url.submit(
f"https://api.github.com/repos/{repo_name}/issues",
params={"page": page, "per_page": per_page, "state": "open"},
)
)
return [i for p in issues for i in p.result()]
日志显示,每个任务都在并行运行
12:45:28.241 | INFO | prefect.engine - Created flow run 'intrepid-coua' for flow 'get-repo-info'
12:45:28.311 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-0' for task 'get_url'
12:45:28.312 | INFO | Flow run 'intrepid-coua' - Executing 'get_url-0' immediately...
12:45:28.543 | INFO | Task run 'get_url-0' - Finished in state Completed()
12:45:28.583 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-1' for task 'get_url'
12:45:28.584 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-1' for execution.
12:45:28.594 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-2' for task 'get_url'
12:45:28.594 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-2' for execution.
12:45:28.609 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-4' for task 'get_url'
12:45:28.610 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-4' for execution.
12:45:28.624 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-5' for task 'get_url'
12:45:28.625 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-5' for execution.
12:45:28.640 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-6' for task 'get_url'
12:45:28.641 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-6' for execution.
12:45:28.708 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-3' for task 'get_url'
12:45:28.708 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-3' for execution.
12:45:29.096 | INFO | Task run 'get_url-6' - Finished in state Completed()
12:45:29.565 | INFO | Task run 'get_url-2' - Finished in state Completed()
12:45:29.721 | INFO | Task run 'get_url-5' - Finished in state Completed()
12:45:29.749 | INFO | Task run 'get_url-4' - Finished in state Completed()
12:45:29.801 | INFO | Task run 'get_url-3' - Finished in state Completed()
12:45:29.817 | INFO | Task run 'get_url-1' - Finished in state Completed()
12:45:29.820 | INFO | Flow run 'intrepid-coua' - PrefectHQ/prefect repository statistics 🤓:
12:45:29.820 | INFO | Flow run 'intrepid-coua' - Stars 🌠 : 12159
12:45:29.821 | INFO | Flow run 'intrepid-coua' - Forks 🍴 : 1251
Average open issues per user 💌 : 2.27
12:45:29.838 | INFO | Flow run 'intrepid-coua' - Finished in state Completed('All states completed.')
任务配置#
任务允许通过可选参数进行自定义,这些参数可以提供给任务装饰器。
参数 |
描述 |
---|---|
|
任务的可选名称。如果未提供,则从函数名称推断名称。 |
|
任务的可选字符串描述。如果未提供,则从被装饰函数的文档字符串中提取描述。 |
|
与该任务运行相关联的可选标签集合。这些标签与任务运行时定义的任何 |
|
指示任务最大运行时间的可选秒数。如果任务超过此时间,它将被标记为失败。 |
|
一个可选的可调用对象,给定任务运行上下文和调用参数,生成一个字符串键。如果键与先前完成的某个状态匹配,则恢复该状态结果而不是再次运行任务。 |
|
确定用于生成缓存键的信息的可选策略。可用策略包括 |
|
指示此任务的缓存状态可恢复的时间量的可选量;如果未提供,缓存状态将永不过期。 |
|
在任务运行失败时重试的次数的可选次数。 |
|
在任务失败后等待重试的可选秒数。仅当 |
|
一个可选的布尔值,指示是否记录打印语句。 |
有关所有可能选项,请参阅Python SDK文档。
例如,为任务提供可选的 name
和 description
参数
@task(name="hello-task", description="This task says hello.")
def my_task():
print("Hello, I'm a task")
为了区分此任务的运行,请提供 task_run_name
。Python的标准字符串格式化语法适用。
import datetime
from prefect import flow, task
@task(name="My Example Task",
description="An example task for a tutorial.",
task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
pass
@flow
def my_flow():
# creates a run with a name like "hello-marvin-on-Thursday"
my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))
if __name__ == "__main__":
my_flow()
此外,此设置还接受返回字符串的函数,用于指定任务运行的名称。
import datetime
from prefect import flow, task
def generate_task_name():
date = datetime.datetime.now(datetime.timezone.utc)
return f"{date:%A}-is-a-lovely-day"
@task(name="My Example Task",
description="An example task for the docs.",
task_run_name=generate_task_name)
def my_task(name):
pass
@flow
def my_flow():
# creates a run with a name like "Thursday-is-a-lovely-day"
my_task(name="marvin")
if __name__ == "__main__":
my_flow()
如果您需要访问有关任务的信息,请使用prefect.runtime
模块。例如:
from prefect import flow
from prefect.runtime import flow_run, task_run
def generate_task_name():
flow_name = flow_run.flow_name
task_name = task_run.task_name
parameters = task_run.parameters
name = parameters["name"]
limit = parameters["limit"]
return f"{flow_name}-{task_name}-with-{name}-and-{limit}"
@task(name="my-example-task",
description="An example task for a tutorial.",
task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
pass
@flow
def my_flow(name: str):
# creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
my_task(name="marvin")
标签#
标签是可选的字符串标记,它们允许您除了通过名称或流程之外识别和分组任务。标签有助于:
在用户界面中以及通过Prefect REST API根据标签过滤任务运行。
根据标签设置任务运行的并发限制。
您可以将标签作为关键字参数在任务装饰器上指定。
@task(name="hello-task", tags=["test"])
def my_task():
print("Hello, I'm a task")
或者,当任务被调用时指定标签,而不是在其定义中通过tags
上下文管理器。
from prefect import flow, task
from prefect import tags
@task
def my_task():
print("Hello, I'm a task")
@flow
def my_flow():
with tags("test"):
my_task()
if __name__ == "__main__":
my_flow()
超时#
任务超时机制旨在防止意外的长时间运行任务。当一个任务的执行时间超过了设定的超时时长,系统会抛出超时异常并将该任务标记为失败。在用户界面上,这样的任务会被清晰地标注为TimedOut
。从流程的角度来看,超时的任务与其他失败的任务一样被处理。
通过timeout_seconds
关键字参数来指定超时的持续时间。
from prefect import task
import time
@task(timeout_seconds=1, log_prints=True)
def show_timeouts():
print("I will execute")
time.sleep(5)
print("I will not execute")
重试机制#
Prefect能够在任务运行失败时自动重新尝试。如果一个任务的Python函数抛出异常,则该任务运行被认为失败。
要启用重试功能,请在任务中传递retries
和retry_delay_seconds
参数。如果任务运行失败,Prefect将最多重试retries
次,每次尝试之间等待retry_delay_seconds
秒。如果在最后一次重试中任务仍然失败,Prefect会将该任务标记为失败。
当任务被重试时,不会创建一个新的任务运行。相反,会在原始任务运行的状态历史中添加一个新的状态。
重试机制通常在使用外部系统的情境下非常有用,例如发起API请求。下面的例子使用httpx
库来进行HTTP请求。
import httpx
from prefect import flow, task
@task(retries=2, retry_delay_seconds=5)
def get_data_task(
url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
response = httpx.get(url)
# If the response status code is anything but a 2xx, httpx will raise
# an exception. This task doesn't handle the exception, so Prefect will
# catch the exception and will consider the task run failed.
response.raise_for_status()
return response.json()
@flow
def get_data_flow():
get_data_task()
if __name__ == "__main__":
get_data_flow()
在这个任务中,如果对脆弱API的HTTP请求接收到任何非2xx(如200、201等)的状态码,Prefect将会最多重试两次该任务,每次重试之间等待五秒钟。
自定义重试行为#
retry_delay_seconds
选项接受一个整数列表以实现自定义的重试行为。下面的任务将在下次尝试开始前分别等待逐渐增加的间隔时间,即1秒、10秒和100秒:
from prefect import task
@task(retries=3, retry_delay_seconds=[1, 10, 100])
def some_task_with_manual_backoff_retries():
(rest of code follows)
retry_condition_fn
参数接受返回布尔值的可调用对象。
如果该可调用对象返回True
,则任务将被重试。
如果返回False
,则任务不会重试。
该可调用对象接受三个参数:任务本身、任务运行情况以及任务运行的状态。
以下任务将在HTTP状态码不是401或404时重试:
import httpx
from prefect import flow, task
def retry_handler(task, task_run, state) -> bool:
"""Custom retry handler that specifies when to retry a task"""
try:
# Attempt to get the result of the task
state.result()
except httpx.HTTPStatusError as exc:
# Retry on any HTTP status code that is not 401 or 404
do_not_retry_on_these_codes = [401, 404]
return exc.response.status_code not in do_not_retry_on_these_codes
except httpx.ConnectError:
# Do not retry
return False
except:
# For any other exception, retry
return True
@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
response = httpx.get(url)
response.raise_for_status()
return response.json()
@flow
def get_data_flow(url):
my_api_call_task(url=url)
if __name__ == "__main__":
get_data_flow(url="https://httpbin.org/status/503")
另外,你可以传入可调用对象,该对象接受重试次数作为参数并返回列表。
Prefect 包含了 exponential_backoff
工具,它将自动生成与指数退避重试策略相对应的重试延迟列表。
以下流程将在每次重试前等待10秒、20秒,然后是40秒。
from prefect import task
from prefect.tasks import exponential_backoff
@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10))
def some_task_with_exponential_backoff_retries():
(rest of code follows)
增加“抖动”以避免雷鸣般的群体效应#
您可以在重试延迟时间中添加“抖动”(jitter)。 抖动是在重试周期中加入的随机时间量,这有助于防止“雷鸣般群体效应”的发生,即当许多任务同时重试时,可能会压垮系统。
可以使用retry_jitter_factor
选项来为基本延迟增加变化量。
例如,一个具有0.5 retry_jitter_factor
的10秒重试延迟将允许最多15秒的延迟。
较大的retry_jitter_factor
值可以提供更多的防护措施,以对抗“雷鸣般群体效应”,同时保持平均重试延迟时间恒定。
例如,以下任务在其指数退避中添加了抖动,使得重试延迟将在最大延迟时间20秒、40秒和80秒之间变化。
from prefect import task
from prefect.tasks import exponential_backoff
@task(
retries=3,
retry_delay_seconds=exponential_backoff(backoff_factor=10),
retry_jitter_factor=1,
)
def some_task_with_exponential_backoff_retries():
(rest of code follows)
全局配置重试行为#
通过设置来全局设定默认的重试次数和重试延迟。这些全局设置不会覆盖任务装饰器中设定的 retries
或 retry_delay_seconds
。
prefect config set PREFECT_TASK_DEFAULT_RETRIES=2
prefect config set PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS = [1, 10, 100]