Prevent running late runs after some threshold

Open · joelluijmes opened this issue 1 year ago · 18 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar request and didn't find it.
  • [X] I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

It would be great if we could configure flows (or Prefect itself) not to start executing late flow runs after some threshold. For some reason, I have a flow run that didn't start initially, and I didn't notice. After restarting the agent, this flow run was still picked up, and now it is running a flow that is more than 25 days late.

Describe the proposed behavior

Configure a maximum threshold for late runs, after which the flow run should transition into a Failed / Crashed state instead of running.

Example Use

A new parameter on the flow would make sense, e.g. @flow(late_run_threshold_seconds=3600), similar to timeout_seconds.
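To make the proposal concrete, here is a toy sketch of a flow-level threshold in plain Python. It does not use Prefect: late_run_options is a hypothetical stand-in for the @flow decorator, and should_skip is a hypothetical check that a runner could perform before executing a run. Leaving the threshold unset keeps the current behavior, so every run still executes.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def late_run_options(late_run_threshold_seconds: Optional[float] = None):
    """Toy decorator (hypothetical): attach a late-run threshold to a function,
    similar to how timeout_seconds is configured on a Prefect flow."""
    def decorate(fn: Callable) -> Callable:
        fn.late_run_threshold_seconds = late_run_threshold_seconds
        return fn
    return decorate


def should_skip(fn: Callable, expected_start: datetime, now: datetime) -> bool:
    """Return True if the run is later than the function's threshold and should
    be moved to a terminal state instead of being executed."""
    threshold = getattr(fn, "late_run_threshold_seconds", None)
    if threshold is None:
        return False  # no threshold configured: keep today's behavior
    return (now - expected_start) > timedelta(seconds=threshold)


@late_run_options(late_run_threshold_seconds=3600)
def my_flow():
    ...


now = datetime(2023, 4, 4, 12, 0, tzinfo=timezone.utc)
print(should_skip(my_flow, now - timedelta(days=25), now))   # True: >25 days late
print(should_skip(my_flow, now - timedelta(minutes=5), now))  # False: within 1 hour
```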

Additional context

Happy to contribute, but it would be best to get some direction from the engineering team to ensure it fits the design.

joelluijmes · Apr 04 '23 10:04

Hey @joelluijmes, thanks for filing this issue. Are you on Cloud? This is a very common use case for automations - setting up a "zombie killer" automation to cancel flow runs that have been in a late state for some amount of time.

WillRaphaelson · Apr 04 '23 14:04

Thank you for responding! No, I am not using Cloud. Ideally, this feature would be integrated into the core functionality 😬 However, building the "zombie killer" flow myself seems like a good workaround if this isn't implemented.

joelluijmes · Apr 04 '23 15:04

This could also be trivially implemented as a setting for agents — it can just place the run in a new state instead of running it.
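As an illustration of the agent-side approach, the sketch below shows one way an agent could split the runs it has picked up into runs to submit and runs to move to a Cancelled state. It is plain Python. ScheduledRun and partition_runs are hypothetical names, not Prefect APIs. The actual state change would go through the client, for example client.set_flow_run_state as the watchdog flow later in this thread does.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


@dataclass
class ScheduledRun:
    # Minimal stand-in for a flow run the agent picked up (hypothetical shape).
    id: str
    expected_start_time: datetime


def partition_runs(
    runs: List[ScheduledRun],
    now: datetime,
    cancel_after: Optional[timedelta],
) -> Tuple[List[ScheduledRun], List[ScheduledRun]]:
    """Split runs into (to_submit, to_cancel).

    With cancel_after=None every run is submitted, matching current agent behavior.
    """
    if cancel_after is None:
        return list(runs), []
    to_submit: List[ScheduledRun] = []
    to_cancel: List[ScheduledRun] = []
    for run in runs:
        lateness = now - run.expected_start_time
        (to_cancel if lateness > cancel_after else to_submit).append(run)
    return to_submit, to_cancel


now = datetime(2023, 4, 4, 12, 0, tzinfo=timezone.utc)
runs = [
    ScheduledRun("fresh", now - timedelta(minutes=2)),
    ScheduledRun("ancient", now - timedelta(days=25)),
]
submit, cancel = partition_runs(runs, now, cancel_after=timedelta(hours=1))
print([r.id for r in submit], [r.id for r in cancel])  # ['fresh'] ['ancient']
```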

zanieb · Apr 04 '23 18:04

Yeah I like that approach for a global setting on the agent. Would cancelled be the right state here?

WillRaphaelson · Apr 04 '23 18:04

@WillRaphaelson Yeah I think CANCELLED sounds right to me!

zanieb · Apr 10 '23 19:04

I agree that configuring the agent is a good idea. I'm willing to work on it sometime this week. Should I claim the ticket, or simply submit a pull request when I have something ready? I just want to ensure that the work is not duplicated.

joelluijmes · Apr 11 '23 07:04

@joelluijmes it's good to claim it to avoid duplicated work.

I'd add the setting PREFECT_LATE_RUNS_CANCEL_AFTER_SECONDS
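A minimal sketch of how the proposed setting could be read, assuming it is a plain environment variable. PREFECT_LATE_RUNS_CANCEL_AFTER_SECONDS is only a proposed name at this point in the discussion, not an existing Prefect setting, and a real implementation would presumably register it with Prefect's own settings system rather than read os.environ directly. An unset or empty value maps to None, which preserves the existing behavior.

```python
import os
from datetime import timedelta
from typing import Mapping, Optional

# Proposed setting name from this discussion (not an existing Prefect setting).
SETTING_NAME = "PREFECT_LATE_RUNS_CANCEL_AFTER_SECONDS"


def read_cancel_after(env: Mapping[str, str] = os.environ) -> Optional[timedelta]:
    """Return the cancel-after threshold, or None when the setting is unset or
    empty, so that running every late run stays the default."""
    raw = env.get(SETTING_NAME, "").strip()
    if not raw:
        return None
    seconds = float(raw)
    if seconds < 0:
        raise ValueError(f"{SETTING_NAME} must be non-negative, got {raw!r}")
    return timedelta(seconds=seconds)


print(read_cancel_after({SETTING_NAME: "3600"}))  # 1:00:00
```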

zanieb · Apr 11 '23 14:04

@madkinsz I created a POC here: https://github.com/PrefectHQ/prefect/compare/main...joelluijmes:prefect:feat/prevent-rate-runs. If you think this is the right direction, I can add proper unit tests and documentation. I'm aware that it currently hardcodes the threshold to 5 minutes, but I guess we should keep the default optional so as not to break existing behavior.

--

While digging in, I noticed there is a late_run monitoring service, which marks flow runs as late. I was wondering whether it would make more sense to mark the flow runs as cancelled there (or in another background service) instead of checking at the agent. One advantage would be that the flow run gets cancelled much earlier, even when the agent is not running for some reason.
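The background-service idea can be sketched as a per-run decision that a periodic loop would apply to every still-scheduled run, whether or not any agent is polling. The function below is hypothetical plain Python. "Late" corresponds to what the existing late-runs service already does, and "Cancelled" is the proposed extra step. The threshold values in the example are illustrative only and are not Prefect's defaults.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def next_state_name(
    expected_start: datetime,
    now: datetime,
    late_after: timedelta,
    cancel_after: Optional[timedelta],
) -> str:
    """Decide what a periodic background pass would do with a still-scheduled run.

    'Scheduled' -> leave it alone
    'Late'      -> mark it late (what the existing late-runs service does)
    'Cancelled' -> proposed: stop it from ever running
    """
    lateness = now - expected_start
    if cancel_after is not None and lateness > cancel_after:
        return "Cancelled"
    if lateness > late_after:
        return "Late"
    return "Scheduled"


now = datetime(2023, 4, 25, 13, 0, tzinfo=timezone.utc)
print(next_state_name(now - timedelta(days=25), now, timedelta(minutes=1), timedelta(hours=1)))  # Cancelled
```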

joelluijmes · Apr 25 '23 13:04

Thanks for putting together a POC!

This is a little tricky. We can add server-side settings for this that could then be enforced by the late runs service, and also by the orchestration rules when the agent proposes a pending state. This would be the "proper" way to do it, as the orchestrator would be making the decision instead of the agent. However, this means that you'd need access to the server to configure the settings, which is not an option for our Cloud users. We'd need to add a user-facing setting for this in Cloud, either for the workspace or for some lesser object like the work pool. So perhaps this just belongs as a field on the work pool? cc @desertaxle
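To show what "enforced by the orchestration rules" could mean, here is a standalone model of such a rule. When an agent proposes PENDING, the server rejects the transition and moves the run to CANCELLED if it is past a threshold stored on its work pool. Prefect's real orchestration rules have their own base classes and interfaces, and this sketch does not use them. WorkPool.cancel_late_runs_after_seconds, Decision and orchestrate_pending_proposal are all hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class WorkPool:
    name: str
    # Hypothetical work-pool field discussed above; None disables the check.
    cancel_late_runs_after_seconds: Optional[float] = None


@dataclass
class Decision:
    accepted: bool
    state: str
    message: str = ""


def orchestrate_pending_proposal(
    expected_start: datetime, now: datetime, pool: WorkPool
) -> Decision:
    """When an agent proposes PENDING, reject the transition and cancel the run
    if it is later than the pool's threshold; otherwise accept PENDING."""
    threshold = pool.cancel_late_runs_after_seconds
    if threshold is not None and now - expected_start > timedelta(seconds=threshold):
        return Decision(False, "CANCELLED", f"Run was more than {threshold:g}s late")
    return Decision(True, "PENDING")


now = datetime(2023, 4, 25, 15, 0, tzinfo=timezone.utc)
pool = WorkPool("default", cancel_late_runs_after_seconds=3600)
print(orchestrate_pending_proposal(now - timedelta(days=25), now, pool))
```

Putting the decision on the server means it applies regardless of which agent or worker version proposes the transition. That is the trade-off against the simpler agent flag discussed next.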

I'm second-guessing my original suggestion of a simple setting on the agent, but it's still probably the easiest path forward. Perhaps we should still consider doing it just as a flag, i.e. --cancel-late 60, and avoid the setting for now?
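A sketch of the semantics of the suggested --cancel-late flag. It uses the standard-library argparse purely for illustration. Prefect's own CLI is built on Typer, so a real change would look different, and the flag itself is only a proposal. When the flag is omitted, the threshold is None and no late runs are cancelled.

```python
import argparse
from datetime import timedelta
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="agent")
    # Hypothetical flag from this discussion; omitting it disables late-run cancellation.
    parser.add_argument(
        "--cancel-late",
        type=float,
        default=None,
        metavar="SECONDS",
        help="Cancel scheduled runs that are more than SECONDS late instead of running them.",
    )
    return parser


def parse_cancel_late(argv: List[str]) -> Optional[timedelta]:
    args = build_parser().parse_args(argv)
    return None if args.cancel_late is None else timedelta(seconds=args.cancel_late)


print(parse_cancel_late(["--cancel-late", "60"]))  # 0:01:00
```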

zanieb · Apr 25 '23 15:04

Ah I see. And how about having it as a parameter on the flow itself? Similar to the timeout?

joelluijmes · Apr 25 '23 16:04

@joelluijmes that's a possibility but it's more of a deployment-level feature — it doesn't make any sense for ad-hoc flow calls.

zanieb · Apr 25 '23 16:04

Ah, I'm comfortable with either option. In my view, the flow timeout feature is quite comparable to the proposed one, and a per-flow option provides a bit more versatility. I can envision scenarios in which it is essential to avoid running late flows, while for other flows it may not be a problem, for example when they are scheduled to run with a specific set of parameters.
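If both levels existed, the threshold would need a precedence rule: a per-flow or per-deployment value overrides an agent-wide default, and some flows can opt out entirely. The sketch below is one hypothetical way to express that. effective_cancel_after and the NEVER marker are illustrative names, not Prefect APIs. None at the flow level means "inherit the default".

```python
from datetime import timedelta
from typing import Optional, Union

NEVER = "never"  # hypothetical marker: this flow tolerates any amount of lateness


def effective_cancel_after(
    flow_setting: Union[None, str, timedelta],
    agent_default: Optional[timedelta],
) -> Optional[timedelta]:
    """Resolve the threshold for one run; None means 'never cancel for lateness'.

    - NEVER:        opt out, even if the agent has a default
    - a timedelta:  per-flow/deployment value wins
    - None:         inherit the agent-wide default (which may itself be None)
    """
    if isinstance(flow_setting, str):
        if flow_setting != NEVER:
            raise ValueError(f"unknown setting {flow_setting!r}")
        return None
    if flow_setting is not None:
        return flow_setting
    return agent_default


print(effective_cancel_after(None, timedelta(hours=1)))  # 1:00:00
```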

Anyway, let me know what you think. I would like to see this feature implemented 😇

joelluijmes · Apr 26 '23 07:04

> Hey @joelluijmes, thanks for filing this issue. Are you on Cloud? This is a very common use case for automations - setting up a "zombie killer" automation to cancel flow runs that have been in a late state for some amount of time.

Is there an example of this posted somewhere?

Also, is there any status update on this?

klayhb · Aug 29 '23 12:08

This remains on the backlog, I can up the internal priority.

WillRaphaelson · Aug 29 '23 17:08

Yes, it would be great if this were implemented! I can still finish my POC once a decision has been made on how to configure this.

Anyway, @klayhb, for now I've written my own watchdog flow, which runs every 30 minutes or so:

import asyncio

from prefect import State, flow, get_client, task, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
    FlowRunFilterExpectedStartTime,
)
from prefect.server.schemas.states import StateType
from datetime import datetime, timedelta
from uuid import UUID


@task
async def find_long_running_flows() -> list[UUID]:
    THRESHOLD_HOURS = 4

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                ),
                start_time=FlowRunFilterStartTime(
                    before_=datetime.utcnow() - timedelta(hours=THRESHOLD_HOURS),
                ),
            )
        )

    get_run_logger().info(
        f"Found {len(flow_runs)} long running flows (> {THRESHOLD_HOURS} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]


@task
async def find_stale_flows() -> list[UUID]:
    THRESHOLD_HOURS = 12

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
                ),
                expected_start_time=FlowRunFilterExpectedStartTime(
                    before_=datetime.utcnow() - timedelta(hours=THRESHOLD_HOURS),
                ),
            )
        )

    get_run_logger().info(
        f"Found {len(flow_runs)} stale flows (> {THRESHOLD_HOURS} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]


@task
async def cancel_flow_runs(flow_run_id):
    async with get_client() as client:
        state = State(type=StateType.CANCELLED, message="Cancelled by watchdog")
        await client.set_flow_run_state(flow_run_id, state, force=True)


@flow(name="Watchdog")
async def main():
    stale_flows = await find_stale_flows()
    await cancel_flow_runs.map(stale_flows)

    long_running_flows = await find_long_running_flows()
    await cancel_flow_runs.map(long_running_flows)


if __name__ == "__main__":
    asyncio.run(main())
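The watchdog selects two kinds of runs. The first kind is SCHEDULED runs whose expected start time is more than 12 hours ago. The second kind is RUNNING runs that started more than 4 hours ago. The sketch below restates that selection in plain Python so the cutoffs can be checked without a Prefect server. RunInfo and select_for_cancellation are hypothetical helpers, and the sketch only roughly mirrors the server-side filters.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class RunInfo:
    # Simplified stand-in for the flow run fields the watchdog filters on.
    id: str
    state_type: str  # e.g. "SCHEDULED", "RUNNING"
    expected_start_time: datetime
    start_time: Optional[datetime] = None


def select_for_cancellation(
    runs: List[RunInfo],
    now: datetime,
    stale_hours: float = 12,
    long_running_hours: float = 4,
) -> List[str]:
    stale_cutoff = now - timedelta(hours=stale_hours)
    long_cutoff = now - timedelta(hours=long_running_hours)
    selected = []
    for run in runs:
        if run.state_type == "SCHEDULED" and run.expected_start_time <= stale_cutoff:
            selected.append(run.id)  # never started, far past its scheduled time
        elif (
            run.state_type == "RUNNING"
            and run.start_time is not None
            and run.start_time <= long_cutoff
        ):
            selected.append(run.id)  # started, but has been running too long
    return selected
```

For example, a run that is 25 days late and a run that has been RUNNING for 5 hours are both selected. A run that is 1 hour late and a run that started 1 hour ago are left alone.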

joelluijmes · Aug 30 '23 11:08

Hi @klayhb! Thanks for your question. Here's an example/demo of using automations to handle runs that get stuck in a Running state in Cloud. If you're using OSS, the watchdog flow example above should be helpful.

zhen0 · Oct 18 '23 15:10

We'll keep this on the backlog but I would recommend using automations if you can!

zhen0 · Oct 18 '23 15:10

I've made some enhancements to the Watchdog flow proposed by @joelluijmes. I added parameterized thresholds for both stale and long-running flows, in case the flow is run manually from the UI. I also added a check that prevents the Watchdog from cancelling its own flow run.
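Excluding the current run is simplest as a filter that builds a new list. Removing items from a list while a for loop iterates over that same list is a common pitfall, because the loop skips the element right after each removal. With unique run IDs, at most one item is removed, so the skip is usually harmless, but the pattern is fragile. Both functions below are hypothetical helpers written only to illustrate this.

```python
from typing import List, Sequence


def exclude_current_run(run_ids: Sequence[str], current_run_id: str) -> List[str]:
    """Return the candidate IDs without the watchdog's own run ID (new list, no mutation)."""
    return [run_id for run_id in run_ids if run_id != current_run_id]


def remove_while_iterating(items: List[str], bad: str) -> List[str]:
    """Anti-pattern, shown for contrast: mutates the list it is iterating over."""
    for item in items:
        if item == bad:
            items.remove(item)
    return items


print(exclude_current_run(["a", "a", "b"], "a"))     # ['b']
print(remove_while_iterating(["a", "a", "b"], "a"))  # ['a', 'b']: second 'a' was skipped
```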

I scheduled the flow to run every 30 minutes. Along with that schedule, I added a check at the beginning to prevent multiple executions in case Watchdog runs stack up.

Also, I hope the --cancel-late 60 flag idea gets developed. That would be a great help for OSS users.

import asyncio
from datetime import datetime, timedelta, timezone
from uuid import UUID

from prefect import State, runtime, flow, get_client, task, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
    FlowRunFilterExpectedStartTime,
)
from prefect.server.schemas.states import StateType


@task
async def find_long_running_flows(threshold_hours) -> list[UUID]:

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                ),
                start_time=FlowRunFilterStartTime(
                    before_=datetime.now(timezone.utc) -
                    timedelta(hours=threshold_hours),
                ),
            )
        )

    logger = get_run_logger()

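    # Drop the current Watchdog run from the results so it never cancels itself.
    # Build a new list instead of calling remove() while iterating over the same list.
    current_run_id = runtime.flow_run.id
    if any(str(flow_run.id) == current_run_id for flow_run in flow_runs):
        logger.info(
            "The ID is that of the current Watchdog. It will not be canceled. ID: %s", current_run_id)
    flow_runs = [flow_run for flow_run in flow_runs
                 if str(flow_run.id) != current_run_id]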

    logger.info(
        f"Found {len(flow_runs)} long-running flows (> {threshold_hours} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]


@task
async def find_stale_flows(threshold_hours) -> list[UUID]:

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
                ),
                expected_start_time=FlowRunFilterExpectedStartTime(
                    before_=datetime.now(timezone.utc) -
                    timedelta(hours=threshold_hours),
                ),
            )
        )

    logger = get_run_logger()

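    # Drop the current Watchdog run from the results so it never cancels itself.
    # Build a new list instead of calling remove() while iterating over the same list.
    current_run_id = runtime.flow_run.id
    if any(str(flow_run.id) == current_run_id for flow_run in flow_runs):
        logger.info(
            "The ID is that of the current Watchdog. It will not be canceled. ID: %s", current_run_id)
    flow_runs = [flow_run for flow_run in flow_runs
                 if str(flow_run.id) != current_run_id]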

    logger.info(
        f"Found {len(flow_runs)} flows with high delay (> {threshold_hours} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]


@task
async def cancel_flow_runs(flow_run_id):
    logger = get_run_logger()
    async with get_client() as client:
        logger.info("Canceling flow with ID: %s", flow_run_id)
        state = State(type=StateType.CANCELLED,
                      message="Cancelled by watchdog")
        await client.set_flow_run_state(flow_run_id, state, force=True)


@flow(name="Watchdog")
async def watchdog(stale_threshold_hours: float = 12, long_running_threshold_hours: float = 4):
    # Get the time difference between when it was scheduled and started
    flow_timezone = runtime.flow_run.scheduled_start_time.tzinfo
    time_difference = datetime.now(
        flow_timezone) - runtime.flow_run.scheduled_start_time

    # If it started more than 30 minutes after it was scheduled, it should not execute
    # because another watchdog run will take care of it
    if time_difference < timedelta(minutes=30):
        stale_flows = await find_stale_flows(stale_threshold_hours)
        await cancel_flow_runs.map(stale_flows)

        long_running_flows = await find_long_running_flows(long_running_threshold_hours)
        await cancel_flow_runs.map(long_running_flows)
    else:
        await cancel_flow_runs(runtime.flow_run.id)


if __name__ == "__main__":
    asyncio.run(watchdog())
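The guard at the top of the watchdog flow (time_difference < timedelta(minutes=30)) can be isolated as a small predicate. A run that starts later than one schedule interval after its scheduled time has been overtaken by a newer watchdog run, so it skips the work. should_do_work is a hypothetical helper. Passing the schedule interval as the window keeps the two values from drifting apart if the schedule changes.

```python
from datetime import datetime, timedelta, timezone


def should_do_work(
    scheduled_start: datetime,
    now: datetime,
    window: timedelta = timedelta(minutes=30),
) -> bool:
    """True if this run started within `window` of its scheduled time.

    `window` should match the schedule interval: a run that starts later than
    that has been superseded by the next scheduled watchdog run.
    """
    return now - scheduled_start < window


scheduled = datetime(2024, 1, 16, 1, 0, tzinfo=timezone.utc)
print(should_do_work(scheduled, scheduled + timedelta(minutes=1)))  # True
print(should_do_work(scheduled, scheduled + timedelta(hours=3)))    # False
```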

lucasdepetrisd · Jan 16 '24 01:01