Apply concurrency limit to already submitted flows

Open j-tr opened this issue 5 months ago • 1 comments

Describe the current behavior

When several flows from the same deployment are running and a deployment concurrency limit is set while they run, the limit does not take effect immediately. New flow runs can still be started, and only these new flow runs count towards the limit.

Describe the proposed behavior

If a limit is set while flow runs are already running, I would expect that no further flow runs could be started until the number of running flow runs falls below the limit.
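A minimal sketch of the expected check, assuming the limit is enforced by counting every run currently in RUNNING, including runs that started before the limit was set. `can_start` and its arguments are hypothetical names for illustration, not Prefect APIs.

```python
from typing import Optional


def can_start(running_count: int, limit: Optional[int]) -> bool:
    """Return True if one more flow run may start under the limit.

    running_count includes runs that were already RUNNING before the
    limit was set. A limit of None means no limit is configured.
    """
    if limit is None:
        return True
    return running_count < limit
```

With 5 runs in flight and a new limit of 2, `can_start(5, 2)` is false until at least 4 of them finish.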

Example Use

This would be very helpful if a sudden spike in flow runs is observed and needs to be capped. Currently this can only be achieved by setting the limit to 0 and increasing it again once the flow runs have drained.
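The current workaround could be scripted roughly as follows. This is a sketch under assumptions: `cap_after_drain`, `set_limit`, and `count_running` are hypothetical names, and the two callables are supplied by the caller, so nothing here depends on a specific Prefect call.

```python
import asyncio
from typing import Awaitable, Callable


async def cap_after_drain(
    set_limit: Callable[[int], Awaitable[None]],
    count_running: Callable[[], Awaitable[int]],
    target_limit: int,
    poll_seconds: float = 5.0,
) -> None:
    """Block new runs, wait for in-flight runs to drain, then set the real limit."""
    # A limit of 0 stops any new flow run from starting.
    await set_limit(0)
    # Runs started before the limit are not counted, so wait for all of them.
    while await count_running() > 0:
        await asyncio.sleep(poll_seconds)
    # From here on, every run that starts is counted against the limit.
    await set_limit(target_limit)
```

`set_limit` could wrap an `update_deployment` call with `DeploymentUpdate(concurrency_limit=...)`; how the running count is obtained is left to the caller.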

Additional context

No response

j-tr avatar Nov 10 '25 17:11 j-tr

thanks for reporting this! i've reproduced this. Flows already in the RUNNING state when a deployment concurrency limit is set don't count towards that limit, allowing more concurrent runs than intended.

Reproduction script
import asyncio
from prefect import flow, states
from prefect.client.orchestration import get_client


@flow
async def long_running_flow():
    print("Flow started, running for 30 seconds...")
    await asyncio.sleep(30)
    print("Flow completed")


async def main():
    async with get_client() as client:
        # Create deployment without concurrency limit
        flow_id = await client.create_flow(long_running_flow)
        deployment_id = await client.create_deployment(
            flow_id=flow_id,
            name="test-deployment-19404",
            entrypoint=f"{__file__}:long_running_flow",
            enforce_parameter_schema=False,
        )

        # Create 5 flow runs and transition to RUNNING
        run_ids = []
        for _ in range(5):
            run = await client.create_flow_run_from_deployment(deployment_id)
            run_ids.append(run.id)
            # set_flow_run_state takes the run's ID as `flow_run_id`
            await client.set_flow_run_state(flow_run_id=run.id, state=states.Pending())
            await client.set_flow_run_state(flow_run_id=run.id, state=states.Running())

        # Set deployment concurrency limit to 2 while flows are running
        from prefect.client.schemas.actions import DeploymentUpdate
        await client.update_deployment(
            deployment_id=deployment_id,
            deployment=DeploymentUpdate(concurrency_limit=2),
        )

        # Result: All 5 flows remain RUNNING, exceeding the limit of 2
        for run_id in run_ids:
            run = await client.read_flow_run(run_id)
            print(f"{run.name}: {run.state_type.value}")


if __name__ == "__main__":
    asyncio.run(main())

root cause appears to be in SecureFlowConcurrencySlots (core_policy.py:540-544) which excludes transitions from RUNNING state, so existing running flows never acquire a lease.
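To illustrate the gap, here is a toy model (not Prefect's code) of a limiter that counts leases instead of RUNNING runs. It assumes a lease is only taken when a run starts while a limit is configured; `LeaseLimiter` and its fields are hypothetical.

```python
class LeaseLimiter:
    """Toy limiter that counts leases rather than RUNNING runs."""

    def __init__(self):
        self.limit = None      # None means no limit configured
        self.leases = set()    # run ids holding a concurrency slot
        self.running = set()   # run ids that are actually running

    def start(self, run_id: str) -> bool:
        # A lease is only acquired if a limit exists when the run starts.
        if self.limit is not None:
            if len(self.leases) >= self.limit:
                return False
            self.leases.add(run_id)
        self.running.add(run_id)
        return True


limiter = LeaseLimiter()
for i in range(5):
    limiter.start(f"old-{i}")   # started before any limit: no leases taken
limiter.limit = 2               # limit set while 5 runs are running
admitted = [limiter.start(f"new-{i}") for i in range(3)]
```

Two new runs are admitted even though five are already running, because the older runs hold no lease and so are invisible to the check.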

we'll need to consider whether the right approach is to fix this behavior or document it as a known limitation

zzstoatzz avatar Nov 10 '25 19:11 zzstoatzz