
Allow setting a custom flow run name

Open · marvin-robot opened this issue 2 years ago

Opened from the Prefect Public Slack Community

andreas.ntonas: Hi! In Prefect 2.0, when creating a flow_run for a deployment using the API's prefect.client.create_flow_run_from_deployment() (https://orion-docs.prefect.io/api-ref/prefect/client/#prefect.client.OrionClient.create_flow_run_from_deployment), is it possible to set the name of the flow_run? I know there is such an option when calling create_flow_run(name="My flow run name"), which takes a flow_model as input, but what about when creating a run from a deployment?

anna: In theory, it is possible, but we currently don't expose any functionality to do that. Let me open an issue to check whether we can expose it.

<@ULVA73B9P> open "Allow setting custom flow run name when creating a flow run from a deployment"

Original thread can be found here.

marvin-robot, Jul 06 '22

Thanks, Anna!

As a Prefect 2.0 user, I would like to be able to specify flow run names when flows are called as functions.

For example, this functionality appears to exist in Prefect 1.0 here https://github.com/PrefectHQ/prefect/discussions/3881

scottwalshqs, Sep 14 '22

Related question on Discourse: the user would like to set a custom flow run name based on a parameter value, e.g., a backfill start date: https://discourse.prefect.io/t/subflows-and-backfilling/1591/3

anna-geller, Sep 21 '22

I managed to change a task's name based on its parameters by using this decorator instead of prefect.task:

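from functools import partial, wraps
from inspect import FullArgSpec, getfullargspec
from typing import Any, Callable, Optional

from prefect import task


def get_arg(name: str, argspec: FullArgSpec, *args: Any, **kwargs: Any) -> Any:
    """Return the value passed for `name`, by keyword or positionally, else None.

    Hypothetical reconstruction: get_arg was not shown in the original post.
    Default values are not resolved here.
    """
    if name in kwargs:
        return kwargs[name]
    if name in argspec.args:
        index = argspec.args.index(name)
        if index < len(args):
            return args[index]
    return None


def internaltask(func: Optional[Callable] = None, **task_kwargs):
    """Internal version of Prefect task that augments the UI name with desk and date_id."""
    if func is None:
        return partial(internaltask, **task_kwargs)

    argspec = getfullargspec(func)

    @wraps(func)
    def decorated(*args, **kwargs):
        # Build e.g. "load_desk-fx-20220921", skipping desk/date_id if missing or falsy.
        new_name = "-".join(
            map(
                str,
                filter(
                    None,
                    [
                        func.__name__,
                        get_arg("desk", argspec, *args, **kwargs),
                        get_arg("date_id", argspec, *args, **kwargs),
                    ],
                ),
            )
        )
        return task(func, **task_kwargs).with_options(name=new_name)(*args, **kwargs)

    return decorated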

However, when I tried to do the same for flows (by replacing task with flow throughout the code above), building the deployment failed with a pretty cryptic message:

Traceback (most recent call last):
  File "/Users/me/dev/data-pipeline/deployments/test.py", line 25, in <module>
    Deployment.build_from_flow(
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/nix/store/mzqyncrwr5cs3xij1prmpw2f0bz8zmc4-python3-3.9.13/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/nix/store/mzqyncrwr5cs3xij1prmpw2f0bz8zmc4-python3-3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/me/dev/data-pipeline/.venv/lib/python3.9/site-packages/prefect/deployments.py", line 555, in build_from_flow
    deployment.flow_name = flow.name
AttributeError: 'function' object has no attribute 'name'
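The last frame points at the cause: Deployment.build_from_flow reads flow.name, but the object it received is the plain function the decorator returns (decorated), not a Flow, so there is no name attribute to read. One way around wrapping is to compute the name from the call's arguments and leave the decorated object untouched. The plain-Python sketch below binds arguments with inspect.signature, which also fills in defaults that a hand-rolled lookup like get_arg can miss; render_run_name and load_desk are hypothetical names, and Prefect's own rendering may differ.

```python
import inspect
from typing import Any, Callable


def render_run_name(template: str, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
    """Fill a str.format-style template with the arguments `fn` would receive."""
    bound = inspect.signature(fn).bind(*args, **kwargs)  # raises TypeError on a bad call
    bound.apply_defaults()  # include parameters the caller left at their defaults
    return template.format(**bound.arguments)


# Hypothetical function standing in for a task or flow body.
def load_desk(desk: str, date_id: int, dry_run: bool = False) -> None:
    pass


if __name__ == "__main__":
    print(render_run_name("load_desk-{desk}-{date_id}", load_desk, "fx", date_id=20220921))
    # load_desk-fx-20220921
    print(render_run_name("{desk}-dry-{dry_run}", load_desk, "rates", 1))
    # rates-dry-False
```

Later Prefect 2 releases accept a template in this str.format style through the flow_run_name option on @flow (e.g. @flow(flow_run_name="load_desk-{desk}-{date_id}")), which keeps the Flow object intact for Deployment.build_from_flow; check that your installed version supports it.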

knl, Sep 21 '22

This makes sense. We'll keep you updated in this issue as we make progress on custom flow run names.

anna-geller, Sep 21 '22

I tried this implementation because this page says Flow has with_options.

we'll keep you up to date in this issue as we may progress on custom flow run names

Thanks, it would be great to have something like this.

knl, Sep 21 '22

Can you send a minimal example of what you tried that didn't work? It might be useful, even if just for documenting use cases.

anna-geller, Sep 21 '22

Strong 👍 here. The UI's "Search by Run Name" really doesn't do much, since I'm looking to search by specific engineering or business-logic names that live in the deployment (rather than by the auto-generated names).

Can we autogenerate them from the deployment_name plus the scheduled time?


kevin868, Sep 22 '22

We're assessing these different use cases, but in the meantime, note that if you create a flow run from a deployment using create_flow_run_from_deployment() and pass a value for the name argument, we will use it as the custom flow run name. We should update the method's docstring to make that clear, so we'll get on that!
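A minimal sketch of passing name when creating a run from a deployment, paired with a hypothetical helper for the deployment-name-plus-scheduled-time idea raised above. It assumes a Prefect 2 client obtained with get_client() (the import path has moved between 2.x releases) and a deployment ID you already have; deployment_run_name and create_named_run are illustrative names, not Prefect APIs. The scheduled time is only used to build the name here; scheduling the run for that time is left out.

```python
from datetime import datetime, timezone
from uuid import UUID


def deployment_run_name(deployment_name: str, scheduled_time: datetime) -> str:
    """Hypothetical naming scheme: '<deployment>-<YYYYmmdd-HHMM>' with the time in UTC."""
    return f"{deployment_name}-{scheduled_time.astimezone(timezone.utc):%Y%m%d-%H%M}"


async def create_named_run(deployment_id: UUID, deployment_name: str, scheduled_time: datetime):
    # Imported here so the naming helper above works without Prefect installed.
    from prefect import get_client

    async with get_client() as client:
        # `name` replaces the auto-generated flow run name.
        return await client.create_flow_run_from_deployment(
            deployment_id,
            name=deployment_run_name(deployment_name, scheduled_time),
        )
```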

@anna-geller I've reached out to the user who originally asked this question in Slack to make sure they know about the parameter.

abrookins, Oct 05 '22

@abrookins I added the docs in #7109, since it wasn't addressed in PR #7084. I hope it didn't duplicate work you'd already started. Cheers!

kevin868, Oct 08 '22

Now that customizing a run name when triggering via the API is documented and better exposed through the run_deployment utility, we're going to close this issue, since we were originally tracking it as a 1.0/2.0 gap.
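For the backfill case from the Discourse thread, a sketch that triggers one run per day through run_deployment and names each run after its start date. It assumes a Prefect 2 release whose prefect.deployments.run_deployment accepts flow_run_name and timeout, and a deployed flow with a start_date parameter; backfill_dates, backfill_run_name, and trigger_backfill are hypothetical helpers.

```python
from datetime import date, timedelta
from typing import Iterator


def backfill_dates(start: date, end: date) -> Iterator[date]:
    """Yield every day from start to end, inclusive."""
    day = start
    while day <= end:
        yield day
        day += timedelta(days=1)


def backfill_run_name(start_date: date) -> str:
    return f"backfill-{start_date.isoformat()}"


def trigger_backfill(deployment: str, start: date, end: date) -> None:
    # Imported here so the pure helpers above work without Prefect installed.
    from prefect.deployments import run_deployment

    for day in backfill_dates(start, end):
        run_deployment(
            name=deployment,  # "<flow-name>/<deployment-name>"
            parameters={"start_date": day.isoformat()},  # hypothetical flow parameter
            flow_run_name=backfill_run_name(day),
            timeout=0,  # return once the run is created instead of waiting for it
        )
```

Computing the name on the caller's side keeps the naming rule in one place, so the same helper can be reused when searching for a given day's run by name in the UI.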

However, it's clear there are a number of other use cases for auto-templating flow run names. We'll need to collect those into a new feature enhancement issue for 2.0. A few things we'll want to address:

  • where and how is this template specified?
  • will the scheduler service also use this template when scheduling runs?
  • what are the valid inputs to the template?
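One possible answer to the last question, sketched in plain Python: treat the template as a str.format string and reject placeholders outside an allowed set (for example, the flow's parameters plus run metadata such as the scheduled time). template_fields, validate_template, and the field names are hypothetical; this is not how Prefect implements it.

```python
from string import Formatter
from typing import Iterable, Set


def template_fields(template: str) -> Set[str]:
    """Return the top-level placeholder names in a str.format-style template."""
    fields = set()
    for _literal, field, _spec, _conversion in Formatter().parse(template):
        if field:  # None for trailing literal text, "" for a positional "{}" (skipped)
            # "{run.name}" and "{dates[0]}" both depend on their top-level name.
            fields.add(field.split(".")[0].split("[")[0])
    return fields


def validate_template(template: str, allowed: Iterable[str]) -> None:
    """Raise ValueError if the template uses a placeholder that is not allowed."""
    unknown = template_fields(template) - set(allowed)
    if unknown:
        raise ValueError(f"unknown template fields: {sorted(unknown)}")
```

Validating when the template is registered, rather than when a run is created, would surface a typo in a placeholder before the scheduler ever tries to use it.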

cicdw, Oct 11 '22

FYI, this is not resolved for my use case (your bullets may address it, but I wanted to call it out):

As a Prefect 2.0 user, I would like to be able to specify flow run names when flows are called as functions (no agents, no deployments, no schedules).

For example, this functionality appears to exist in Prefect 1.0 here https://github.com/PrefectHQ/prefect/discussions/3881

scottwalshqs, Oct 11 '22

@scottwalshqs Correct, but in v1 this only renamed a flow run after the fact. You can do the same in v2 by grabbing the flow run ID from the run context and calling this API route from your task: https://docs.prefect.io/api-ref/orion/api/flow_runs/?h=flow+run#prefect.orion.api.flow_runs.update_flow_run
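A minimal sketch of that approach from inside a running task or flow, going through the API client rather than the database. It assumes a Prefect 2 client that exposes update_flow_run(flow_run_id, name=...), which corresponds to the update route linked above; rename_current_flow_run is a hypothetical helper.

```python
async def rename_current_flow_run(new_name: str) -> None:
    """Rename the flow run this code is executing in (call from a task or flow)."""
    # Imported here so this module can be imported without Prefect installed.
    from prefect import get_client
    from prefect.context import TaskRunContext, get_run_context

    ctx = get_run_context()
    # Inside a task the context describes the task run; otherwise it is the flow run's.
    if isinstance(ctx, TaskRunContext):
        flow_run_id = ctx.task_run.flow_run_id
    else:
        flow_run_id = ctx.flow_run.id

    async with get_client() as client:
        await client.update_flow_run(flow_run_id, name=new_name)
```

From an async task or flow, await rename_current_flow_run("my_flow_name_after"). Because it goes through the API, it only needs the same API access the flow already has, not a direct database connection.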

AFAIK, what Chris is proposing is setting flow run names via a Jinja-like template, similar to the templatable task run names in v1 described here: https://docs-v1.prefect.io/core/concepts/templating.html#where-can-i-use-templating

anna-geller, Oct 11 '22

I was using the flow_run_name kwarg to name flow runs, but I wanted a way to rename a flow run during its execution, for example when a variable changes at runtime and I want the run name to reflect the result. I managed to rename the flow run at runtime using the code below:

import asyncio
from time import sleep

from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect.server.api.flow_runs import update_flow_run
from prefect.server.database.configurations import AsyncPostgresConfiguration
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database.orm_models import AsyncPostgresORMConfiguration
from prefect.server.database.query_components import AsyncPostgresQueryComponents
from prefect.server.schemas.actions import FlowRunUpdate
from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL


def update_flow(params: dict):
    """Update fields of the current flow run (e.g. its name) by writing to the Prefect database.

    This calls the server's route function directly, so it only works when this process
    can reach the Postgres database set in PREFECT_API_DATABASE_CONNECTION_URL.
    """
    context = get_run_context()
    logger = get_run_logger()
    for key, value in params.items():
        logger.info(f'Updating flow.{key} from "{getattr(context.flow_run, key)}" to "{value}"')

    # Build a Postgres-backed database interface by hand.
    db_prefect = PrefectDBInterface(
        database_config=AsyncPostgresConfiguration(PREFECT_API_DATABASE_CONNECTION_URL.value()),
        query_components=AsyncPostgresQueryComponents(),
        orm=AsyncPostgresORMConfiguration(),
    )
    return asyncio.run(
        update_flow_run(FlowRunUpdate(**params), context.flow_run.id, db=db_prefect)
    )


@task
def wait():
    sleep(1)


@flow(flow_run_name="my_flow_name_before")
def main():
    wait()
    update_flow({"name": "my_flow_name_after"})


if __name__ == "__main__":
    main()

13:04:59.395 | INFO | Flow run 'rich-lorikeet' - View at http://127.0.0.1:4200/flow-runs/flow-run/09df7641-d1fc-44e8-ba9b-c08b820444d2
13:05:01.004 | INFO | Flow run 'my_flow_name_before' - Updating flow.name from "my_flow_name_before" to "my_flow_name_after"


canuters, Feb 09 '24