airflow-maintenance-dags
Another task is already deleting logs on this worker node.
Trying to use the log_cleanup script.
I have a simple installation where the DAG finishes and shows like this:
But every time the DAG runs I get the message:
Another task is already deleting logs on this worker node. Skipping it!
If you believe you're receiving this message in error, kindly check if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
I can delete the file mentioned, but I still have the same issue every time. The logs are not removed even though the DAG finishes as a success.
How can I fix this?
The issue comes from running the cleanup tasks concurrently on one worker, so they compete for the same lock file. Chaining the tasks so they run sequentially on each worker could help. @gfelot try this:
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):
    log_cleanup_op_worker_list = []
    for dir_id, directory in enumerate(DIRECTORIES_TO_DELETE):
        log_cleanup_op = BashOperator(
            task_id='log_cleanup_worker_num_' +
                    str(log_cleanup_id) + '_dir_' + str(dir_id),
            bash_command=log_cleanup,
            params={
                "directory": str(directory),
                "sleep_time": int(log_cleanup_id) * 3},
            dag=dag)
        log_cleanup_op_worker_list.append(log_cleanup_op)
    # Chain this worker's tasks one after another instead of letting
    # them run in parallel and fight over the lock file.
    chain(start, *log_cleanup_op_worker_list)
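For reference, `chain` simply wires each task to the next with `>>`, so the list above runs strictly one after another. A minimal sketch of that pairwise linking, using stand-in objects rather than real Airflow operators (the `FakeTask` class and `chain_sketch` helper are illustrations, not Airflow APIs):

```python
class FakeTask:
    """Stand-in for an operator; records its downstream links."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Mimics Airflow's `t1 >> t2` dependency operator.
        self.downstream.append(other)
        return other


def chain_sketch(*tasks):
    # Link each consecutive pair, like chain(start, a, b, c)
    # producing start >> a >> b >> c.
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


start = FakeTask("start")
ops = [FakeTask(f"cleanup_dir_{i}") for i in range(3)]
chain_sketch(start, *ops)

# Each task now points at exactly the next one in the list.
print([t.downstream[0].task_id for t in [start, *ops[:-1]]])
```

Because each cleanup task only starts after the previous one finishes, the `/tmp/airflow_log_cleanup_worker.lock` file held by one task is released before the next task checks for it.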