Manage results
Results are the bedrock of many Prefect features - most notably transactions and caching - and are foundational to the resilient execution paradigm that Prefect enables. Any return value from a task or a flow is a result. By default these results are not persisted and no reference to them is maintained in the API.
Enabling result persistence allows you to fully benefit from Prefect’s orchestration features.
The simplest way to turn on result persistence globally is through the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` setting:

```bash
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```
See the settings documentation for more information on how settings are managed.
Configuring result persistence
There are four categories of configuration for result persistence:

- whether to persist results at all: this is configured through various keyword arguments, the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` setting, and the `PREFECT_TASKS_DEFAULT_PERSIST_RESULT` setting for tasks specifically.
- what filesystem to persist results to: this is configured through the `result_storage` keyword and the `PREFECT_DEFAULT_RESULT_STORAGE_BLOCK` setting.
- how to serialize and deserialize results: this is configured through the `result_serializer` keyword and the `PREFECT_RESULTS_DEFAULT_SERIALIZER` setting.
- what filename to use: this is configured through one of `result_storage_key`, `cache_policy`, or `cache_key_fn`.
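As a rough sketch of how these categories map onto a single task definition (the block slug, filename, and serializer below are illustrative choices, not defaults):

```python
from prefect import task

# Illustrative only: one task touching all four configuration categories.
@task(
    persist_result=True,                       # whether to persist at all
    result_storage="s3-bucket/my-dev-block",   # which filesystem block to use (assumed to exist)
    result_serializer="json",                  # how to serialize the result
    result_storage_key="my-task-output.json",  # what filename to use in storage
)
def my_task():
    return {"answer": 42}
```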
Default persistence configuration
Once result persistence is enabled - whether through the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` setting or through any of the mechanisms described below - Prefect's default result storage configuration is activated.

If you enable result persistence and don't specify a filesystem block, your results will be stored locally. By default, results are persisted to `~/.prefect/storage/`. You can configure the location of these results through the `PREFECT_LOCAL_STORAGE_PATH` setting:

```bash
prefect config set PREFECT_LOCAL_STORAGE_PATH='~/.my-results/'
```
Enabling result persistence
In addition to the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` and `PREFECT_TASKS_DEFAULT_PERSIST_RESULT` settings, result persistence can also be enabled or disabled on both individual flows and individual tasks. Specifying a non-null value for any of the following keywords on the task decorator will enable result persistence for that task:

- `persist_result`: a boolean that allows you to explicitly enable or disable result persistence.
- `result_storage`: accepts either a string reference to a storage block or a storage block class that specifies where results should be stored.
- `result_storage_key`: a string that specifies the filename of the result within the task's result storage.
- `result_serializer`: a string or serializer that configures how the data should be serialized and deserialized.
- `cache_policy`: a cache policy specifying the behavior of the task's cache.
- `cache_key_fn`: a function that configures a custom cache policy.
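For instance, providing just one of these keywords is enough to turn persistence on; here is a minimal sketch using only `result_storage_key` (the filename is illustrative):

```python
from prefect import task

# Setting result_storage_key alone enables result persistence for this task.
@task(result_storage_key="my-task-output.pickle")
def my_task():
    return 42
```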
Similarly, setting `persist_result=True`, `result_storage`, or `result_serializer` on a flow will enable persistence for that flow.

Enabling result persistence on a flow through any of the above keywords will also enable it for all tasks called within that flow by default. Any settings explicitly set on a task take precedence over the flow settings. Additionally, the `PREFECT_TASKS_DEFAULT_PERSIST_RESULT` environment variable can be used to globally control the default persistence behavior for tasks, overriding the default behavior set by a parent flow or task.
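As a sketch of how these layers interact (the flow and task names are illustrative), a flow-level `persist_result=True` applies to its tasks unless a task explicitly opts out:

```python
from prefect import flow, task

@task
def fetch_data():
    # inherits result persistence from the parent flow
    return [1, 2, 3]

@task(persist_result=False)
def transform(data):
    # explicitly disabled, overriding the flow-level default
    return [x * 2 for x in data]

@flow(persist_result=True)
def pipeline():
    transform(fetch_data())
```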
Result storage
You can configure the system of record for your results through the `result_storage` keyword argument. This keyword accepts either an instantiated filesystem block or a block slug. Find your blocks' slugs with `prefect block ls`.

Note that if you want your tasks to share a common cache, your result storage should be accessible by the infrastructure in which those tasks run. Integrations provide cloud-specific storage blocks. For example, a common distributed filesystem for result storage is AWS S3.
Additionally, you can control the default persistence behavior for task results using the `default_persist_result` setting. This setting allows you to specify whether results should be persisted by default for all tasks. You can set it to `True` to enable persistence by default, or `False` to disable it. This setting can be overridden at the individual task or flow level.
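For example, to disable task result persistence by default (using the `PREFECT_TASKS_DEFAULT_PERSIST_RESULT` setting mentioned above):

```bash
prefect config set PREFECT_TASKS_DEFAULT_PERSIST_RESULT=false
```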
The following example shows how tasks within a single flow can use different result storage configurations:

```python
from prefect import flow, task
from prefect_aws.s3 import S3Bucket

test_block = S3Bucket(bucket_name='test-bucket')
test_block.save('test-block')

# define three tasks
# with different result persistence configuration
@task
def my_task():
    return 42

unpersisted_task = my_task.with_options(persist_result=False)
other_storage_task = my_task.with_options(result_storage=test_block)

# assumes an S3Bucket block with the slug 's3-bucket/my-dev-block' already exists
@flow(result_storage='s3-bucket/my-dev-block')
def my_flow():
    # this task will use the flow's result storage
    my_task()

    # this task will not persist results at all
    unpersisted_task()

    # this task will persist results to its own bucket using a different S3 block
    other_storage_task()
```
Specifying a default filesystem
Alternatively, you can specify a different filesystem through the `PREFECT_DEFAULT_RESULT_STORAGE_BLOCK` setting. Specifying a block document slug here will enable result persistence using that filesystem as the default. For example:

```bash
prefect config set PREFECT_DEFAULT_RESULT_STORAGE_BLOCK='s3-bucket/my-prod-block'
```
Result filenames
By default, the filename of a task's result is computed based on the task's cache policy, which is typically a hash of various pieces of data and metadata. For flows, the filename is a random UUID.

You can configure the filename of the result file within result storage using either:

- `result_storage_key`: a templated string that can use any of the fields within `prefect.runtime` and the task's individual parameter values. These templated values will be populated at runtime.
- `cache_key_fn`: a function that accepts the task run context and its runtime parameters and returns a string. See the task caching documentation for more information.
The following example writes three different result files based on the `name` parameter passed to the task:

```python
from prefect import flow, task

@task(result_storage_key="hello-{parameters[name]}.pickle")
def hello_world(name: str = "world"):
    return f"hello {name}"

@flow
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")
```
If a result exists at a given storage key in the storage location, the task will load it without running. To learn more about caching mechanics in Prefect, see the caching documentation.
Result serialization
You can configure how results are serialized to storage using result serializers. These can be set using the `result_serializer` keyword on both tasks and flows. A default value can be set using the `PREFECT_RESULTS_DEFAULT_SERIALIZER` setting, which defaults to `pickle`. Current built-in options include `"pickle"`, `"json"`, `"compressed/pickle"`, and `"compressed/json"`.
The `result_serializer` keyword accepts either a string identifier or an instance of a `ResultSerializer` class, allowing you to customize serialization behavior.
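As a sketch, here is a task that persists its result as compressed JSON instead of the default pickle (the task itself is illustrative):

```python
from prefect import task

# Illustrative task whose persisted result is serialized as compressed JSON.
@task(persist_result=True, result_serializer="compressed/json")
def summarize():
    return {"status": "ok", "count": 3}
```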
Advanced: Caching results in memory
When running workflows, Prefect keeps the results of all tasks and flows in memory so they can be passed downstream. In some cases, it is desirable to override this behavior. For example, if you are returning a large amount of data from a task, it can be costly to keep it in memory for the entire duration of the flow run.

Flows and tasks both include an option to drop the result from memory once the result has been committed with `cache_result_in_memory`:
```python
from prefect import flow, task

@flow(cache_result_in_memory=False)
def foo():
    return "pretend this is large data"

@task(cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"
```