Manage states#

States are rich objects that contain information about the status of a particular task run or flow run.

You can learn many things about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:

  • is scheduled to make a third run attempt in an hour

  • succeeded and what data it produced

  • was scheduled to run, but later cancelled

  • used the cached result of a previous run instead of re-running

  • failed because it timed out

**Only runs have states**: Flows and tasks are templates that describe what a system does; only when we **run** the system does it also take on a state.

State types#

Prefect states have names and types. A state’s name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING.

The distinction between types and names is subtle: state types are typically used for backing orchestration logic, whereas state names are more for visual display and bookkeeping.

The full list of states and state types includes:

Name

Type

Terminal?

Description

Scheduled

SCHEDULED

No

The run will begin at a particular time in the future.

Late

SCHEDULED

No

The run’s scheduled start time has passed, but it has not transitioned to PENDING (15 seconds by default).

AwaitingRetry

SCHEDULED

No

The run did not complete successfully because of a code issue and had remaining retry attempts.

Pending

PENDING

No

The run has been submitted to execute, but is waiting on necessary preconditions to be satisfied.

Running

RUNNING

No

The run code is currently executing.

Retrying

RUNNING

No

The run code is currently executing after previously not completing successfully.

Paused

PAUSED

No

The run code has stopped executing until it receives manual approval to proceed.

Cancelling

CANCELLING

No

The infrastructure on which the code was running is being cleaned up.

Cancelled

CANCELLED

Yes

The run did not complete because a user determined that it should not.

Completed

COMPLETED

Yes

The run completed successfully.

Cached

COMPLETED

Yes

The run result was loaded from a previously cached value.

RolledBack

COMPLETED

Yes

The run completed successfully but the transaction rolled back and executed rollback hooks.

Failed

FAILED

Yes

The run did not complete because of a code issue and had no remaining retry attempts.

Crashed

CRASHED

Yes

The run did not complete because of an infrastructure issue.

Final state determination#

The final state of a flow or task run depends on a number of factors; generally speaking there are three categories of terminal states:

  • COMPLETED: a run in any COMPLETED state did not encounter any errors or exceptions and returned successfully

  • FAILED: a run in any FAILED state encountered an error during execution, such as a raised exception

  • CRASHED: a run in any CRASHED state was interrupted by an OS signal such as a KeyboardInterrupt or SIGTERM

Task return values#

A task will be placed into a Completed state if it returns any Python object, with one exception: if a task explicitly returns a Prefect Failed state, the task will be marked Failed.

from prefect import task, flow
from prefect.states import Completed, Failed


@task
def toggle_task(fail: bool):
    if fail:
        return Failed(message="I was instructed to fail.")
    else:
        return Completed(message="I was instructed to succeed.")


@flow
def example():
    # this run will be set to a `Failed` state
    state_one = toggle_task(fail=True)

    # this run will be set to a `Completed` state
    state_two = toggle_task(fail=False)

    # similarly, the flow run will fail because we return a `Failed` state
    return state_one, state_two

You can also access state objects directly within a flow through the return_state flag:

from prefect import flow, task


@task
def add_one(x):
    return x + 1


@flow
def my_flow():
    result = add_one(1)
    assert isinstance(result, int) and result == 2

    state = add_one(1, return_state=True)
    assert state.is_completed() is True
    assert state.result() == 2
Returning a `State` via `return_state=True` is useful when you want to conditionally respond to the terminal states of a task or flow. For example, `if state.is_failed(): ...`.

Flow return values#

import FinalFlowState from ‘/snippets/final-flow-state.mdx’

Execute code on state changes#

State change hooks execute code in response to client side changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.

State hooks have the following signature:

def my_task_state_hook(task: Task, run: TaskRun, state: State) -> None:
    ...

def my_flow_state_hook(flow: Flow, run: FlowRun, state: State) -> None:
    ...

Both task and flow run hooks can be specified through a keyword argument or through decorator syntax:

from prefect import task, flow

# for type hints only
from prefect import Task
from prefect.client.schemas.objects import TaskRun
from prefect.states import State


def first_task_hook(tsk: Task, run: TaskRun, state: State) -> None:
    if not state.name == 'Cached':
        print('I run anytime this task executes successfully')
    else:
        print('and can condition my behavior on details of this run')


@task(log_prints=True, on_completion=[first_task_hook])
def nice_task(name: str):
    print(f"Hello {name}!")


# alternatively hooks can be specified via decorator
@nice_task.on_completion
def second_hook(tsk: Task, run: TaskRun, state: State) -> None:
    print('another hook')

nice_task(name='Marvin')
To import a `TaskRun` or `FlowRun` for type hinting, you can import from `prefect.client.schemas.objects`.

State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook]
)

Available state change hooks#

Type

Flow

Task

Description

on_completion

Executes when a flow or task run enters a Completed state.

on_failure

Executes when a flow or task run enters a Failed state.

on_cancellation

-

Executes when a flow run enters a Cancelling state.

on_crashed

-

Executes when a flow run enters a Crashed state.

on_running

-

Executes when a flow run enters a Running state.

Note that the `on_rollback` hook for tasks is _not_ a proper state change hook but instead is a transaction lifecycle hook. Rollback hooks accept one argument representing the transaction for the task.

Pass kwargs to state change hooks#

You can compose the with_options method to effectively pass arbitrary **kwargs to your hooks:

from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}

Example usage: send a notification when a flow run fails#

State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed state.

Here’s an example of running a client-side hook upon a flow run entering a Failed state:

from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL

def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )

    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    failing_flow()

Note that retries are configured in this example. This means the on_failure hook does not run until all retries have completed when the flow run enters a Failed state.