Limit concurrent task runs with tags#
Task run concurrency limits help prevent too many tasks from running simultaneously. For example, they are useful when many tasks across multiple flows interact with a database that allows only 10 connections.
Task run concurrency limits use task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag.
If a task has multiple tags, it will run only if all tags have available concurrency.
Tags without specified concurrency limits are treated as unlimited. Setting a tag’s concurrency limit to 0 causes immediate abortion of any task runs with that tag, rather than delaying them.
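The tag rules above can be modeled in a few lines of plain Python. This is a minimal sketch of the decision logic only, not Prefect's implementation; the function name `admission_decision`, its arguments, and the returned strings are hypothetical names chosen for illustration.

```python
from typing import Dict, Iterable


def admission_decision(
    tags: Iterable[str],
    limits: Dict[str, int],
    running: Dict[str, int],
) -> str:
    """Model whether a task run with `tags` may enter a Running state.

    limits:  tag -> configured concurrency limit (absent tag = unlimited)
    running: tag -> number of task runs currently Running with that tag
    """
    tags = list(tags)
    # A limit of 0 aborts the task run instead of delaying it.
    if any(limits.get(tag) == 0 for tag in tags):
        return "abort"
    # Every limited tag must have a free slot; unlimited tags never block.
    for tag in tags:
        limit = limits.get(tag)
        if limit is not None and running.get(tag, 0) >= limit:
            return "wait"
    return "run"


limits = {"small_instance": 10, "database": 2, "disabled": 0}
running = {"small_instance": 3, "database": 2}

print(admission_decision(["small_instance"], limits, running))              # run
print(admission_decision(["small_instance", "database"], limits, running))  # wait
print(admission_decision(["no_limit_set"], limits, running))                # run
print(admission_decision(["small_instance", "disabled"], limits, running))  # abort
```

In this model the second call waits because the 'database' tag is full, even though 'small_instance' still has free slots.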
Execution behavior#
Task tag limits are checked whenever a task run attempts to enter a Running state.
If no concurrency slot is available for any one of the task’s tags, the transition to a Running state is delayed and the client is instructed to try entering a Running state again in 30 seconds (or the value specified by the PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS setting).
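To get a feel for how the retry interval affects latency, the following sketch models a client that re-attempts the transition at a fixed interval. It is a simplified model under stated assumptions (retries happen exactly every `wait_seconds`, and the slot stays free once released); `seconds_until_running` and `slot_free_at` are hypothetical names, not part of Prefect.

```python
import math


def seconds_until_running(slot_free_at: float, wait_seconds: float = 30.0) -> float:
    """Return when a delayed task run would enter Running, in this model.

    The first attempt happens at t=0; each failed attempt is retried
    `wait_seconds` later. The first attempt at or after `slot_free_at`
    succeeds.
    """
    if slot_free_at <= 0:
        return 0.0  # a slot was available on the first attempt
    attempts_needed = math.ceil(slot_free_at / wait_seconds)
    return float(attempts_needed * wait_seconds)


print(seconds_until_running(0))       # 0.0
print(seconds_until_running(45))      # 60.0
print(seconds_until_running(45, 10))  # 50.0
```

In this model, a slot that frees up after 45 seconds is picked up at 60 seconds with the default interval, so a shorter wait value would trade more frequent checks for lower latency.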
Configure concurrency limits#
While task run concurrency limits are configured through tags (as shown below), flow run concurrency limits are configured through work pools and/or work queues.
You can set concurrency limits on as few or as many tags as you wish. You can set limits through:
Prefect CLI
Prefect API by using the PrefectClient Python client
Prefect server UI or Prefect Cloud
CLI#
You can create, list, and remove concurrency limits with the Prefect CLI concurrency-limit commands.
prefect concurrency-limit [command] [arguments]
| Command | Description |
|---|---|
| create | Create a concurrency limit by specifying a tag and limit. |
| delete | Delete the concurrency limit set on the specified tag. |
| inspect | View details about a concurrency limit set on the specified tag. |
| ls | View all defined concurrency limits. |
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
prefect concurrency-limit create small_instance 10
To delete the concurrency limit on the ‘small_instance’ tag:
prefect concurrency-limit delete small_instance
To view details about the concurrency limit on the ‘small_instance’ tag:
prefect concurrency-limit inspect small_instance
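When these commands are scripted, it can help to assemble the argument list in one place. The helper below is a hypothetical convenience wrapper, not part of Prefect; it only builds argument lists for the subcommands listed in the table above and leaves execution (for example with `subprocess.run`) to the caller.

```python
from typing import List, Optional

SUBCOMMANDS = {"create", "delete", "inspect", "ls"}


def concurrency_limit_args(
    command: str, tag: Optional[str] = None, limit: Optional[int] = None
) -> List[str]:
    """Build the argv list for a `prefect concurrency-limit` call."""
    if command not in SUBCOMMANDS:
        raise ValueError(f"unknown subcommand: {command}")
    args = ["prefect", "concurrency-limit", command]
    if command == "ls":
        return args  # lists every limit, so no tag is appended
    if tag is None:
        raise ValueError(f"'{command}' requires a tag")
    args.append(tag)
    if command == "create":
        if limit is None:
            raise ValueError("'create' requires a limit")
        args.append(str(limit))
    return args


print(concurrency_limit_args("create", "small_instance", 10))
# ['prefect', 'concurrency-limit', 'create', 'small_instance', '10']
```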
Python client#
To update your tag concurrency limits programmatically, use PrefectClient.create_concurrency_limit.
create_concurrency_limit takes two arguments:
tag specifies the task tag on which you’re setting a limit.
concurrency_limit specifies the maximum number of concurrent task runs for that tag.
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
import asyncio
from prefect import get_client

async def main():
    async with get_client() as client:
        # set a concurrency limit of 10 on the 'small_instance' tag
        limit_id = await client.create_concurrency_limit(
            tag="small_instance",
            concurrency_limit=10,
        )

asyncio.run(main())
To remove all concurrency limits on a tag, use PrefectClient.delete_concurrency_limit_by_tag, passing the tag:
from prefect import get_client

async def remove_limit():  # run with asyncio.run(remove_limit())
    async with get_client() as client:
        # remove the concurrency limit on the 'small_instance' tag
        await client.delete_concurrency_limit_by_tag(tag="small_instance")
To query the current limit on a tag, use PrefectClient.read_concurrency_limit_by_tag, passing the tag:
from prefect import get_client

async def read_limit():  # run with asyncio.run(read_limit())
    async with get_client() as client:
        # query the concurrency limit on the 'small_instance' tag
        limit = await client.read_concurrency_limit_by_tag(tag="small_instance")
To see all of your limits across all of your tags, use PrefectClient.read_concurrency_limits.
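Because the client methods are coroutines, several limits can be applied in one pass. The sketch below assumes only the `create_concurrency_limit(tag=..., concurrency_limit=...)` call shown earlier; `apply_limits` and `RecordingClient` are hypothetical names, and the stub client stands in for a real `PrefectClient` so the example runs without a Prefect server.

```python
import asyncio
from typing import Dict, List, Tuple


async def apply_limits(client, desired: Dict[str, int]) -> None:
    """Set a concurrency limit for each tag in `desired`."""
    for tag, limit in desired.items():
        await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)


class RecordingClient:
    """Hypothetical stand-in for PrefectClient that only records calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int]] = []

    async def create_concurrency_limit(self, tag: str, concurrency_limit: int) -> None:
        self.calls.append((tag, concurrency_limit))


stub = RecordingClient()
asyncio.run(apply_limits(stub, {"small_instance": 10, "database": 2}))
print(stub.calls)  # [('small_instance', 10), ('database', 2)]
```

With a real client, the same function could be awaited inside an `async with get_client() as client:` block in place of the stub.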