
Add task failed dependencies to details page.

Open tirkarthi opened this issue 1 year ago • 9 comments

Show a task's failed dependencies when the task is in the scheduled or None state. Covering the None state helps surface cases where the task never reaches scheduled, for example when the execution date is in the future or when depends_on_past fails because the previous task instance failed.

closes: #38189 related: #38189

Scheduled state :

image

None state :

image

tirkarthi avatar Mar 25 '24 07:03 tirkarthi

I am getting the error below in tests. Using get_airflow_app().dag_bag.get_dag(ti.dag_id) seems to cause it, but I am not sure why.

    def test_should_respond_200(
        self, task_instances, update_extras, payload, expected_ti_count, username, session
    ):
        self.create_task_instances(
            session,
            update_extras=update_extras,
            task_instances=task_instances,
        )
>       response = self.client.post(
            "/api/v1/dags/~/dagRuns/~/taskInstances/list",
            environ_overrides={"REMOTE_USER": username},
            json=payload,
        )

tests/api_connexion/endpoints/test_task_instance_endpoint.py:910: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.env/lib/python3.10/site-packages/werkzeug/test.py:1146: in post
    return self.open(*args, **kw)
.env/lib/python3.10/site-packages/flask/testing.py:238: in open
    response = super().open(
.env/lib/python3.10/site-packages/werkzeug/test.py:1095: in open
    response = self.run_wsgi_app(request.environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:962: in run_wsgi_app
    rv = run_wsgi_app(self.application, environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:1243: in run_wsgi_app
    app_rv = app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2552: in __call__
    return self.wsgi_app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2532: in wsgi_app
    response = self.handle_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:2529: in wsgi_app
    response = self.full_dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1825: in full_dispatch_request
    rv = self.handle_user_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:1823: in full_dispatch_request
    rv = self.dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1799: in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
.env/lib/python3.10/site-packages/connexion/decorators/decorator.py:68: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py:149: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/validation.py:196: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/response.py:112: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/parameter.py:120: in wrapper
    return function(**kwargs)
airflow/api_connexion/security.py:165: in decorated
    return _requires_access(
airflow/api_connexion/security.py:92: in _requires_access
    return func(*args, **kwargs)
airflow/utils/session.py:79: in wrapper
    return func(*args, session=session, **kwargs)
airflow/api_connexion/endpoints/task_instance_endpoint.py:437: in get_task_instances_batch
    return task_instance_collection_schema.dump(
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
    result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:340: in serialize
    return self._serialize(value, attr, obj, **kwargs)
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in _serialize
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in <listcomp>
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:643: in _serialize
    return schema.dump(nested_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
    result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:332: in serialize
    value = self.get_value(obj, attr, accessor=accessor)
.env/lib/python3.10/site-packages/marshmallow/fields.py:263: in get_value
    return accessor_func(obj, check_key, default)
airflow/api_connexion/schemas/task_instance_schema.py:89: in get_attribute
    return get_value(obj[0], attr, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:276: in get_value
    return _get_value_for_key(obj, key, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:290: in _get_value_for_key
    return getattr(obj, key, default)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:193: in __get__
    return inst.get(obj)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:575: in get
    target = getattr(obj, self.target_collection)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:487: in __get__
    return self.impl.get(state, dict_)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:959: in get
    value = self._fire_loader_callables(state, key, passive)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:995: in _fire_loader_callables
    return self.callable_(state, passive)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <sqlalchemy.orm.strategies.LazyLoader object at 0x7f58c37a44a0>, state = <sqlalchemy.orm.state.InstanceState object at 0x7f58c2246680>
passive = symbol('PASSIVE_OFF'), loadopt = None, extra_criteria = ()

    def _load_for_state(self, state, passive, loadopt=None, extra_criteria=()):
        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return attributes.ATTR_EMPTY
    
        pending = not state.key
        primary_key_identity = None
    
        use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
    
        if (not passive & attributes.SQL_OK and not use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return attributes.PASSIVE_NO_RESULT
    
        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & attributes.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise.  This is also a
                # history-related flag
                not use_get
                or passive & attributes.RELATED_OBJECT_OK
            )
        ):
    
            self._invoke_raise_load(state, passive, "raise")
    
        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT
    
>           raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )
E           sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f58c2246530> is not bound to a Session; lazy load operation of attribute 'task_instance_note' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)

.env/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py:863: DetachedInstanceError
--------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
========================= AIRFLOW ==========================

tirkarthi avatar Mar 25 '24 07:03 tirkarthi

From what I understand, some methods such as dagbag.get_dag use create_session to obtain a session, which invalidates the session attached to ti; that session has to stay available while marshmallow serializes. I tried using inspect to get the session and pass it along so that it is reused, but this feels more like a workaround. I was also curious whether this could become a separate endpoint, like the extra_links endpoint, where similar dagbag logic is already used. Also, there is the consideration that task_failed_dep looks like a costly operation, since it needs to be done for the taskinstance schema, which is frequently accessed in the UI through autorefresh and other parts, though the check for task_failed_deps is done only for tasks in None and failed state.
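For reference, the DetachedInstanceError in the traceback can be reproduced outside Airflow. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later is installed; the Task and Note models and the helper names are hypothetical stand-ins, not Airflow's TaskInstance or its note relationship. It shows a lazy-loaded relationship failing once the owning session is closed, and one possible way around it (eager loading before the session closes), which is not necessarily what this PR ends up doing.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload
from sqlalchemy.orm.exc import DetachedInstanceError

Base = declarative_base()


class Task(Base):  # hypothetical model, stands in for a task instance
    __tablename__ = "task"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    notes = relationship("Note")  # lazy-loaded by default


class Note(Base):  # hypothetical model, stands in for the related note row
    __tablename__ = "note"
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey("task.id"))
    content = Column(String)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Task(id=1, name="t1", notes=[Note(content="hello")]))
    session.commit()


def load_detached():
    """Load a Task, then close its session before the relationship is touched."""
    session = Session(engine)
    task = session.get(Task, 1)
    session.close()  # task is now detached; 'notes' was never loaded
    return task


def load_eager():
    """Load a Task with 'notes' fetched while the session is still open."""
    session = Session(engine)
    task = session.get(Task, 1, options=[selectinload(Task.notes)])
    session.close()
    return task


def lazy_load_fails(task):
    """Return True if touching the relationship raises DetachedInstanceError."""
    try:
        task.notes
    except DetachedInstanceError:
        return True
    return False
```

Already-loaded column attributes such as name remain readable on the detached object; only the relationship that still needs a query fails, which matches the lazy load of 'task_instance_note' in the error above.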

tirkarthi avatar Mar 25 '24 09:03 tirkarthi

> Also, there is the consideration that task_failed_dep looks like a costly operation, since it needs to be done for the taskinstance schema, which is frequently accessed in the UI through autorefresh and other parts, though the check for task_failed_deps is done only for tasks in None and failed state.

Also, the task_failed_deps really only matters when the task is in scheduled state. So a separate endpoint is fine with me.

bbovenzi avatar Mar 25 '24 15:03 bbovenzi

IMO, making this check for the None state also helps catch issues such as depends_on_past=True or an execution date in the future, where the task never gets to the scheduled state. After implementing the separate endpoint I also realized it takes more code than I initially expected. I timed the check and it comes to around 2-3 milliseconds with 10 dags, so getting the dagbag from the airflow app is not as costly as I thought; it just had the subtle session-based issues. So I am open to keeping this as an additional field on the existing endpoint (as in the first commit), as a separate endpoint, or as a check done only when a query parameter such as check_dependencies=True is passed to the existing task instance endpoint.
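The query-parameter option could look roughly like the sketch below. Everything in it is an assumption for illustration: serialize_task_instance, CHECKED_STATES, and the get_failed_deps callback are hypothetical names, and the state labels are plain strings rather than Airflow's state enum. The point is only that the dependency check runs when the caller opts in and the task is in the None or scheduled state.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical: states for which a failed-dependency check is worth running.
CHECKED_STATES = {None, "scheduled"}


def serialize_task_instance(
    task_id: str,
    state: Optional[str],
    check_dependencies: bool = False,
    get_failed_deps: Callable[[str], List[Dict[str, str]]] = lambda task_id: [],
) -> dict:
    """Build a response payload, adding failed dependencies only on request.

    get_failed_deps is a placeholder for whatever performs the real check.
    """
    payload = {"task_id": task_id, "state": state}
    if check_dependencies and state in CHECKED_STATES:
        payload["failed_dependencies"] = get_failed_deps(task_id)
    return payload


def fake_deps(task_id: str) -> List[Dict[str, str]]:
    # Stand-in result; a real check would inspect the task's dependencies.
    return [{"name": "Execution Date", "reason": "execution date is in the future"}]
```

With this shape, frequent auto-refresh calls that do not pass the flag skip the check entirely, while the details page can opt in.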

image

Thanks

tirkarthi avatar Mar 26 '24 15:03 tirkarthi

I really like this extension in the view! Looking forward to the fixes and the removal of the merge conflict... then I would like to review.

jscheffl avatar Apr 19 '24 21:04 jscheffl

@tirkarthi Could you still rebase this? It would be great to get this merged soon so we can migrate all the /task pages.

bbovenzi avatar May 02 '24 16:05 bbovenzi

@bbovenzi Sorry, I got held up at work. Addressed the code comments by using a spinner like other pages and an alert on API error. I did a basic test with a mapped task, which also seems to work as expected. Screenshots are below.

Please note that I have enabled this check for tasks in the None state as well, since it is useful in cases such as execution_date being greater than end_date. On the old page I see this table always being shown, so please let me know whether it should be shown always, only for tasks in the scheduled state, or for tasks in the None/scheduled state.

Spinner during API call

image

API failed :

image

Table rendering :

image

tirkarthi avatar May 03 '24 06:05 tirkarthi

@jscheffl Thanks, updated the version number and regenerated the files. I have looked into auto refresh and found a pattern where auto refresh is done based on useAutoRefresh. The API useTaskFailedDependency doesn't have the taskinstance state based on which the refresh should happen. Once the task goes to queued state the showTaskSchedulingDependencies flag should return False which should hide the component in UI. @bbovenzi Any pointers here?

tirkarthi avatar May 09 '24 13:05 tirkarthi

> @jscheffl Thanks, updated the version number and regenerated the files. I have looked into auto refresh and found a pattern where auto refresh is done based on useAutoRefresh. The API useTaskFailedDependency doesn't have the taskinstance state based on which the refresh should happen. Once the task goes to queued state the showTaskSchedulingDependencies flag should return False which should hide the component in UI. @bbovenzi Any pointers here?

I think useTaskFailedDependency can just use the normal refresh logic: refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000,. The API hook doesn't need to know when the state changes.

bbovenzi avatar May 09 '24 16:05 bbovenzi

@bbovenzi Thanks, added auto refresh as suggested and rebased with latest main branch.

tirkarthi avatar May 16 '24 12:05 tirkarthi