Apply concurrency limit to already submitted flows
### Describe the current behavior

When running a number of flows from the same deployment and setting a deployment concurrency limit while the flows are running, the limit does not immediately cap the number of flow runs. Instead, new flow runs can still be started, and only these new flow runs count towards the limit.
### Describe the proposed behavior

If a limit is set while flow runs are already running, I would expect that no further flow runs could be started until the number of running flows falls below the limit.
### Example Use

This would be very helpful when a sudden spike in flow runs is observed and needs to be capped. Currently this can only be achieved by setting the limit to 0 and raising it again once the running flows have drained.
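The drain-to-zero workaround can be sketched with a toy in-memory limiter (the `ToyDeployment` class and its methods are illustrative only, not Prefect APIs):

```python
# Toy model of a deployment concurrency limiter, to illustrate the
# drain-to-zero workaround. None of these names are real Prefect APIs.
class ToyDeployment:
    def __init__(self):
        self.concurrency_limit = None  # None means unlimited
        self.running = 0

    def try_start_run(self):
        """Start a run if the limit allows it; return True on success."""
        if self.concurrency_limit is not None and self.running >= self.concurrency_limit:
            return False
        self.running += 1
        return True

    def finish_run(self):
        self.running -= 1


# Spike: 5 runs start while no limit is set.
dep = ToyDeployment()
for _ in range(5):
    assert dep.try_start_run()

# Workaround step 1: cap at 0 so no new runs can start.
dep.concurrency_limit = 0
assert not dep.try_start_run()

# Workaround step 2: wait for the running flows to drain.
for _ in range(5):
    dep.finish_run()

# Workaround step 3: restore the intended limit; new runs are admitted again.
dep.concurrency_limit = 2
assert dep.try_start_run()
```

The awkward part is step 2: nothing can start, including legitimate work, until every pre-existing run finishes.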
### Additional context

No response
Thanks for reporting this! I've reproduced it. Flows already in the RUNNING state when a deployment concurrency limit is set don't count towards that limit, allowing more concurrent runs than intended.
**Reproduction script**

```python
import asyncio

from prefect import flow, states
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import DeploymentUpdate


@flow
async def long_running_flow():
    print("Flow started, running for 30 seconds...")
    await asyncio.sleep(30)
    print("Flow completed")


async def main():
    async with get_client() as client:
        # Create a deployment without a concurrency limit
        flow_id = await client.create_flow(long_running_flow)
        deployment_id = await client.create_deployment(
            flow_id=flow_id,
            name="test-deployment-19404",
            entrypoint=f"{__file__}:long_running_flow",
            enforce_parameter_schema=False,
        )

        # Create 5 flow runs and transition them to RUNNING
        run_ids = []
        for i in range(5):
            run = await client.create_flow_run_from_deployment(deployment_id)
            run_ids.append(run.id)
            await client.set_flow_run_state(run_id=run.id, state=states.Pending())
            await client.set_flow_run_state(run_id=run.id, state=states.Running())

        # Set a deployment concurrency limit of 2 while the flows are running
        await client.update_deployment(
            deployment_id=deployment_id,
            deployment=DeploymentUpdate(concurrency_limit=2),
        )

        # Result: all 5 flows remain RUNNING, exceeding the limit of 2
        for run_id in run_ids:
            run = await client.read_flow_run(run_id)
            print(f"{run.name}: {run.state_type.value}")


if __name__ == "__main__":
    asyncio.run(main())
```
The root cause appears to be in `SecureFlowConcurrencySlots` (`core_policy.py:540-544`), which excludes transitions from the RUNNING state, so flows that are already running never acquire a lease.
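The effect of that exclusion can be illustrated with a toy lease-based limiter (a deliberate simplification, not the real `core_policy` code): runs that were already RUNNING when the limit appeared hold no lease, so the slot count starts at zero and new runs are admitted on top of the pre-existing ones.

```python
# Toy lease-based limiter illustrating the bug: only transitions *into*
# RUNNING acquire a lease, so pre-existing RUNNING flows are invisible
# to the limit. This is a simplification, not real Prefect code.
class LeaseLimiter:
    def __init__(self, limit):
        self.limit = limit
        self.leases = set()  # run ids currently holding a slot

    def on_transition(self, run_id, from_state, to_state):
        """Admit the transition only if a lease slot is available."""
        if from_state == "RUNNING":
            # Mirrors the exclusion described above: transitions *from*
            # RUNNING bypass lease accounting entirely.
            return True
        if to_state == "RUNNING":
            if len(self.leases) >= self.limit:
                return False
            self.leases.add(run_id)
        return True


# 5 flows are already RUNNING before the limit of 2 is created,
# so none of them ever passed through the limiter and none hold a lease.
limiter = LeaseLimiter(limit=2)
already_running = [f"old-{i}" for i in range(5)]

# New runs are still admitted until *they* fill the 2 slots,
# giving 5 + 2 = 7 concurrent runs instead of 2.
admitted = [limiter.on_transition(f"new-{i}", "PENDING", "RUNNING") for i in range(3)]
print(admitted)  # [True, True, False]
```

A fix along these lines would have to account for pre-existing RUNNING flows when the limit is created, either by back-filling leases or by counting them directly.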
We'll need to consider whether the right approach is to fix this behavior or to document it as a known limitation.