Manage states#
States are rich objects that contain information about the status of a particular task run or flow run.
You can learn many things about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:
is scheduled to make a third run attempt in an hour
succeeded and what data it produced
was scheduled to run, but later cancelled
used the cached result of a previous run instead of re-running
failed because it timed out
State types#
Prefect states have names and types.
A state’s name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING.
The distinction between types and names is subtle: state types are typically used for backing orchestration logic, whereas state names are more for visual display and bookkeeping.
The full list of states and state types includes:
| Name | Type | Terminal? | Description |
|---|---|---|---|
| Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
| Late | SCHEDULED | No | The run’s scheduled start time has passed, but it has not transitioned to PENDING (15 seconds by default). |
| AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and had remaining retry attempts. |
| Pending | PENDING | No | The run has been submitted to execute, but is waiting on necessary preconditions to be satisfied. |
| Running | RUNNING | No | The run code is currently executing. |
| Retrying | RUNNING | No | The run code is currently executing after previously not completing successfully. |
| Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
| Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
| Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
| Completed | COMPLETED | Yes | The run completed successfully. |
| Cached | COMPLETED | Yes | The run result was loaded from a previously cached value. |
| RolledBack | COMPLETED | Yes | The run completed successfully but the transaction rolled back and executed rollback hooks. |
| Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
| Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
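To make the name/type distinction concrete, here is a minimal standard-library sketch. It is a toy model, not Prefect's own classes: `StateType`, `NAME_TO_TYPE`, `TERMINAL_TYPES`, and `is_terminal` are hypothetical names, and the mapping covers only a handful of states. The Cached-to-COMPLETED pairing is an assumption based on the `on_completion` hook example later on this page.

```python
from enum import Enum


class StateType(Enum):
    """Toy stand-in for a state type; not Prefect's own class."""

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


# Display name -> orchestration type. Several names can share one type.
NAME_TO_TYPE = {
    "Running": StateType.RUNNING,
    "Retrying": StateType.RUNNING,
    "Completed": StateType.COMPLETED,
    "Cached": StateType.COMPLETED,  # assumption, see the note above
    "Failed": StateType.FAILED,
    "Crashed": StateType.CRASHED,
}

TERMINAL_TYPES = {StateType.COMPLETED, StateType.FAILED, StateType.CRASHED}


def is_terminal(name: str) -> bool:
    """Decide on the state's type rather than on its display name."""
    return NAME_TO_TYPE[name] in TERMINAL_TYPES
```

Keying the decision on the type means a new display name can be added without touching any logic that branches on it.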
Final state determination#
The final state of a flow or task run depends on a number of factors; generally speaking there are three categories of terminal states:
COMPLETED: a run in any COMPLETED state did not encounter any errors or exceptions and returned successfully
FAILED: a run in any FAILED state encountered an error during execution, such as a raised exception
CRASHED: a run in any CRASHED state was interrupted by an OS signal such as a KeyboardInterrupt or SIGTERM
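The three categories can be sketched with plain Python exception handling. This is a toy classifier under stated assumptions, not how Prefect determines final states: `final_state_type` and the three sample functions are hypothetical, and treating every non-`Exception` `BaseException` as a crash is a simplification.

```python
from typing import Any, Callable


def final_state_type(fn: Callable[[], Any]) -> str:
    """Toy classifier: run fn and name the terminal category it lands in."""
    try:
        fn()
    except Exception:
        # An ordinary raised exception counts as an error during execution.
        return "FAILED"
    except BaseException:
        # KeyboardInterrupt and SystemExit derive from BaseException, not
        # Exception; this sketch treats them as interruptions.
        return "CRASHED"
    return "COMPLETED"


def succeeds() -> int:
    return 1


def raises() -> None:
    raise ValueError("oops!")


def interrupted() -> None:
    raise KeyboardInterrupt
```

Calling `final_state_type` on the three sample functions yields one result from each category.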
Task return values#
A task will be placed into a Completed state if it returns any Python object, with one exception: if a task explicitly returns a Prefect Failed state, the task will be marked Failed.
from prefect import task, flow
from prefect.states import Completed, Failed

@task
def toggle_task(fail: bool):
    if fail:
        return Failed(message="I was instructed to fail.")
    else:
        return Completed(message="I was instructed to succeed.")

@flow
def example():
    # this run will be set to a `Failed` state
    state_one = toggle_task(fail=True)
    # this run will be set to a `Completed` state
    state_two = toggle_task(fail=False)
    # similarly, the flow run will fail because we return a `Failed` state
    return state_one, state_two
You can also access state objects directly within a flow through the return_state flag:
from prefect import flow, task

@task
def add_one(x):
    return x + 1

@flow
def my_flow():
    # by default, calling a task returns its result
    result = add_one(1)
    assert isinstance(result, int) and result == 2
    # with return_state=True, the call returns the state object instead
    state = add_one(1, return_state=True)
    assert state.is_completed() is True
    assert state.result() == 2
Flow return values#
Execute code on state changes#
State change hooks execute code in response to client-side changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.
State hooks have the following signature:
def my_task_state_hook(task: Task, run: TaskRun, state: State) -> None:
    ...

def my_flow_state_hook(flow: Flow, run: FlowRun, state: State) -> None:
    ...
Both task and flow run hooks can be specified through a keyword argument or through decorator syntax:
from prefect import task, flow
# for type hints only
from prefect import Task
from prefect.client.schemas.objects import TaskRun
from prefect.states import State

def first_task_hook(tsk: Task, run: TaskRun, state: State) -> None:
    if state.name != 'Cached':
        print('I run anytime this task executes successfully')
    else:
        print('and can condition my behavior on details of this run')

@task(log_prints=True, on_completion=[first_task_hook])
def nice_task(name: str):
    print(f"Hello {name}!")

# alternatively hooks can be specified via decorator
@nice_task.on_completion
def second_hook(tsk: Task, run: TaskRun, state: State) -> None:
    print('another hook')

nice_task(name='Marvin')
State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:
from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook]
)
def my_task():
    # placeholder body, added so the decorator has a function to wrap
    ...
Available state change hooks#
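| Type | Flow | Task | Description |
|---|---|---|---|
| on_completion | ✓ | ✓ | Executes when a flow or task run enters a Completed state. |
| on_failure | ✓ | ✓ | Executes when a flow or task run enters a Failed state. |
| on_cancellation | ✓ | - | Executes when a flow run enters a Cancelling state. |
| on_crashed | ✓ | - | Executes when a flow run enters a Crashed state. |
| on_running | ✓ | - | Executes when a flow run enters a Running state. |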
Pass kwargs to state change hooks#
You can combine the with_options method with functools.partial to pass arbitrary **kwargs to your hooks:
from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
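The reason this works can be seen without Prefect at all. In the following standard-library sketch, the placeholder strings stand in for the task, task run, and state objects a real caller would supply, and `bound_hook` and `received` are hypothetical names. `functools.partial` binds the extra keyword arguments ahead of time, so the resulting callable still accepts only the three positional arguments a hook is called with.

```python
from functools import partial

received = {}


def my_hook(task, task_run, state, **kwargs):
    # Record the state plus whatever extra keyword arguments were bound.
    received.update(state=state, **kwargs)


# Bind the extras up front; the result still accepts (task, task_run, state).
bound_hook = partial(my_hook, x="foo", y=42)

# Stand-in for the caller, which knows nothing about x or y.
bound_hook("task-placeholder", "task-run-placeholder", "state-placeholder")
```

After the call, `received` holds the state placeholder together with the bound `x` and `y` values.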
Example usage: send a notification when a flow run fails#
State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed state.
Here’s an example of running a client-side hook upon a flow run entering a Failed state:
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL

def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )
    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    failing_flow()
Note that retries are configured in this example. As a result, the on_failure hook does not run after the first failed attempt; it runs only once all retries have completed and the flow run enters a Failed state.
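The ordering of retries and failure hooks can be modeled with a small standard-library sketch. This is a toy runner under stated assumptions, not Prefect's engine: `run_with_retries` and its parameters are hypothetical, and the hooks here receive only a state name rather than the flow, flow run, and state objects.

```python
from typing import Callable, List


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    on_failure: List[Callable[[str], None]],
) -> str:
    """Toy runner: call fn up to 1 + retries times, then report a final state."""
    for _ in range(1 + retries):
        try:
            fn()
        except Exception:
            continue  # no hooks fire between attempts
        return "Completed"
    # Only reached once every attempt has failed.
    for hook in on_failure:
        hook("Failed")
    return "Failed"


attempts = []
hook_calls = []


def always_fails() -> None:
    attempts.append(1)
    raise ValueError("oops!")


final = run_with_retries(always_fails, retries=1, on_failure=[hook_calls.append])
```

With `retries=1`, the function is attempted twice, and the failure hook fires a single time after the second attempt.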