datahub
Airflow plugin doesn't support SmartSensor (ExternalTaskSensor)
I installed acryl-datahub-airflow-plugin so that Airflow can send metadata to DataHub through datahub-rest.
I also use ExternalTaskSensor as a SmartSensor in my code, but I found that the SmartSensor path is not taken during execution. See the log:
[2022-11-28, 10:41:09 UTC] {taskinstance.py:1271} INFO - Executing <Task(ExternalTaskSensor): sensor_partition_drift_test> on 2022-11-27 01:00:00+00:00
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:52} INFO - Started process 4842 to run task
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'drift_test', 'sensor_partition_drift_test', 'scheduled__2022-11-27T01:00:00+00:00', '--job-id', '214815', '--raw', '--subdir', 'DAGS_FOLDER/my_task.py', '--cfg-path', '/tmp/tmpayqixqle', '--error-file', '/tmp/tmp8ffq51vz']
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:80} INFO - Job 214815: Subtask sensor_partition_drift_test
[2022-11-28, 10:41:10 UTC] {task_command.py:298} INFO - Running <TaskInstance: drift_test.sensor_partition_drift_test scheduled__2022-11-27T01:00:00+00:00 [running]> on host xxxxx
[2022-11-28, 10:41:10 UTC] {taskinstance.py:1448} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=xxx
AIRFLOW_CTX_DAG_ID=drift_test
AIRFLOW_CTX_TASK_ID=sensor_partition_drift_test
AIRFLOW_CTX_EXECUTION_DATE=2022-11-27T01:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-11-27T01:00:00+00:00
[2022-11-28, 10:41:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:42:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:43:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:44:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:45:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:46:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:47:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:48:11 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
Apparently the sensor is stuck in an endless loop of regular poking.
My ExternalTaskSensor subclass (pseudocode):
import datetime

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session


class ExternalTaskSmartSensor(ExternalTaskSensor):
    def __init__(self, execution_date=None, **kwargs):
        super().__init__(**kwargs)

    # In fact, this method is not executed at all
    def get_poke_context(self, context):
        # Only serializable values go into the poke context,
        # so the execution date is stored as an ISO 8601 string.
        return {
            'external_dag_id': self.external_dag_id,
            'external_task_id': self.external_task_id,
            'timeout': self.timeout,
            'check_existence': self.check_existence,
            'execution_date': context['execution_date'].isoformat(),
        }

    @provide_session
    def poke(self, context, session=None):
        # Turn the ISO string back into a datetime before delegating.
        return super().poke(
            {
                **context,
                'execution_date': datetime.datetime.fromisoformat(
                    context['execution_date']
                ),
            },
            session,
        )
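The string round trip that get_poke_context and poke rely on can be checked in isolation with the standard library. This is only a sketch: the date is the execution date from the log above, and a plain datetime stands in for whatever object Airflow puts into the task context. It also shows that datetime.fromisoformat rejects anything that is not a string, so the poke override as written only works when it receives the serialized poke context.

```python
import datetime

# Execution date from the log above (AIRFLOW_CTX_EXECUTION_DATE).
execution_date = datetime.datetime(
    2022, 11, 27, 1, 0, tzinfo=datetime.timezone.utc
)

# What get_poke_context would store: a plain ISO 8601 string.
serialized = execution_date.isoformat()

# What poke would rebuild from that string.
restored = datetime.datetime.fromisoformat(serialized)

# fromisoformat only accepts str; passing a datetime raises TypeError.
try:
    datetime.datetime.fromisoformat(execution_date)
    rejected_datetime = False
except TypeError:
    rejected_datetime = True

print(serialized, restored == execution_date, rejected_datetime)
```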
Expected behavior: my sensor task should be registered with the smart sensor service. According to the logs, however, it is never submitted to the service, and I can't find the corresponding DAG in smart_sensor_group_shard.
Version:
- Airflow version [2.2.5]
- acryl-datahub-airflow-plugin [0.9.2.4]
By the way, SmartSensor works fine without the plugin installed, and I didn't find any incompatibility between the two mentioned in the official documentation.
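One way to narrow this down is to list which preconditions for smart sensor registration a task fails. The sketch below is a simplified stand-in and not Airflow's code: smart_sensor_blockers and the stub class are hypothetical, and the conditions (use_smart_sensor enabled, class name listed in sensors_enabled, no success/failure/retry callbacks on the task) are an assumption about how Airflow 2.2 decides, which should be verified against the installed version. Whether the DataHub plugin actually sets callbacks on tasks is likewise an unconfirmed hypothesis in this thread.

```python
class ExternalTaskSmartSensor:
    """Stub with the same name as the sensor above; NOT the Airflow class."""

    def __init__(self, on_success_callback=None,
                 on_failure_callback=None, on_retry_callback=None):
        self.on_success_callback = on_success_callback
        self.on_failure_callback = on_failure_callback
        self.on_retry_callback = on_retry_callback


def smart_sensor_blockers(task, sensors_enabled, use_smart_sensor=True):
    """Return reasons why `task` would be poked normally (hypothetical helper).

    The rules here are assumed, not copied from Airflow's source.
    """
    blockers = []
    if not use_smart_sensor:
        blockers.append("use_smart_sensor is disabled")
    if type(task).__name__ not in sensors_enabled:
        blockers.append("class name not in sensors_enabled")
    for name in ("on_success_callback", "on_failure_callback",
                 "on_retry_callback"):
        if getattr(task, name, None) is not None:
            blockers.append(f"{name} is set")
    return blockers


enabled = ["ExternalTaskSmartSensor"]

# A sensor without callbacks passes every assumed check.
plain = ExternalTaskSmartSensor()
plain_blockers = smart_sensor_blockers(plain, enabled)

# A sensor that had a callback attached afterwards would be rejected.
patched = ExternalTaskSmartSensor(on_failure_callback=print)
patched_blockers = smart_sensor_blockers(patched, enabled)

print(plain_blockers, patched_blockers)
```

Running the same kind of check against the real task object, with and without the plugin installed, would show whether any of these attributes differ between the two setups.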
@JJJzheng We need to look into this; we weren't aware of any incompatibility with Smart Sensors. FYI, are you aware that Smart Sensors are deprecated in Airflow? See https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html
@treff7es Yep, I know it has been deprecated in the latest version (version >= 2.4). It should be converted to use Deferrable Operators later, but I haven't found a good way to migrate smoothly (the DAGs use a lot of external dependencies).
@treff7es By the way, is the DataHub plugin compatible with Deferrable Operators? If it is confirmed that SmartSensor and DataHub are not compatible and no fixed version is planned, I may choose to migrate to Deferrable Operators.
I may test the above myself today.
This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io
Any progress here?
This issue was closed because it has been inactive for 30 days since being marked as stale.