Manage results#

Results are the bedrock of many Prefect features - most notably transactions and caching - and are foundational to the resilient execution paradigm that Prefect enables. Any return value from a task or a flow is a result. By default these results are not persisted and no reference to them is maintained in the API.

Enabling result persistence allows you to fully benefit from Prefect’s orchestration features.

**Turn on persistence globally by default**

The simplest way to turn on result persistence globally is through the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting:

prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

See settings for more information on how settings are managed.

Configuring result persistence#

There are four categories of configuration for result persistence:

Default persistence configuration#

Once result persistence is enabled - whether through the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting or through any of the mechanisms described below - Prefect’s default result storage configuration is activated.

If you enable result persistence and don’t specify a filesystem block, your results will be stored locally. By default, results are persisted to ~/.prefect/storage/.

You can configure the location of these results through the PREFECT_LOCAL_STORAGE_PATH setting.

prefect config set PREFECT_LOCAL_STORAGE_PATH='~/.my-results/'

Enabling result persistence#

In addition to the PREFECT_RESULTS_PERSIST_BY_DEFAULT and PREFECT_TASKS_DEFAULT_PERSIST_RESULT settings, result persistence can also be enabled or disabled on both individual flows and individual tasks. Specifying a non-null value for any of the following keywords on the task decorator will enable result persistence for that task:

  • persist_result: a boolean that allows you to explicitly enable or disable result persistence.

  • result_storage: accepts either a string reference to a storage block or a storage block class that specifies where results should be stored.

  • result_storage_key: a string that specifies the filename of the result within the task’s result storage.

  • result_serializer: a string or serializer that configures how the data should be serialized and deserialized.

  • cache_policy: a cache policy specifying the behavior of the task’s cache.

  • cache_key_fn: a function that configures a custom cache policy.

Similarly, setting persist_result=True, result_storage, or result_serializer on a flow will enable persistence for that flow.

**Enabling persistence on a flow enables persistence by default for its tasks**

Enabling result persistence on a flow through any of the above keywords will also enable it for all tasks called within that flow by default.

Any settings explicitly set on a task take precedence over the flow settings.

Additionally, the PREFECT_TASKS_DEFAULT_PERSIST_RESULT environment variable can be used to globally control the default persistence behavior for tasks, overriding the default behavior set by a parent flow or task.

Result storage#

You can configure the system of record for your results through the result_storage keyword argument. This keyword accepts an instantiated filesystem block, or a block slug. Find your blocks’ slugs with prefect block ls. Note that if you want your tasks to share a common cache, your result storage should be accessible by the infrastructure in which those tasks run. Integrations have cloud-specific storage blocks. For example, a common distributed filesystem for result storage is AWS S3.

Additionally, you can control the default persistence behavior for task results using the default_persist_result setting. This setting allows you to specify whether results should be persisted by default for all tasks. You can set this to True to enable persistence by default, or False to disable it. This setting can be overridden at the individual task or flow level.

{/* pmd-metadata: fixture:cleanup_s3_bucket_block */}

from prefect import flow, task
from prefect_aws.s3 import S3Bucket

test_block = S3Bucket(bucket_name='test-bucket')
test_block.save('test-block')

# define three tasks
# with different result persistence configuration

@task
def my_task():
    return 42

unpersisted_task = my_task.with_options(persist_result=False)
other_storage_task = my_task.with_options(result_storage=test_block)


@flow(result_storage='s3-bucket/my-dev-block')
def my_flow():

    # this task will use the flow's result storage
    my_task()

    # this task will not persist results at all
    unpersisted_task()

    # this task will persist results to its own bucket using a different S3 block
    other_storage_task()

Specifying a default filesystem#

Alternatively, you can specify a different filesystem through the PREFECT_DEFAULT_RESULT_STORAGE_BLOCK setting. Specifying a block document slug here will enable result persistence using that filesystem as the default.

For example:

prefect config set PREFECT_DEFAULT_RESULT_STORAGE_BLOCK='s3-bucket/my-prod-block'
Note that any explicit configuration of `result_storage` on either a flow or task will override this default.

Result filenames#

By default, the filename of a task’s result is computed based on the task’s cache policy, which is typically a hash of various pieces of data and metadata. For flows, the filename is a random UUID.

You can configure the filename of the result file within result storage using either:

  • result_storage_key: a templated string that can use any of the fields within prefect.runtime and the task’s individual parameter values. These templated values will be populated at runtime.

  • cache_key_fn: a function that accepts the task run context and its runtime parameters and returns a string. See task caching documentation for more information.

If both `result_storage_key` and `cache_key_fn` are provided, only the `result_storage_key` will be used.

The following example writes three different result files based on the name parameter passed to the task:

from prefect import flow, task


@task(result_storage_key="hello-{parameters[name]}.pickle")
def hello_world(name: str = "world"):
    return f"hello {name}"


@flow
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")

If a result exists at a given storage key in the storage location, the task will load it without running. To learn more about caching mechanics in Prefect, see the caching documentation.

Result serialization#

You can configure how results are serialized to storage using result serializers. These can be set using the result_serializer keyword on both tasks and flows. A default value can be set using the PREFECT_RESULTS_DEFAULT_SERIALIZER setting, which defaults to pickle. Current built-in options include "pickle", "json", "compressed/pickle" and "compressed/json".

The result_serializer accepts both a string identifier or an instance of a ResultSerializer class, allowing you to customize serialization behavior.

Advanced: Caching results in memory#

When running workflows, Prefect keeps the results of all tasks and flows in memory so they can be passed downstream. In some cases, it is desirable to override this behavior. For example, if you are returning a large amount of data from a task, it can be costly to keep it in memory for the entire duration of the flow run.

Flows and tasks both include an option to drop the result from memory once the result has been committed with cache_result_in_memory:

from prefect import flow

@flow(cache_result_in_memory=False)
def foo():
    return "pretend this is large data"

@task(cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"