Airflow logs cannot be displayed on the logs page
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.9.1
What happened?
When I try to view task logs in the Airflow UI, the log keeps loading and is never displayed.
Logs are configured to be pushed to Elasticsearch. When I click "View Logs in Elasticsearch", the logs can be seen in ES.
Clicking the "See More" button makes the page load forever, and the same log content is appended to the end of the page again and again.
We are running Airflow on AWS EKS nodes (EKS version 1.28), with a Docker image we built ourselves.
airflow config map:

[logging]
logging_level = DEBUG
base_log_folder = /opt/airflow/logs
log_format = [%%(asctime)s] [%%(levelname)s] %%(filename)s:%%(lineno)d - %%(message)s
simple_log_format = [%%(asctime)s] [%%(levelname)s] - %%(message)s
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] [%%(log_color)s%%(levelname)s%%(reset)s] %%(blue)s%%(filename)s:%%(reset)s%%(lineno)d - %%(log_color)s%%(message)s%%(reset)s
log_formatter_class = log_config.CustomSecretsMasker
file_task_handler_new_folder_permissions = 0o777
remote_logging = True

[elasticsearch]
host = https://xxx:[email protected]:xxxx
write_stdout = True
json_format = True
offset_field = log.offset
frontend = https://xxx.xxx.xxx/kibana/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(offset,asc))&_g=(refreshInterval:(pause:!t,value:0),time:(from:now-1y,mode:quick,to:now))
end_of_log_mark = end_of_log
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

[elasticsearch_configs]
use_ssl = True
verify_certs = True
ca_certs = /ssl/certs/generated-cacert.pem
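The log_id_template above is a Python format string. As a rough illustration, assuming plain str.format substitution (the Elasticsearch task handler's own rendering may differ in detail), this is how one log_id would be built. The run_id is a hypothetical value, and map_index is -1 for tasks that are not mapped:

```python
LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"


def render_log_id(dag_id: str, task_id: str, run_id: str,
                  map_index: int = -1, try_number: int = 1) -> str:
    """Fill the [elasticsearch] log_id_template for one task try (illustrative only)."""
    return LOG_ID_TEMPLATE.format(
        dag_id=dag_id,
        task_id=task_id,
        run_id=run_id,
        map_index=map_index,
        try_number=try_number,
    )


# Hypothetical run_id, for illustration only.
print(render_log_id("sentinel", "time_output", "manual__2024-06-01T00:00:00+00:00"))
# sentinel-time_output-manual__2024-06-01T00:00:00+00:00--1-1
```

Note the double dash: an unmapped task contributes "-1" as its map_index. This is the value that the frontend link substitutes into its log_id:"{log_id}" Kibana query.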
For some DAGs the logs display fine, but for others they don't. Comparing the two, I see that the working logs all have one additional line in ES.
Checking the Airflow 2.9.1 source code, this is the line Airflow uses to detect the end of a task's log. All the DAGs use the same Airflow config and run in the same Airflow pod. They all run successfully; only their logs are affected.
I also tried monkey-patching DagBag._load_modules_from_file to print more logs. The patch is below:
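The following is a simplified model, not Airflow's implementation, of why a missing end-of-log line can leave the page loading: the reader treats a log as complete only once a line equal to end_of_log_mark arrives. It assumes the extra line seen in ES for the working DAGs is that marker; ReadState, read_next and poll are hypothetical names.

```python
from dataclasses import dataclass
from typing import List

END_OF_LOG_MARK = "end_of_log"  # [elasticsearch] end_of_log_mark from the config above


@dataclass
class ReadState:
    """How far the reader has got, and whether the log is known to be complete."""
    offset: int = 0
    end_of_log: bool = False


def read_next(stored: List[str], state: ReadState, batch: int = 2) -> List[str]:
    """Return the next batch of stored lines; mark the log complete on the marker."""
    chunk = stored[state.offset:state.offset + batch]
    state.offset += len(chunk)
    if END_OF_LOG_MARK in chunk:
        state.end_of_log = True
    return chunk


def poll(stored: List[str], max_polls: int = 10) -> ReadState:
    """Keep requesting more lines until the marker is seen or we give up."""
    state = ReadState()
    for _ in range(max_polls):
        read_next(stored, state)
        if state.end_of_log:
            break
    return state


complete = ["task started", "task finished", END_OF_LOG_MARK]
truncated = ["task started", "task finished"]
print(poll(complete).end_of_log)   # True
print(poll(truncated).end_of_log)  # False: a real reader would keep polling
```

The model only explains the endless loading; it does not reproduce the repeated content seen after clicking "See More".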
import importlib.machinery
import importlib.util
import logging
import sys
import traceback

import airflow.models.dagbag
from airflow import settings
from airflow.utils.docs import get_docs_url
from airflow.utils.file import get_unique_dag_module_name, might_contain_dag
from airflow.utils.timeout import timeout

log = logging.getLogger(__name__)


def new_load_modules_from_file(self, filepath, safe_mode):
    from airflow.models.dag import DagContext

    log.info(f'new_load_modules_from_file filepath: {str(filepath)}')
    log.info(f'new_load_modules_from_file safe_mode: {safe_mode}')
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 1')
    if not might_contain_dag(filepath, safe_mode):
        # Don't want to spam user with skip messages
        if not self.has_logged:
            self.has_logged = True
            self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
        return []

    self.log.debug("Importing %s", filepath)
    mod_name = get_unique_dag_module_name(filepath)
    log.info(f'new_load_modules_from_file mod_name: {str(mod_name)}')

    if mod_name in sys.modules:
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 2')
        del sys.modules[mod_name]

    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 3')
    DagContext.current_autoregister_module_name = mod_name
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 4')

    def parse(mod_name, filepath):
        log.info('Enter new_load_modules_from_file parse')
        log.info(f'new_load_modules_from_file parse mod_name: {str(mod_name)}')
        log.info(f'new_load_modules_from_file parse filepath: {str(filepath)}')
        try:
            loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
            spec = importlib.util.spec_from_loader(mod_name, loader)
            new_module = importlib.util.module_from_spec(spec)
            log.info(f'new_load_modules_from_file parse loader: {str(loader)}')
            log.info(f'new_load_modules_from_file parse spec: {str(spec)}')
            log.info(f'new_load_modules_from_file parse new_module: {str(new_module)}')
            sys.modules[spec.name] = new_module
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 5')
            loader.exec_module(new_module)
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 6')
            return [new_module]
        except Exception as e:
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 7')
            DagContext.autoregistered_dags.clear()
            self.log.exception("Failed to import: %s", filepath)
            if self.dagbag_import_error_tracebacks:
                self.import_errors[filepath] = traceback.format_exc(
                    limit=-self.dagbag_import_error_traceback_depth
                )
            else:
                self.import_errors[filepath] = str(e)
            return []

    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 8')
    dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
    log.info(f'new_load_modules_from_file dagbag_import_timeout: {str(dagbag_import_timeout)}')

    if not isinstance(dagbag_import_timeout, (int, float)):
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 9')
        raise TypeError(
            f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
        )

    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 10')
    if dagbag_import_timeout <= 0:  # no parsing timeout
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 11')
        return parse(mod_name, filepath)

    timeout_msg = (
        f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
        "Please take a look at these docs to improve your DAG import time:\n"
        f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
        f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
    )
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 12')
    with timeout(dagbag_import_timeout, error_message=timeout_msg):
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 13')
        return parse(mod_name, filepath)


airflow.models.dagbag.DagBag._load_modules_from_file = new_load_modules_from_file
Debugging further, I see that the healthy DAGs have this line in the airflow-worker log:
whereas the DAGs with log issues end here:
This means that, for the DAGs with log issues, processing somehow stops at this line.
Could someone help? I really need to know what is causing this log issue.
What you think should happen instead?
Logs for all DAGs should display normally.
How to reproduce
Use Airflow 2.9.1 and run the DAGs below.
Normal DAG:
import logging
from datetime import datetime, timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

LOGGER = logging.getLogger("sentinel")
DAG_ID = 'sentinel'
airflow_dag_owner = 'xxx'
base_log_folder = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")


def time_output_method():
    """Print log to tell dag run time."""
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    LOGGER.info('Current time is: %s', now)


DEFAULT_ARGS = {
    'owner': airflow_dag_owner,
    'start_date': datetime(2023, 10, 13),
    "retries": 0,
    "priority_weight": 10000
}

with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(seconds=15),
    tags=['sentinel'],
    catchup=False
) as dag:
    time_output = PythonOperator(
        task_id='time_output',
        python_callable=time_output_method,
        dag=dag,
        do_xcom_push=True
    )

    log_cleanup = BashOperator(
        task_id='log_cleanup',
        bash_command="cleanup_sentinel.sh",
        params={
            "log_file_directory": str(base_log_folder)
        },
        dag=dag)

    time_output >> log_cleanup
DAG with the logging issue:
import base64
import json
import logging
import random
import re
import ssl
import traceback
from datetime import datetime, timezone

import redis
import requests
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
    ShortCircuitOperator,
    get_current_context,
)
from botocore.exceptions import BotoCoreError, ClientError  # type: ignore
from common.constants import (  # type: ignore
    XXX,  # type: ignore
    XXX,  # type: ignore
    SSL_CACERT,
    SSL_CERT_FILE,
    SSL_KEY_FILE,
    XXX,
    XXX,
)
from marshmallow import EXCLUDE
from requests import HTTPError
from requests.exceptions import ReadTimeout
from transition.model import (  # type: ignore
    AcquireLocksFailure,
    BulkUpdateException,
    IllegalArgumentException,
    LifecycleStatus,
    XXX,
    RetryException,
    TransitionException,
    TransitionsCompleteSchema,
    TransitionsRequestSchema,
    TransitionStatus,
    UnknownException,
    ValidationFailure,
    ValidationRequestSchema,
)
DAG_ID = "transition_execute"
LOCK_KEY = "ipp::ci"
LOGGER = logging.getLogger("transitions")
FAILURE_MESSAGE_TITLE = "xxx"
FAILURE_UPDATE_QUEUE_ADF_MESSAGE_TITLE = "xxx"
COMMON_ERROR_MESSAGE = "xxx"
COMMON_ERROR_REASON = "xxx"


def handle_global_failure(context):
    return


def acquire_locks(**kwargs):
    return


def validate(**kwargs):
    return


def has_failure(**kwargs):
    invalidate = []
    unlocked = [1, 2, 3]
    has_invalidate = invalidate is not None and len(invalidate) > 0
    has_unlocked = unlocked is not None and len(unlocked) > 0
    return has_invalidate or has_unlocked


def has_abort_or_error(**kwargs):
    failed_requests = []
    return failed_requests is not None and len(failed_requests) > 0


def abort_or_error(**kwargs):
    return ["bulk_update_aborted"]


def abort(task_instance):
    return


def error(task_instance):
    return


def complete_status_change(**kwargs):
    return


def release_locks(**kwargs):
    return


def to_failure_queue(**kwargs):
    return


def to_retry_queue(**kwargs):
    return


DEFAULT_ARGS = {
    "owner": Variable.get("airflow_dag_owner"),
    "start_date": datetime(2021, 1, 2),
    "retries": 0,
    "on_failure_callback": handle_global_failure,
}
with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    tags=["transitions"],
) as dag:
    task_acquire_locks = PythonOperator(
        task_id="acquire_locks",
        python_callable=acquire_locks,
        dag=dag,
        do_xcom_push=True,
        retries=0,
    )

    task_validate = PythonOperator(
        task_id="validate",
        python_callable=validate,
        dag=dag,
        do_xcom_push=True,
        retries=0,
        trigger_rule="one_success",
    )

    task_has_failure = ShortCircuitOperator(
        task_id="has_failure",
        python_callable=has_failure,
        trigger_rule="all_success",
    )

    task_has_abort_or_error = ShortCircuitOperator(
        task_id="has_abort_or_error",
        python_callable=has_abort_or_error,
    )

    task_abort_or_error = BranchPythonOperator(
        task_id="abort_or_error",
        python_callable=abort_or_error,
    )

    task_bulk_abort = PythonOperator(
        task_id="bulk_update_aborted",
        python_callable=abort,
        dag=dag,
        retries=0,
    )

    task_bulk_error = PythonOperator(
        task_id="bulk_update_error",
        python_callable=error,
        dag=dag,
        retries=0,
    )

    task_complete_status_change = PythonOperator(
        task_id="task_complete_status_change",
        python_callable=complete_status_change,
        dag=dag,
        do_xcom_push=True,
        trigger_rule="one_success",
    )

    task_release_locks = PythonOperator(
        task_id="release_locks",
        python_callable=release_locks,
        dag=dag,
        do_xcom_push=False,
        trigger_rule="all_done",
    )

    task_to_failure_queue = PythonOperator(
        task_id="to_failure_queue",
        python_callable=to_failure_queue,
        dag=dag,
        do_xcom_push=False,
    )

    task_to_retry_queue = PythonOperator(
        task_id="to_retry_queue",
        python_callable=to_retry_queue,
        dag=dag,
        do_xcom_push=False,
    )

    # pylint: disable=pointless-statement
    task_acquire_locks >> [task_validate, task_has_failure]
    task_validate >> [task_complete_status_change, task_has_failure]
    task_has_failure >> task_to_failure_queue
    task_complete_status_change >> [
        task_has_failure, task_release_locks, task_has_abort_or_error]
    task_has_abort_or_error >> task_abort_or_error >> [
        task_bulk_abort, task_bulk_error, task_to_retry_queue]
Run these two DAGs on Airflow 2.9.1.
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
Can you upgrade to Airflow 2.9.2 and see if you still have the same problem? Look at the changelog: a related issue was fixed there. I'm not sure it is the same one, but the easiest way to check is to actually upgrade (which you should do anyway).
Thank you for your reply. I tried Airflow 2.9.2, but it still doesn't work.
Update: I have figured out how to resolve this, but the root cause is still unknown. You may have noticed I have this code in the DAG:
from common.constants import (  # type: ignore
    XXX,  # type: ignore
    XXX,  # type: ignore
    SSL_CACERT,
    SSL_CERT_FILE,
    SSL_KEY_FILE,
    XXX,
    XXX,
)
This is what causes the log issue. If I remove the from common.constants import statement, or define the constants inline like this instead:
XXX = "123"
XXX = "456"
SSL_CACERT = "somecacert"
SSL_CERT_FILE = "somecertfile"
SSL_KEY_FILE = "somekeyfile"
XXX = "789"
XXX = "000"
the logs display correctly. It looks like importing from a constants file causes Airflow trouble when initializing a DAG. I don't understand: a constants file is a common pattern in Python, so how can it cause so much trouble?
Update:
Moving the from common.constants import XXX statement out of top-level code and into the task also solves this issue.
According to the Airflow best practices, importing expensive libraries in top-level code can cause DAG initialization issues.
However, our constants file has only 43 lines and is about 3 KB.
Is that really expensive?
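The workaround described above (importing inside the task instead of at the top of the DAG file) looks roughly like this. It is a sketch based on the acquire_locks stub from the DAG above, with a hypothetical log message; it sidesteps the import while the file is being parsed rather than explaining why the top-level import misbehaved:

```python
import logging
import sys

LOGGER = logging.getLogger("transitions")


def acquire_locks(**kwargs):
    """Task callable that imports project constants only when the task runs."""
    # Deferred import: executed when the task runs on the worker,
    # not every time the DAG file is parsed.
    from common.constants import SSL_CACERT  # type: ignore

    LOGGER.info("Using CA bundle %s", SSL_CACERT)


# Defining the callable does not import anything from common.constants.
print("common.constants" in sys.modules)  # False in a fresh interpreter
```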
Do you have any imports in common.constants?
No, it's just a constants file that looks like this:
XXX = "123"
XXX = "456"
SSL_CACERT = "somecacert"
SSL_CERT_FILE = "somecertfile"
SSL_KEY_FILE = "somekeyfile"
XXX = "789"
XXX = "000"
Can you look at 'Modules Management' (search for it in our docs) and follow the best practices there?
The most likely problem (explained in the best practices) is that you are using a generic name, 'common', as your top-level package import. This is how Python works: when importing a module, Python imports the first matching module it finds on the PYTHONPATH (sys.path). Since modules can come from various places, the likely reason is that there is a different 'common' somewhere on that path than the one you want to import.
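One way to check this explanation is to ask the interpreter which 'common' it would import, and which sys.path entries contain a candidate of that name. A minimal sketch using importlib.util.find_spec; locate_top_level is a hypothetical helper, meant to be run with the same interpreter and PYTHONPATH as the DAG processor:

```python
import importlib.util
import sys
from pathlib import Path
from typing import List, Optional


def locate_top_level(name: str) -> dict:
    """Return the file `import name` would load plus every sys.path entry offering `name`."""
    spec = importlib.util.find_spec(name)
    candidates: List[str] = []
    for entry in sys.path:
        base = Path(entry or ".")  # an empty entry means the current directory
        # A package directory or a single-file module with this name.
        if (base / name).is_dir() or (base / f"{name}.py").is_file():
            candidates.append(str(base))
    # None when the name cannot be found at all.
    resolved: Optional[str] = spec.origin if spec is not None else None
    return {"resolved": resolved, "candidates": candidates}


if __name__ == "__main__":
    report = locate_top_level("common")
    print("import common would load:", report["resolved"])
    for entry in report["candidates"]:
        print("a 'common' also exists under:", entry)
```

More than one candidate directory is a strong hint that the wrong 'common' can win depending on path order.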
Simply put: when importing something, don't start the import with any name that is likely to be used somewhere else.
Our recommendation is to put all of your packages in a uniquely named package (for example your organisation's name, say 'myorg') and always import your code as 'from myorg.something'. This effectively namespaces your imports and avoids any conflicts with similarly named modules.
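Both the clash and the recommended fix can be reproduced with the standard library alone. This sketch builds throwaway directories: an unrelated package named common, our own common, and a namespaced myorg.common copy; all directory names and values are hypothetical. Whichever directory comes first on sys.path decides what import common.constants returns, while myorg.common.constants resolves to our code:

```python
import importlib
import sys
import tempfile
import types
from pathlib import Path


def write_constants(pkg_dir: Path, cacert: str) -> None:
    """Create pkg_dir as a regular package containing a constants.py module."""
    pkg_dir.mkdir(parents=True)
    (pkg_dir / "__init__.py").write_text("")
    (pkg_dir / "constants.py").write_text(f"SSL_CACERT = {cacert!r}\n")


def import_fresh(module_name: str, *path_entries: Path) -> types.ModuleType:
    """Import module_name with path_entries prepended to sys.path, then undo both changes."""
    top = module_name.split(".")[0]

    def forget() -> None:
        # Drop the top-level package and its submodules from the import cache.
        for name in list(sys.modules):
            if name == top or name.startswith(top + "."):
                del sys.modules[name]

    forget()
    sys.path[:0] = [str(p) for p in path_entries]
    importlib.invalidate_caches()
    try:
        return importlib.import_module(module_name)
    finally:
        del sys.path[: len(path_entries)]
        forget()


with tempfile.TemporaryDirectory() as tmp:
    other, ours = Path(tmp, "other"), Path(tmp, "ours")
    # An unrelated package that happens to be called 'common'.
    write_constants(other / "common", "value-from-another-common")
    # Our constants, once under the generic name and once namespaced.
    write_constants(ours / "common", "somecacert")
    write_constants(ours / "myorg" / "common", "somecacert")
    (ours / "myorg" / "__init__.py").write_text("")

    # 'other' is searched first, so it wins the generic import.
    generic = import_fresh("common.constants", other, ours)
    # Assuming nothing else on the path provides 'myorg', this finds our code.
    namespaced = import_fresh("myorg.common.constants", other, ours)

print(generic.SSL_CACERT)     # value-from-another-common
print(namespaced.SSL_CACERT)  # somecacert
```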
I will definitely do that, thank you!