Add tracking of triggering user to Dag runs

Open: jscheffl opened this issue 6 months ago

This PR adds a feature to Airflow so that the triggering user is tracked at the Dag run level. Until now, if users triggered runs manually, you had to find out which user it was by looking into the audit log.

The user is tracked by Unix user name when using the CLI or airflowctl; in the web UI and REST API cases, the authenticated user is used. If a backfill is started via the UI, the user who started the backfill is propagated to the Dag runs it creates.
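To make the precedence above concrete, here is a minimal Python sketch. `TriggerContext` and `resolve_triggering_user` are hypothetical names invented for illustration, not Airflow's actual code; the sketch only encodes the rule described in the PR text: a backfill's user wins for the runs it creates, otherwise the authenticated UI/REST user, otherwise the local Unix user for the CLI.

```python
from __future__ import annotations

import getpass
from dataclasses import dataclass
from typing import Callable


@dataclass
class TriggerContext:
    """Hypothetical description of where a trigger request came from."""

    authenticated_user: str | None = None  # set for web UI / REST API requests
    backfill_user: str | None = None  # set when a backfill creates the run


def resolve_triggering_user(
    ctx: TriggerContext,
    cli_user_lookup: Callable[[], str] = getpass.getuser,
) -> str:
    """Pick the user name to store on a new Dag run (illustrative only)."""
    if ctx.backfill_user is not None:
        # Runs created by a backfill inherit the user who started the backfill.
        return ctx.backfill_user
    if ctx.authenticated_user is not None:
        # Web UI / REST API: use the authenticated user.
        return ctx.authenticated_user
    # CLI / airflowctl: fall back to the local Unix user name.
    return cli_user_lookup()
```

The `cli_user_lookup` parameter is only there so the CLI path can be exercised without depending on the host's user database.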

FYI @clellmann @wolfdn @AutomationDev85

jscheffl, Jun 14 '25 18:06

Note: Marking as DRAFT as I assume some pytests fail and need to be adjusted.

jscheffl, Jun 14 '25 18:06

Nice! Highly requested feature.

potiuk, Jun 15 '25 06:06

If we already have it in the audit log could we extract it from there in the ui/api instead of adding it to dag run table?

ashb, Jun 15 '25 07:06

Fiddling this out of the audit log is also the workaround we use today (and maybe multiple others as well). I think it adds unneeded complexity, and we have also seen cases where the audit log entries are gone after some time but the run is kept. I think we can afford another field in the DB for this. Note that the 512-character limit only aligns with the definition of user in FAB; I assume the string is usually much shorter.
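To illustrate how cheap the extra field is, here is a sketch using Python's built-in sqlite3 module. The table is a heavily simplified, hypothetical stand-in for `dag_run`, and the column name `triggering_user` is an assumption for illustration; the real schema and migration live in Airflow itself.

```python
import sqlite3

# Hypothetical, heavily simplified stand-in for the dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, PRIMARY KEY (dag_id, run_id))")
conn.execute("INSERT INTO dag_run VALUES ('etl', 'manual__old')")

# Adding a nullable column: existing rows simply read back NULL (None).
# VARCHAR(512) mirrors the FAB user-name length; SQLite does not enforce
# the length, whereas databases such as PostgreSQL do.
conn.execute("ALTER TABLE dag_run ADD COLUMN triggering_user VARCHAR(512)")
conn.execute("INSERT INTO dag_run VALUES ('etl', 'manual__new', 'alice')")

rows = conn.execute("SELECT run_id, triggering_user FROM dag_run ORDER BY run_id").fetchall()
print(rows)
```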

jscheffl, Jun 15 '25 07:06

Agree. In this case convenience of use is very important, and it's been asked many, many, many times not only to retrieve it easily but also to display it in the UI and act on it (if triggering_user == 'x' do something). Yep, we could probably add something to dynamically retrieve it from the audit log, but IMHO the audit log's purpose should only be to provide audit logs. I can even imagine that someone might automatically transfer audit logs to an external system (e.g. put a trigger on the log table that sends every audit log entry to an "append-only-no-delete" system, and possibly clean up the audit log in the DB).

I think we should not really base any logic (except displaying the logs) on the audit logs being present. For me, the audit log is not really part of the "application schema".

potiuk, Jun 15 '25 08:06

I agree. The way this was stored in the audit logs has changed in the past (from First Name, Last Name to Username), and this created some rework when we upgraded versions.

Having this directly accessible would help users identify their dag runs when multiple users run the same dag. It also allows us to do "if triggering_user == 'x' do something", as Jarek mentioned. Much anticipated feature! Many thanks @jscheffl!!!!!

dheerajturaga, Jun 15 '25 22:06

I have a general question about the way we are handling things here.

The getuser function uses getpass.getuser() to get the username of the current user. If this fails for whatever reason, it raises an AirflowConfigException that we do not catch anywhere, so the dag run will hang or fail. This comment applies to all usages, mainly in the API.

So we need to make a decision:

1. Is the `triggering_user` so critical that we fail or hang a dag run if the user cannot be retrieved? If `getuser` has failed, it's likely an environment issue that has less to do with the Airflow deployment. So do we keep blocking dag runs until that issue is fixed?

2. Or do we modify `getuser` to return a None / default user in such cases and not block the flow?

Hi, I do not think the triggering user is super critical. But note that getpass.getuser() is only called and used in the CLI case, i.e. when called via airflowctl or the airflow CLI. In this case the value is derived from the system Unix/Linux user. I have never seen this fail; there should always be a valid Linux user. Or is it not working on Mac? Do you know of any environment where this is problematic? If so, I am okay to catch the exception and leave the value as None in these cases. Note that it also comes from a central utility that is used in other places as well, so if there were a problem, we would probably have seen more side effects in the past.

jscheffl, Jun 16 '25 21:06

@jscheffl valid points, thanks.

In a happy scenario it should never fail, yes. And no, I didn't see it fail on Mac either.

I was just being careful since we are touching the API layers here, and we would not want Airflow running on a badly configured setup to break things.

For example, I ran a python:3.11 container with a random UID (99999) that is not defined in the container, which leads to a failure like this:

(airflow) ➜  airflow git:(fix_helm_chart_release_guide) ✗ docker run --rm -it --user 99999:99999 python:3.11 bash
I have no name!@cbe5b8d5aeb3:/$ python3 -c "import getpass; print(getpass.getuser())"
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/lib/python3.11/getpass.py", line 169, in getuser
    return pwd.getpwuid(os.getuid())[0]
           ^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'getpwuid(): uid not found: 99999'

Point being -- adding a try / except would do no harm, but I'll let you take a call on that
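A minimal sketch of the suggested try/except, assuming a hypothetical `safe_getuser` wrapper around `getpass.getuser()` (not necessarily what the PR ended up doing): if the user cannot be resolved, it returns None instead of raising, so a trigger is never blocked by the environment.

```python
import getpass
from typing import Callable, Optional


def safe_getuser(lookup: Callable[[], str] = getpass.getuser) -> Optional[str]:
    """Return the current user name, or None if it cannot be determined.

    Hypothetical wrapper for illustration. The ``lookup`` parameter only exists
    so the failure path can be exercised without a misconfigured container.
    """
    try:
        return lookup()
    except (KeyError, OSError, ImportError):
        # KeyError: UID has no passwd entry on older Pythons (as in the traceback);
        # OSError: what newer Python versions raise instead; ImportError: no pwd module.
        return None
```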

amoghrajesh, Jun 17 '25 05:06

I will say that this is a good reason why we should improve the audit log. I still don't think it is always called when it should be, nor does it always record the necessary information in a way that is easy to find. Right now we just dump a lot into the extra column, which isn't really possible to search or filter on.

bbovenzi, Jun 17 '25 19:06

Point being -- adding a try / except would do no harm, but I'll let you take a call on that

Added handling for the problem in https://github.com/apache/airflow/pull/51738/commits/4fe66fc8619d55b52472e213e7da6d9e50935d62

jscheffl, Jun 21 '25 14:06

My only concern is that we would need to join the audit log table on dag_id and dag_run_id, also filtering on an event such as post_clear_task_instance or cli_trigger, and take the latest event, because there's no unique constraint and the DagRun could have been cleared multiple times. I think it starts getting messy. (We also need this for listing dag runs, so we would have this big join on almost every DagRun endpoint and more, because it will be part of the base serialization of a DagRun.) Or maybe you have something else in mind?
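A rough sketch of the lookup described above, using sqlite3 with simplified, hypothetical `dag_run` and `log` tables (only the event names come from the comment; the real Airflow tables have more columns). Because nothing makes (dag_id, run_id, event) unique, each run needs a "latest matching row" subquery, and this would run for every row of every DagRun listing:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (dag_id TEXT, run_id TEXT);
CREATE TABLE log (id INTEGER PRIMARY KEY, dttm TEXT, dag_id TEXT, run_id TEXT,
                  event TEXT, owner TEXT);
INSERT INTO dag_run VALUES ('etl', 'r1'), ('etl', 'r2');
INSERT INTO log (dttm, dag_id, run_id, event, owner) VALUES
  ('2025-06-25T10:00', 'etl', 'r1', 'cli_trigger', 'alice'),
  ('2025-06-25T11:00', 'etl', 'r1', 'post_clear_task_instance', 'bob'),
  ('2025-06-25T12:00', 'etl', 'r1', 'post_clear_task_instance', 'carol');
""")

# Correlated scalar subquery: keep only the newest matching log row per run.
# Runs without any matching row still appear, with owner = NULL.
LATEST_EVENT_USER = """
SELECT r.run_id,
       (SELECT l.owner FROM log AS l
         WHERE l.dag_id = r.dag_id AND l.run_id = r.run_id AND l.event = ?
         ORDER BY l.dttm DESC LIMIT 1) AS owner
  FROM dag_run AS r
 ORDER BY r.run_id
"""
cleared = conn.execute(LATEST_EVENT_USER, ("post_clear_task_instance",)).fetchall()
triggered = conn.execute(LATEST_EVENT_USER, ("cli_trigger",)).fetchall()
print(cleared, triggered)
```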

pierrejeambrun, Jun 25 '25 14:06

Yes, auditing is messy: both our current implementation in Airflow and audit logging in general.

I don't totally hate this solution, but it is only patching around the problem, and when we want more we should do it properly and that likely means re-designing the audit log table for better usability and searchability.

ashb, Jun 25 '25 16:06

I am with Ash on this one. I would vote in favor of actually improving our Audit Log.

I am happy not to have "triggered_by" on the dag runs list page. But when you're on a dag run's page, we can look it up. Looking up the audit log by "event" is not hard. We even used to have config options for which events are included in and excluded from the audit log by default.

bbovenzi, Jun 25 '25 19:06

@bbovenzi, @ashb what about the use case where we want the task to do "if triggering_user == someone: do something"? Being able to look at the event log on the dag page is fine, but we want to perform some action in the task based on who triggered the dag.

In Airflow 2 we had a task that queried the audit logs in the DB to get this info and return it as an XCom, but with task isolation in Airflow 3, I'm not sure how to do this.

dheerajturaga, Jun 25 '25 20:06

@dheerajturaga Where exactly would you do such an action?

Also: exposing that via the DagRun object at runtime doesn't have to be the same way it's stored in the DB.

Thinking about it more, I question whether "if triggering_user == someone: do something" is even a good pattern to encourage or allow in Airflow DAGs (depending on exactly what "do something" is).

ashb, Jun 25 '25 20:06

I have the use case as well that on the homepage/dashboard a user would also like to see "The Dag Runs I have triggered" which would be a mess if you need to join and filter audit logs.

I understand that "who cleared the run or task" is more complex, and there I agree that crawling the audit log is reasonable. But for just seeing "who triggered", in cases (like ours) where a lot of manual runs are made in the UI or system, it would be great for a user to be able to find "their" Dag runs among the others.
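With the value stored on the run itself, "the Dag runs I have triggered" becomes a plain filter instead of a join. A minimal sqlite3 sketch, assuming a hypothetical `triggering_user` column and index name (not Airflow's actual schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, triggering_user VARCHAR(512))")
# An index on (dag_id, triggering_user) keeps the per-user filter cheap,
# even with many manual runs per day.
conn.execute("CREATE INDEX idx_dag_run_trigger_user ON dag_run (dag_id, triggering_user)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [("etl", "manual__1", "alice"), ("etl", "manual__2", "bob"), ("etl", "manual__3", "alice")],
)


def runs_triggered_by(dag_id: str, user: str) -> list[str]:
    """Return the run_ids of dag_id that were triggered by user."""
    rows = conn.execute(
        "SELECT run_id FROM dag_run WHERE dag_id = ? AND triggering_user = ? ORDER BY run_id",
        (dag_id, user),
    )
    return [run_id for (run_id,) in rows]


print(runs_triggered_by("etl", "alice"))
```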

jscheffl, Jun 25 '25 21:06

@ashb, our use case basically has many users triggering a single dag with different parameters. It's hard to identify which dag run was triggered by whom when a lot of dag runs are being triggered (which is our use case).

To work around this in Airflow 2, we were doing something like this:

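import logging
from datetime import timedelta
from airflow.decorators import task
from airflow.models.log import Log
from airflow.utils.session import create_session

logger = logging.getLogger(__name__)

@task(task_id="owner", execution_timeout=timedelta(minutes=10))
def find_owner(dag_run=None) -> str:
    """
    Identify user/owner who triggered a DAG (Airflow 2 only: queries the metadata DB directly).
    """
    with create_session() as s:
        # Latest matching entry; .one() would raise on zero or multiple rows.
        entry = (
            s.query(Log)
            .filter(Log.dag_id == dag_run.dag_id, Log.event == "trigger", Log.run_id == dag_run.run_id)
            .order_by(Log.dttm.desc())
            .first()
        )
        user = entry.owner if entry else None
    if user and "airflow" in user:
        logger.warning("System account trigger: %s", user)
    return user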

Having the triggering user readily available in the task enables many different things, for example:

  • putting some breadcrumbs in the log files we generate about who triggered the task;
  • running the dag instance as the triggering user (something similar to run_as_user), etc.

I even think it would be beneficial if we had a "list dag runs triggered by me" button right on the dag page.
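Assuming the triggering user reaches a task as a plain string (how Airflow 3 would expose it at runtime is still open in this thread), the breadcrumb use case and a simple check on the value can be sketched in plain Python. `breadcrumb` and `is_system_trigger` are hypothetical helpers; the "airflow" substring check mirrors the Airflow 2 workaround above.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("breadcrumbs")


def breadcrumb(dag_id: str, run_id: str, triggering_user: str | None) -> str:
    """Hypothetical helper: one greppable line recording who triggered the run."""
    line = f"dag_id={dag_id} run_id={run_id} triggered_by={triggering_user or '<unknown>'}"
    logger.info(line)
    return line


def is_system_trigger(triggering_user: str | None, marker: str = "airflow") -> bool:
    """Flag service-account triggers, like the 'airflow' substring check above."""
    return bool(triggering_user) and marker in triggering_user
```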

dheerajturaga, Jun 25 '25 21:06

@ashb, @bbovenzi the trigger_dag_run event isn't even associated with a run_id, which makes it even harder to identify which runs were triggered by me when many users trigger the same dag. This also makes it impossible to fetch the triggering user via api_instance.get_event_logs.

dheerajturaga, Jun 26 '25 03:06

This is why I am saying we should invest in our audit log to make it useful. Let's add the run id.

"The Dag Runs I have triggered"

Let's make sure we can filter an audit log by user and by event. Done.

bbovenzi, Jun 26 '25 18:06

To be perfectly honest, our audit log concept should be completely reworked if we want to keep calling it "audit log". The way it is currently designed has nothing to do with "audit": a real audit log cannot be deleted without leaving a trace. As implemented now, the only real audit you can get out of it is to subscribe to the table's events and move the entries to "append only" storage. Otherwise it's just a "log", never an "audit". I think this can be very confusing to users, because "audit log" really means something you cannot modify once written.

Of course, we have no such storage readily available, and we do not want to complicate "open-source" Airflow with such an option. So, if we really want to "fix" the audit log, add the run_id, and keep the user id in it, we should:

  • rename it to just "log". It's not an audit; it's just a regular log of things that happened and just another part of our model, with nothing to do with a real "audit log";
  • expose an API where users can get "audit" events and do something with them, namely send them to "append only" storage.

If we do it this way and make "log" a regular part of our data model (without pretending it's an audit, and telling our users to use the events exposed via the API to keep the audit), then I am fine with having to look up the user in the log.

But mixing "data model" table and "audit log" functionality is just plain wrong almost by definition.

potiuk, Jun 26 '25 22:06

Basically, my "security" hat is raised by the remainder of my hair every time I see "audit log" used for the thing we have implemented here.

potiuk, Jun 26 '25 22:06

Definition by Gemini; "tamper-evident" and "verifiable" are crucial parts of the definition:

An audit log is a chronological record of system activities, capturing events, actions, and changes within a system or application. It serves as a detailed, tamper-evident history used for accountability, security, compliance, and troubleshooting.

Here's a more detailed explanation:

Purpose:

Audit logs are crucial for maintaining a clear and verifiable record of system behavior. This helps organizations track who did what, when, and where within their systems, which is vital for security, compliance, and troubleshooting.
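For context on what "tamper-evident" can mean in practice: one common technique (not something Airflow implements, nor something proposed in this thread) is a hash chain, where each entry's digest also covers the previous digest, so editing or removing an earlier entry is detected on verification. A minimal sketch with hypothetical names:

```python
import hashlib
import json


def _entry_hash(prev_hash: str, event: dict) -> str:
    # Canonical JSON so the same event always hashes to the same digest.
    payload = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


class HashChainedLog:
    """Append-only list in which each entry commits to the one before it."""

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self.entries: list[tuple[dict, str]] = []

    def append(self, event: dict) -> str:
        prev = self.entries[-1][1] if self.entries else self.GENESIS
        digest = _entry_hash(prev, event)
        self.entries.append((event, digest))
        return digest

    def verify(self) -> bool:
        # Recompute every link; editing or deleting an earlier entry breaks the chain.
        prev = self.GENESIS
        for event, digest in self.entries:
            if _entry_hash(prev, event) != digest:
                return False
            prev = digest
        return True
```

Truncating the newest entries is only detectable if the latest digest is also kept outside the log, for example in the external append-only system mentioned earlier.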

potiuk, Jun 26 '25 22:06

Reworking the current audit log table to make it useful could be nice indeed. (Maybe rename it too while we are at it)

Real audit log records are, I think, a different subject altogether, so I won't comment on that, but Jarek's idea makes sense if we don't want to put that burden on Airflow 3's shoulders.

pierrejeambrun, Jun 27 '25 00:06

I've always been a fan of changing from Audit Log. But then calling it Events or Log was confusing to users. Happy to think about a better one

bbovenzi, Jun 27 '25 02:06

Maybe Actions Log?

potiuk, Jun 27 '25 10:06

@bbovenzi I would like to see a better Event/Action log as well. However, I still think having triggering_user in the dag_run model is useful. It also opens up the possibility of having "owners" for dag run instances. There is a lot of demand to be able to look at just the dag runs triggered by me on the <base_url>/dags/<dag> page.

We have cases with 200+ manual triggers for a dag in a day, and the only alternative is to create one dag per user (dynamically generated per user), which puts unnecessary stress on the dag processor.

dheerajturaga, Jun 27 '25 20:06

A lot of debate about the triggering user... just about adding one field to the schema that consumes a few bytes. There are many more fields on task instance that would also be candidates for DB optimization. We never had such a long discussion in other PRs adding e.g. "triggered_by", which falls into the same category.

I understand the idea of putting everything into a "nice" action log, but that would be a major rework, taking much more effort than I have capacity for, at least currently. It would also add DB overhead, as filtering by user would always require a join...

From a pragmatic point of view, we can merge it now and plan a rework of audit -> structured action log in the future. Or, if a majority dislikes it, we can block this much-requested feature. (Since we lack DB access and use the same workarounds as @dheerajturaga, this would actually be a loss of function for us too when moving from Airflow 2.10 to Airflow 3.)

Needless to say, I belong to the pragmatic party.

jscheffl, Jun 27 '25 21:06

100% agree. This is mostly bike-shedding on a feature that adds almost zero overhead, could just be merged, and would save users a lot of hassle.

potiuk, Jun 28 '25 08:06

Just for reference, in Airflow 3, retrieving the triggering user from event logs requires the following approach. I've encapsulated the access_token handling within the get_airflow_client_configuration method for clarity.

That said, there may be scenarios where the logical date does not align as expected, which introduces additional complexity, particularly in our unit tests where we need to mock API responses. Given that over 50% of our DAGs rely on this functionality, its absence could significantly delay our adoption of Airflow 3.

I hope this concern is understandable and that accommodating this request is feasible. I truly appreciate your consideration.

from __future__ import annotations

import json
import logging

from airflow.exceptions import AirflowException

logger = logging.getLogger(__name__)


def _find_owner_v3(dag_run=None) -> str | None:
    """
    This is only for Airflow 3: use the Airflow Client API
    to fetch the event logs.
    """
    # Only run for manual runs
    if dag_run.run_type.name.upper() != "MANUAL":
        logger.error("Not manually triggered. run_type: %s", dag_run.run_type)
        return None

    # Can't correlate if the logical date is missing for the dag run
    if not dag_run.logical_date:
        logger.error("No logical date available for this run, can't find owner")
        return None

    # Millisecond precision plus "Z"; this assumes logical_date is in UTC
    logical_date = dag_run.logical_date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    import airflow_client.client
    from airflow_client.client.rest import ApiException
    from custom.api.utils import get_airflow_client_configuration

    configuration = get_airflow_client_configuration()

    # Enter a context with an instance of the API client
    with airflow_client.client.ApiClient(configuration) as api_client:
        # Create an instance of the API class
        api_instance = airflow_client.client.EventLogApi(api_client)
        logger.info("DAG ID: %s, RUN ID: %s, Logical Date: %s", dag_run.dag_id, dag_run.run_id, logical_date)
        try:
            # Get Event Logs
            api_response = api_instance.get_event_logs(dag_id=dag_run.dag_id, event="trigger_dag_run")
        except ApiException as e:
            raise AirflowException(f"Exception when calling EventLogApi->get_event_logs: {e}") from e

    if not api_response or not api_response.event_logs:
        logger.error("No trigger events found!")
        return None

    # trigger_dag_run events carry no run_id, so match on the logical_date
    # recorded in each event's "extra" JSON instead.
    for log_entry in reversed(api_response.event_logs):
        if not log_entry.extra:
            continue
        event_info = json.loads(log_entry.extra)
        if event_info.get("logical_date") == logical_date:
            logger.info("Matching Event: %s", log_entry)
            logger.info("Dag triggered by: %s", log_entry.owner)
            return log_entry.owner

    logger.error("No trigger event matched logical date %s", logical_date)
    return None
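One way to reduce the "logical date does not align" problem in the code above is to compare instants rather than formatted strings. A hedged sketch; `same_instant` and its helpers are hypothetical, and treating naive timestamps as UTC is an assumption:

```python
from datetime import datetime, timezone


def _as_utc(value: datetime) -> datetime:
    # Assumption: naive timestamps are UTC (astimezone() would treat them as local time).
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def _to_millis(value: datetime) -> datetime:
    # Drop sub-millisecond digits, matching the [:-3] truncation of the string approach.
    return value.replace(microsecond=value.microsecond // 1000 * 1000)


def same_instant(run_logical_date: datetime, logged: str) -> bool:
    """Compare a run's logical_date with an ISO-8601 timestamp from an event log."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if logged.endswith("Z"):
        logged = logged[:-1] + "+00:00"
    parsed = datetime.fromisoformat(logged)
    return _to_millis(_as_utc(run_logical_date)) == _to_millis(_as_utc(parsed))
```

This tolerates differences such as "Z" versus "+00:00" or a non-UTC offset in the logged value, which a plain string comparison would treat as mismatches.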

dheerajturaga, Jun 28 '25 15:06

actually just because adding one field to the schema which consumes a few bytes... many more fields on task instance which also would be candidated to optimize DB. We never had such a long discussion in other PRs adding e.g. "triggered_by" which then is falling into the same category.

That's because it's not about adding a column or about the code itself, but about how we build the feature and what the most sustainable way to develop Airflow for the long term is: is this short-term approach (which we will likely have in place for years) right, or should we spend more time building a longer-term, more generic approach?

ashb, Jul 01 '25 09:07