airflow
Add task failed dependencies to details page.
Show a task's failed dependencies when the task is in the scheduled or None state. Including the None state helps surface issues such as an execution date in the future or depends_on_past failing because the previous task instance failed.
closes: #38189 related: #38189
Scheduled state :
None state :
I am getting the error below in tests. It seems that calling get_airflow_app().dag_bag.get_dag(ti.dag_id) causes it, but I am not sure why.
def test_should_respond_200(
self, task_instances, update_extras, payload, expected_ti_count, username, session
):
self.create_task_instances(
session,
update_extras=update_extras,
task_instances=task_instances,
)
> response = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": username},
json=payload,
)
tests/api_connexion/endpoints/test_task_instance_endpoint.py:910:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.env/lib/python3.10/site-packages/werkzeug/test.py:1146: in post
return self.open(*args, **kw)
.env/lib/python3.10/site-packages/flask/testing.py:238: in open
response = super().open(
.env/lib/python3.10/site-packages/werkzeug/test.py:1095: in open
response = self.run_wsgi_app(request.environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:962: in run_wsgi_app
rv = run_wsgi_app(self.application, environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:1243: in run_wsgi_app
app_rv = app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2552: in __call__
return self.wsgi_app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2532: in wsgi_app
response = self.handle_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:2529: in wsgi_app
response = self.full_dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1825: in full_dispatch_request
rv = self.handle_user_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:1823: in full_dispatch_request
rv = self.dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1799: in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
.env/lib/python3.10/site-packages/connexion/decorators/decorator.py:68: in wrapper
response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py:149: in wrapper
response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/validation.py:196: in wrapper
response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/response.py:112: in wrapper
response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/parameter.py:120: in wrapper
return function(**kwargs)
airflow/api_connexion/security.py:165: in decorated
return _requires_access(
airflow/api_connexion/security.py:92: in _requires_access
return func(*args, **kwargs)
airflow/utils/session.py:79: in wrapper
return func(*args, session=session, **kwargs)
airflow/api_connexion/endpoints/task_instance_endpoint.py:437: in get_task_instances_batch
return task_instance_collection_schema.dump(
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:340: in serialize
return self._serialize(value, attr, obj, **kwargs)
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in _serialize
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in <listcomp>
return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:643: in _serialize
return schema.dump(nested_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:332: in serialize
value = self.get_value(obj, attr, accessor=accessor)
.env/lib/python3.10/site-packages/marshmallow/fields.py:263: in get_value
return accessor_func(obj, check_key, default)
airflow/api_connexion/schemas/task_instance_schema.py:89: in get_attribute
return get_value(obj[0], attr, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:276: in get_value
return _get_value_for_key(obj, key, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:290: in _get_value_for_key
return getattr(obj, key, default)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:193: in __get__
return inst.get(obj)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:575: in get
target = getattr(obj, self.target_collection)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:487: in __get__
return self.impl.get(state, dict_)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:959: in get
value = self._fire_loader_callables(state, key, passive)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:995: in _fire_loader_callables
return self.callable_(state, passive)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.orm.strategies.LazyLoader object at 0x7f58c37a44a0>, state = <sqlalchemy.orm.state.InstanceState object at 0x7f58c2246680>
passive = symbol('PASSIVE_OFF'), loadopt = None, extra_criteria = ()
def _load_for_state(self, state, passive, loadopt=None, extra_criteria=()):
if not state.key and (
(
not self.parent_property.load_on_pending
and not state._load_pending
)
or not state.session_id
):
return attributes.ATTR_EMPTY
pending = not state.key
primary_key_identity = None
use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
if (not passive & attributes.SQL_OK and not use_get) or (
not passive & attributes.NON_PERSISTENT_OK and pending
):
return attributes.PASSIVE_NO_RESULT
if (
# we were given lazy="raise"
self._raise_always
# the no_raise history-related flag was not passed
and not passive & attributes.NO_RAISE
and (
# if we are use_get and related_object_ok is disabled,
# which means we are at most looking in the identity map
# for history purposes or otherwise returning
# PASSIVE_NO_RESULT, don't raise. This is also a
# history-related flag
not use_get
or passive & attributes.RELATED_OBJECT_OK
)
):
self._invoke_raise_load(state, passive, "raise")
session = _state_session(state)
if not session:
if passive & attributes.NO_RAISE:
return attributes.PASSIVE_NO_RESULT
> raise orm_exc.DetachedInstanceError(
"Parent instance %s is not bound to a Session; "
"lazy load operation of attribute '%s' cannot proceed"
% (orm_util.state_str(state), self.key)
)
E sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f58c2246530> is not bound to a Session; lazy load operation of attribute 'task_instance_note' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
.env/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py:863: DetachedInstanceError
--------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
========================= AIRFLOW ==========================
From what I understand, some methods such as dagbag.get_dag use create_session to obtain a session, and that invalidates the session attached to ti, which still has to be available while marshmallow serializes the object. I tried using inspect to get the session and pass it along so that it is reused, but this feels more like a workaround. I was also curious whether this could become a separate endpoint, like the extra_links endpoint, which uses dagbag in a similar way. Another consideration is that task_failed_dep looks like a costly operation to run for the taskinstance schema, which the UI accesses frequently through autorefresh and elsewhere, even though the task_failed_deps check is done only for tasks in None and failed state.
Also, the task_failed_deps really only matters when the task is in scheduled state. So a separate endpoint is fine with me.
IMO, making this check for the None state as well helps catch issues such as depends_on_past=True or an execution date in the future, where the task never reaches the scheduled state. After moving to the separate endpoint, I also realized the implementation takes more code than I initially expected. I timed the check and it takes around 2-3 milliseconds with 10 dags, so it is not as costly as I thought getting the dagbag from the airflow app would be; it only had the subtle session-related issues. So I am open to any of three options: keeping this as an additional field on the existing endpoint, as in the first commit; a separate endpoint; or a check done only when a query parameter such as check_dependencies=True is passed to the existing task instance endpoint.
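To make the query-parameter option concrete, here is a minimal sketch of gating the check on both an opt-in flag and the task state. It is not Airflow code: FakeTaskInstance, failed_dependencies, fake_compute and CHECKED_STATES are hypothetical names for illustration, and the set of states simply mirrors the None/scheduled proposal discussed in this thread.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

# States for which this thread proposes running the check (an assumption).
CHECKED_STATES = (None, "scheduled")


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""

    task_id: str
    state: Optional[str]


def failed_dependencies(
    ti: FakeTaskInstance,
    check_dependencies: bool,
    compute: Callable[[FakeTaskInstance], List[str]],
) -> List[str]:
    """Run the (possibly costly) check only when the caller opted in
    and the task is in a state where the result is meaningful."""
    if not check_dependencies or ti.state not in CHECKED_STATES:
        return []
    return compute(ti)


def fake_compute(ti: FakeTaskInstance) -> List[str]:
    # Placeholder reason; a real check would evaluate the task's dependencies.
    return [f"{ti.task_id}: depends_on_past not satisfied"]
```

With this shape, the frequently refreshed task instance listing would skip the check unless the caller opts in, while a details page could pass check_dependencies=True.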
Thanks
I really like this extension in the view! Looking forward to the fixes and to the merge conflict being resolved; I would then like to review.
@tirkarthi Could you still rebase this? It would be great to get this merged soon so we can migrate all the /task pages.
@bbovenzi Sorry, I got held up at work. I addressed the review comments: the page now uses a spinner like other pages and shows an alert on API error. I did a basic test with a mapped task, which also seems to work as expected. Screenshots below.
Please note that I have enabled this check for tasks in the None state as well, since it is useful in cases such as execution_date being greater than end_date. In the old page this table is always shown, so please let me know whether it should be shown always, only for tasks in the scheduled state, or for tasks in the None/scheduled state.
Spinner during API call
API failed :
Table rendering :
@jscheffl Thanks, updated the version number and regenerated the files. I have looked into auto refresh and found a pattern where auto refresh is done based on useAutoRefresh. The API useTaskFailedDependency doesn't have the taskinstance state based on which the refresh should happen. Once the task goes to queued state the showTaskSchedulingDependencies flag should return False which should hide the component in UI. @bbovenzi Any pointers here?
I think useTaskFailedDependency can just use the normal refresh logic: refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000,. The API hook doesn't need to know when the state changes.
@bbovenzi Thanks, added auto refresh as suggested and rebased with latest main branch.