airflow
airflow copied to clipboard
Copy of [AIRFLOW-5071] JIRA: Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?
Apache Airflow version: 1.10.9
Kubernetes version (if you are using kubernetes) (use kubectl version
): Server: v1.10.13, Client: v1.17.0
Environment:
- Cloud provider or hardware configuration: AWS
- OS (e.g. from /etc/os-release): Debian GNU/Linux 9 (stretch)
-
Kernel (e.g.
uname -a
):Linux airflow-web-54fc4fb694-ftkp5 4.19.123-coreos #1 SMP Fri May 22 19:21:11 -00 2020 x86_64 GNU/Linux
- Others: Redis, CeleryExecutor
What happened:
In line with the guidelines laid out in AIRFLOW-7120, I'm copying over a JIRA for a bug that has significant negative impact on our pipeline SLAs. The original ticket is AIRFLOW-5071 which has a lot of details from various users who use ExternalTaskSensors in reschedule mode and see their tasks going through the following unexpected state transitions:
running -> up_for_reschedule -> scheduled -> queued -> up_for_retry
In our case, this issue seems to affect approximately ~2000 tasks per day.
data:image/s3,"s3://crabby-images/e5783/e57830272a9b428380e1c6ee6b8c388ae2021911" alt="Screenshot 2020-09-08 at 09 01 03"
What you expected to happen:
I would expect that tasks would go through the following state transitions instead: running -> up_for_reschedule -> scheduled -> queued -> running
How to reproduce it:
Unfortunately, I don't have configuration available that could be used to easily reproduce the issue at the moment. However, based on the thread in AIRFLOW-5071, the problem seems to arise in deployments that use a large number of sensors in reschedule mode.
Thanks for opening your first issue here! Be sure to follow the issue template!
Thanks @dmariassy for bringing this issue to Github! I think this one is quite important to fix but as long as we don't know how to replicate it we are going blind.
I spent some time trying to reproduce it on 2.0 and 1.10.9 but to no effect :<
Thanks for your reply @turbaszek . What did your reproduction set-up look like? If I have the time, I would like to have a go at trying to reproduce it myself in the coming weeks.
As it was reported in original issue and comments this behavior should be possible to reproduce in case of fast sensors in reschedule mode. That's why I was trying to use many DAGs like this:
from random import choice
from airflow.utils.dates import days_ago
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
import time
class TestSensor(BaseSensorOperator):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.mode = "RESCHEDULE"
def poke(self, context):
time.sleep(5)
return choice([True, False, False])
args = {"owner": "airflow", "start_date": days_ago(1)}
with DAG(
dag_id="%s",
is_paused_upon_creation=False,
max_active_runs=100,
default_args=args,
schedule_interval="0 * * * *",
) as dag:
start = BashOperator(task_id="start", bash_command="echo 42")
end = BashOperator(task_id="end", bash_command="echo 42")
for i in range(3):
next = TestSensor(task_id=f"next_{i}")
start >> next >> end
And I was also playing with airflow config settings as described in comments. Although I saw failing tasks there was no issue like this one or... eventually the log was missing?
I did some tests with external task sensor but also no results.
Hi @turbaszek, any finding on this? We have a CeleryExecutor + Redis setup with three workers (apache-airflow 1.10.12). The airflow-scheduler log has a lot of lines like this. I remember this was already a problem when we were using older versions such as 1.10.10. It's just we never paid much attention to it.
{taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: ... [queued]> finished (success) although the task says its queued. Was the task killed externally?
Same with others in this thread, we have a lot of sensors in "reschedule" mode with poke_interval
set to 60s. These are the ones that most often hit this error. So far our workaround has been to add a retries=3
to these sensors. That way when this error happens it retries and we don't get any spam. This is definitely not a great long term solution though. Such sensors go into up_for_retry
state when this happen.
I also tried to tweak these parameters. They don't seem to matter much as far as this error is concerned:
parallelism = 1024
dag_concurrency = 128
max_threads = 8
The way to reproduce this issue seems to be to create a DAG with a bunch of parallel reschedule
sensors. And make the DAG slow to import. For example, like this. If we add a time.sleep(30)
at the end to simulate the experience of slow-to-import DAGs, this error happens a lot for such sensors. You may also need to tweak the dagbag_import_timeout
and dag_file_processor_timeout
if adding the sleep
causes dags to fail to import altogether.
When the scheduler starts to process this DAG, we then start to see the above error happening to these sensors. And the go into up_for_retry
.
import datetime
import pendulum
import time
from airflow.models.dag import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor
with DAG(
dag_id="test_dag_slow",
start_date=datetime.datetime(2020, 9, 8),
schedule_interval="@daily",
) as dag:
sensors = [
PythonSensor(
task_id=f"sensor_{i}",
python_callable=lambda: False,
mode="reschedule",
retries=2,
) for i in range(20)
]
time.sleep(30)
@yuqian90 thanks you so much for pointing to the DAG! I will check it and let you know. Once we can replicate the problem it will be much more easier to solve it 👍
@yuqian90
I also tried to tweak these parameters. They don't seem to matter much as far as this error is concerned:
parallelism = 1024 dag_concurrency = 128 max_threads = 8
The way to reproduce this issue seems to be to create a DAG with a bunch of parallel
reschedule
sensors. And make the DAG slow to import. For example, like this. If we add atime.sleep(30)
at the end to simulate the experience of slow-to-import DAGs, this error happens a lot for such sensors. You may also need to tweak thedagbag_import_timeout
anddag_file_processor_timeout
if adding thesleep
causes dags to fail to import altogether.
Those parameters won't help you much. I was struggling to somehow workaround this issue and I believe I've found the right solution now. In my case the biggest hint while debugging was not scheduler/worker logs but the Celery Flower Web UI. We have a setup of 3 Celery workers, 4 CPU each. It often happened that Celery was running 8 or more python reschedule sensors on one worker but 0 on the others and that was the exact time when sensors started to fail. There are two Celery settings that are responsible for this behavior: worker_concurrency
with a default value of "16" and worker_autoscale
with a default value of "16,12" (it basically means that minimum Celery process # on the worker is 12 and can be scaled up to 16). With those set with default values Celery was configured to load up to 16 tasks (mainly reschedule sensors) to one node. After setting worker_concurrency
to match the CPU number and worker_autoscale
to "4,2" the problem is literally gone. Maybe that might be anothe clue for @turbaszek.
I've been trying a lot to setup a local docker compose file with scheduler, webserver, flower, postgres and RabbitMQ as a Celery backend but I was not able to replicate the issue as well. I tried to start a worker container with limited CPU to somehow imitate this situation, but I failed. There are in fact tasks killed and shown as failed in Celery Flower, but not with the "killed externally" reason.
@sgrzemski-ias I will setup an environment to first observe the behavior and then if it will occur I will check your suggestion! Hope that we will be able to understand what's going on here 🚀
Ok @yuqian90 @sgrzemski-ias what is you setting for core.dagbag_import_timeout ?
As I'm hitting:
Traceback (most recent call last): File "/usr/local/lib/airflow/airflow/models/dagbag.py", line 237, in process_file m = imp.load_source(mod_name, filepath) File "/opt/python3.6/lib/python3.6/imp.py", line 172, in load_source module = _load(spec) File "
Ok @yuqian90 @sgrzemski-ias what is you setting for core.dagbag_import_timeout ?
As I'm hitting:
Traceback (most recent call last): File "/usr/local/lib/airflow/airflow/models/dagbag.py", line 237, in process_file m = imp.load_source(mod_name, filepath) File "/opt/python3.6/lib/python3.6/imp.py", line 172, in load_source module = _load(spec) File "", line 684, in _load File "", line 665, in _load_unlocked File "", line 678, in exec_module File "", line 219, in _call_with_frames_removed File "/home/airflow/gcs/dags/test_dag_1.py", line 24, in time.sleep(30) File "/usr/local/lib/airflow/airflow/utils/timeout.py", line 43, in handle_timeout raise AirflowTaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 6217
Hi, @turbaszek in my case I have dagbag_import_timeout = 100
and dag_file_processor_timeout = 300
. Most of the time dag import takes about 10s. dag file processing can take 60s that's why it's set to a large number.
After digging further, I think the slowness that causes the error for our case is in this function: SchedulerJob._process_dags()
. If this function takes around 60s, those reschedule
sensors will hit the ERROR - Executor reports task instance ... killed externally?
error. My previous comment about adding the time.sleep(30)
is just one way to replicate this issue. Anything that causes _process_dags()
to slow down should be able to replicate this error.
Here's another potential hint: We have increased the poke_interval
value for a subset of our sensors yesterday to 5 minutes (from the default 1 minute), and the issue seems to have disappeared for the affected sensors.
I can confirm that one of our customers also faced a similar issue with poke='reschedule' and increasing poke_interval had fixed the issue for them.
It feels some sort of race condition.
We are on Airflow 1.10.10
Besides the DAGs which have sensor tasks in them, we are even encountering this in tasks which have no sensors in them, for example a DAG which only has PythonOperator and HiveOperator in it. Also, weirdly not all dags, even with similar signatures (in terms of operators being used) are not affected, but ones that are, are severely affected and keep getting: ERROR - Executor reports task instance <TaskInstance: task_id 2020-09-11 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
After digging further, I think the slowness that causes the error for our case is in this function:
SchedulerJob._process_dags()
. If this function takes around 60s, thosereschedule
sensors will hit theERROR - Executor reports task instance ... killed externally?
error. My previous comment about adding thetime.sleep(30)
is just one way to replicate this issue. Anything that causes_process_dags()
to slow down should be able to replicate this error.
Some further investigation shows that the slow down that caused this issue for our case (Airflow 1.10.12) was in SchedulerJob._process_task_instances
. This is periodically called in the DagFileProcessor
process spawned by the airflow scheduler. Anything that causes this function to take more than 60s seems to cause these ERROR - Executor reports task instance ... killed externally?
errors for sensors in reschedule
mode with poke_interval
of 60s. I'm trying to address one of the cause of the SchedulerJob._process_task_instances
slowdown for our own case here #11010, but that's not a fix for the other causes of this same error.
We have just introduced ExternalTaskSensor into our pipeline and faced the same issue. When initially tested on our dev instance (~200 DAGs) it worked fine, after running it on our prod environment (~400 DAGs) it was always failing after reschedule.
After digging into the code, it looks that this is simply race condition in the scheduler.
We have child_dag.parent_dag_completed task that waits for business process to complete calculations in parent_dag, task execution logs:
[2020-10-01 11:48:03,038] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [queued]>
[2020-10-01 11:48:03,065] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [queued]>
[2020-10-01 11:48:03,066] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,066] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-10-01 11:48:03,066] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,095] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): parent_dag_completed> on 2020-09-30T11:45:00+00:00
[2020-10-01 11:48:03,100] {standard_task_runner.py:53} INFO - Started process 26131 to run task
[2020-10-01 11:48:03,235] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [running]> ip-10-200-100-113.eu-west-1.compute.internal
[2020-10-01 11:48:03,318] {external_task_sensor.py:117} INFO - Poking for parent_dag on 2020-09-30T11:45:00+00:00 ...
[2020-10-01 11:48:03,397] {taskinstance.py:1136} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2020-10-01 11:48:12,994] {logging_mixin.py:112} INFO - [2020-10-01 11:48:12,993] {local_task_job.py:103} INFO - Task exited with return code 0
[2020-10-01 11:50:53,744] {taskinstance.py:663} INFO - Dependencies not met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
[2020-10-01 11:50:53,747] {logging_mixin.py:112} INFO - [2020-10-01 11:50:53,747] {local_task_job.py:91} INFO - Task is not able to be run
Scheduler logs:
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
[2020-10-01 11:47:59,428] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]>
[2020-10-01 11:47:59,565] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:47:59,565] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
[2020-10-01 11:50:50,118] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]>
[2020-10-01 11:50:50,148] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:50:50,148] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
[2020-10-01 11:50:50,595] {scheduler_job.py:1313} INFO - Executor reports execution of child_dag.parent_dag_completed execution_date=2020-09-30 11:45:00+00:00 exited with status success for try_number 1
[2020-10-01 11:50:50,599] {scheduler_job.py:1330} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
[2020-10-01 11:50:50,803] {taskinstance.py:1145} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
[2020-10-01 11:50:50,804] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=child_dag, task_id=parent_dag_completed, execution_date=20200930T114500, start_date=20201001T114803, end_date=20201001T115050
From scheduler log it's visible that event from executor is processed after task is already queued for the second time.
Logic related to those logs is here:
def _validate_and_run_task_instances(self, simple_dag_bag):
if len(simple_dag_bag.simple_dags) > 0:
try:
self._process_and_execute_tasks(simple_dag_bag) # <-- task state is changed to queued here
except Exception as e:
self.log.error("Error queuing tasks")
self.log.exception(e)
return False
# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()
self._change_state_for_tasks_failed_to_execute()
# Process events from the executor
self._process_executor_events(simple_dag_bag) # <-- notification of previous execution is processed and there is state mismatch
return True
This is the place where task state is changes:
def _process_executor_events(self, simple_dag_bag, session=None):
# ...
if ti.try_number == try_number and ti.state == State.QUEUED:
msg = ("Executor reports task instance {} finished ({}) "
"although the task says its {}. Was the task "
"killed externally?".format(ti, state, ti.state))
Stats.incr('scheduler.tasks.killed_externally')
self.log.error(msg)
try:
simple_dag = simple_dag_bag.get_dag(dag_id)
dagbag = models.DagBag(simple_dag.full_filepath)
dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
self.log.error("Cannot load the dag bag to handle failure for %s"
". Setting task to FAILED without callbacks or "
"retries. Do you have enough resources?", ti)
ti.state = State.FAILED
session.merge(ti)
session.commit()
Unfortunately I think that moving _process_executor_events before _process_and_execute_tasks would not solve the issue as event might arrive from executor while _process_and_execute_tasks is executing. Increasing poke_interval reduces chance of this race condition happening when scheduler is under a heavy load.
I'm not too familiar with Airflow code base, but it seems that the root cause is the way how reschedule works and the fact that try_number is not changing. Because of that scheduler thinks that event for past execution is for the ongoing one.
The cause is clear as @rafalkozik mentioned. After scheduler schedule the task at the second time(put it in queue) and then it start process the executor events of the task's first-try. It occurs when the scheduling loop time > sensor task reschedule interval. Either reducing the scheduler looping time(dag processing time, etc) or increasing the sensor task reschedule interval will work.
The bug can also be fixed if the rescheduled task instance use a different try number, but this will cause a lot of log files.
def _process_executor_events(self, simple_dag_bag, session=None):
# ...
if ti.try_number == try_number and ti.state == State.QUEUED: # <-- try number for a sensor task is always the same
msg = ("Executor reports task instance {} finished ({}) "
"although the task says its {}. Was the task "
"killed externally?".format(ti, state, ti.state))
Stats.incr('scheduler.tasks.killed_externally')
self.log.error(msg)
try:
simple_dag = simple_dag_bag.get_dag(dag_id)
dagbag = models.DagBag(simple_dag.full_filepath)
dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
self.log.error("Cannot load the dag bag to handle failure for %s"
". Setting task to FAILED without callbacks or "
"retries. Do you have enough resources?", ti)
ti.state = State.FAILED
session.merge(ti)
session.commit()
The bug can also be fixed if the rescheduled task instance use a different try number, but this will cause a lot of log files.
I saw customers doing this (custom fork). I'm curious if this error will occur in Airflow 2.0
The bug can also be fixed if the rescheduled task instance use a different try number, but this will cause a lot of log files.
I saw customers doing this (custom fork). I'm curious if this error will occur in Airflow 2.0
Hi @turbaszek I did not test this in Airflow 2.0 so I may be wrong. I don't see any attempts to address this in Airflow 2.0 so this is likely going to happen in 2.0 too. That said, the scheduler loop is faster in Airflow 2.0, the chance of running into this ERROR - Executor reports task instance ... killed externally
issue should become smaller.
@turbaszek I am currently testing Airflow v2.0.0b3 against the same DAGS we currently run on production against 1.10.12 and I can confirm that this is still an issue.
Combined with #12552 it makes the problem even worse too.
To add some further context, I can consistently replicate this error on 2.0.0b3 on a very simple environment running two Docker containers - webserver and postgres - on a Python 3.7 image using LocalExecutor and with a poke_interval
of 60 * 5
.
[2020-12-03 11:52:04,649] {scheduler_job.py:946} INFO - 1 tasks up for execution:
<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [scheduled]>
[2020-12-03 11:52:04,655] {scheduler_job.py:980} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2020-12-03 11:52:04,656] {scheduler_job.py:1007} INFO - DAG target_dag has 0/16 running and queued tasks
[2020-12-03 11:52:04,657] {scheduler_job.py:1068} INFO - Setting the following tasks to queued state:
<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [scheduled]>
[2020-12-03 11:52:04,661] {scheduler_job.py:1110} INFO - Sending TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2020-12-03 11:52:04,663] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'target_dag', 'wait-task', '2020-12-02T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/target_dag.py']
[2020-12-03 11:52:04,675] {local_executor.py:80} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'target_dag', 'wait-task', '2020-12-02T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/target_dag.py']
[2020-12-03 11:52:04,710] {dagbag.py:440} INFO - Filling up the DagBag from /usr/local/airflow/dags/target_dag.py
Running <TaskInstance: target_dag.wait-task 2020-12-02T00:00:00+00:00 [queued]> on host 5acdea444946
[2020-12-03 11:52:05 +0000] [568] [INFO] Handling signal: ttin
[2020-12-03 11:52:05 +0000] [11260] [INFO] Booting worker with pid: 11260
[2020-12-03 11:52:05,776] {scheduler_job.py:946} INFO - 1 tasks up for execution:
<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [scheduled]>
[2020-12-03 11:52:05,783] {scheduler_job.py:980} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2020-12-03 11:52:05,783] {scheduler_job.py:1007} INFO - DAG target_dag has 0/16 running and queued tasks
[2020-12-03 11:52:05,783] {scheduler_job.py:1068} INFO - Setting the following tasks to queued state:
<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [scheduled]>
[2020-12-03 11:52:05,791] {scheduler_job.py:1110} INFO - Sending TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2020-12-03 11:52:05,791] {base_executor.py:82} ERROR - could not queue task TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1)
[2020-12-03 11:52:05,797] {scheduler_job.py:1208} INFO - Executor reports execution of target_dag.wait-task execution_date=2020-12-02 00:00:00+00:00 exited with status success for try_number 1
[2020-12-03 11:52:05,808] {scheduler_job.py:1237} ERROR - Executor reports task instance <TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
from airflow import models
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 10, 31),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag_name = 'target_dag'
with models.DAG(dag_name,
default_args=default_args,
schedule_interval='0 0 * * *',
catchup=False,
max_active_runs=5
) as dag:
wait = ExternalTaskSensor(
task_id='wait-task',
external_dag_id='master_dag',
external_task_id='start',
poke_interval=60 * 5,
mode='reschedule'
)
Not sure if this is relevant but, when the task was rescheduled five minutes later, I saw this.
[2020-12-03 11:57:07,266] {scheduler_job.py:1237} ERROR - Executor reports task instance <TaskInstance: target_dag.wait-task 2020-12-02 00:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
Process ForkProcess-34:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 365, in _run_processor_manager
processor_manager.start()
File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 596, in start
return self._run_parsing_loop()
File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 659, in _run_parsing_loop
self._processors.pop(processor.file_path)
KeyError: '/usr/local/airflow/dags/target_dag.py'
[2020-12-03 11:57:09,101] {dag_processing.py:399} WARNING - DagFileProcessorManager (PID=157) exited with exit code 1 - re-launching
[2020-12-03 11:57:09,105] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 33432
Not sure if this is relevant but, when the task was rescheduled five minutes later, I saw this.
I saw this also from time to time but not always so probably not related.
@nathadfield @yuqian90 and others, have you been able to test 2.0? Have you observed this issue?
@turbaszek I just tried it again and I couldn't replicate this error again on 2.0.
Hello. I am using airflow 2.0 and just ran into this error.
How can I fix it??
@turbaszek - Is this error is fixed in 2.0 ?
@turbaszek - Is this error is fixed in 2.0 ?
From what @yougyoung94 reports it seems to occur still
I am using airflow 2.0.1 and have the same error too. ERROR - Executor reports task instance <TaskInstance: logConverterDag.logConverterDagspark_job 2021-03-16 22:05:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: Celery command failed on host: hadoop*) Was the task killed externally? It happens every day randomly on different dags
Trying Airflow 2.0.1. No tasks could be executed :(
scheduler_1 | [2021-03-26 16:25:55,097] {{scheduler_job.py:941}} INFO - 1 tasks up for execution:
scheduler_1 | <TaskInstance: test_dag.mirrors_to_vaniks 2021-03-26 16:25:55.009735+00:00 [scheduled]>
scheduler_1 | [2021-03-26 16:25:55,100] {{scheduler_job.py:975}} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
scheduler_1 | [2021-03-26 16:25:55,100] {{scheduler_job.py:1002}} INFO - DAG test_dag has 0/16 running and queued tasks
scheduler_1 | [2021-03-26 16:25:55,100] {{scheduler_job.py:1063}} INFO - Setting the following tasks to queued state:
scheduler_1 | <TaskInstance: test_dag.mirrors_to_vaniks 2021-03-26 16:25:55.009735+00:00 [scheduled]>
scheduler_1 | [2021-03-26 16:25:55,103] {{scheduler_job.py:1105}} INFO - Sending TaskInstanceKey(dag_id='test_dag', task_id='mirrors_to_vaniks', execution_date=datetime.datetime(2021, 3, 26, 16, 25, 55, 9735, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
scheduler_1 | [2021-03-26 16:25:55,104] {{base_executor.py:82}} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_dag', 'mirrors_to_vaniks', '2021-03-26T16:25:55.009735+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/mwl/test_dag.py']
scheduler_1 | [2021-03-26 16:25:55,149] {{scheduler_job.py:1206}} INFO - Executor reports execution of test_dag.mirrors_to_vaniks execution_date=2021-03-26 16:25:55.009735+00:00 exited with status queued for try_number 1
scheduler_1 | [2021-03-26 16:25:55,154] {{scheduler_job.py:1226}} INFO - Setting external_id for <TaskInstance: test_dag.mirrors_to_vaniks 2021-03-26 16:25:55.009735+00:00 [queued]> to 53fa2dc3-9f17-4813-a1bc-7e28f18e0ddd
Same here with Airflow 2.0.1