
Airflow plugin doesn't support SmartSensor (ExternalTaskSensor)

JJJzheng opened this issue 2 years ago • 4 comments

I installed acryl-datahub-airflow-plugin to connect to DataHub via datahub-rest.

I also use ExternalTaskSensor as a SmartSensor in my code, but the SmartSensor does not take effect at runtime. See the log:

```
[2022-11-28, 10:41:09 UTC] {taskinstance.py:1271} INFO - Executing <Task(ExternalTaskSensor): sensor_partition_drift_test> on 2022-11-27 01:00:00+00:00
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:52} INFO - Started process 4842 to run task
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'drift_test', 'sensor_partition_drift_test', 'scheduled__2022-11-27T01:00:00+00:00', '--job-id', '214815', '--raw', '--subdir', 'DAGS_FOLDER/my_task.py', '--cfg-path', '/tmp/tmpayqixqle', '--error-file', '/tmp/tmp8ffq51vz']
[2022-11-28, 10:41:09 UTC] {standard_task_runner.py:80} INFO - Job 214815: Subtask sensor_partition_drift_test
[2022-11-28, 10:41:10 UTC] {task_command.py:298} INFO - Running <TaskInstance: drift_test.sensor_partition_drift_test scheduled__2022-11-27T01:00:00+00:00 [running]> on host xxxxx
[2022-11-28, 10:41:10 UTC] {taskinstance.py:1448} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=xxx
AIRFLOW_CTX_DAG_ID=drift_test
AIRFLOW_CTX_TASK_ID=sensor_partition_drift_test
AIRFLOW_CTX_EXECUTION_DATE=2022-11-27T01:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-11-27T01:00:00+00:00
[2022-11-28, 10:41:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:42:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:43:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:44:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:45:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:46:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:47:10 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
[2022-11-28, 10:48:11 UTC] {external_task.py:171} INFO - Poking for tasks None in dag partition_drift_test on 2022-11-27T01:00:00+00:00 ...
```

The sensor appears to be stuck in an endless poke loop (one poke per minute).

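My ExternalTaskSensor subclass (simplified):

```python
import datetime

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session


class ExternalTaskSmartSensor(ExternalTaskSensor):

    def __init__(self, execution_date=None, **kwargs):
        # execution_date is accepted only so that it is not forwarded to
        # ExternalTaskSensor.__init__, which does not take it (presumably it
        # is passed back when the sensor is rebuilt from its poke context).
        super().__init__(**kwargs)

    # Builds the poke context used by the Smart Sensor service.
    # In fact, this method is not executed at all.
    def get_poke_context(self, context):
        return {
            'external_dag_id': self.external_dag_id,
            'external_task_id': self.external_task_id,
            'timeout': self.timeout,
            'check_existence': self.check_existence,
            # Stored as an ISO-8601 string; poke() parses it back.
            'execution_date': context['execution_date'].isoformat(),
        }

    @provide_session
    def poke(self, context, session=None):
        execution_date = context['execution_date']
        # Via the Smart Sensor this is the ISO string built in
        # get_poke_context(); in regular poke mode it is already a datetime,
        # and fromisoformat() would raise TypeError on it.
        if isinstance(execution_date, str):
            execution_date = datetime.datetime.fromisoformat(execution_date)
        return super().poke(
            {**context, 'execution_date': execution_date},
            session,
        )
```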
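For context on why get_poke_context() converts execution_date with isoformat(): a datetime cannot be passed to json.dumps, so if the poke context is persisted as JSON (an assumption about how the Smart Sensor service stores it, not confirmed in this thread), the date has to travel as a string and be parsed back before poking. The Airflow-free sketch below shows that round trip; encode_poke_context and decode_execution_date are hypothetical helpers, and the timeout and check_existence values are illustrative.

```python
import datetime
import json
from typing import Any, Dict


def encode_poke_context(poke_context: Dict[str, Any]) -> str:
    # Hypothetical helper; assumption: the poke context is stored as JSON.
    return json.dumps(poke_context, sort_keys=True)


def decode_execution_date(value: Any) -> datetime.datetime:
    # Hypothetical helper: accept the ISO string produced by
    # get_poke_context(), or a datetime that was passed through unchanged.
    if isinstance(value, datetime.datetime):
        return value
    return datetime.datetime.fromisoformat(value)


# Logical date taken from the log in this issue.
execution_date = datetime.datetime(2022, 11, 27, 1, 0, tzinfo=datetime.timezone.utc)

# A raw datetime cannot be JSON-encoded, hence the isoformat() call.
try:
    json.dumps({"execution_date": execution_date})
    raw_datetime_rejected = False
except TypeError:
    raw_datetime_rejected = True

# Illustrative poke context mirroring get_poke_context() above;
# timeout and check_existence are made-up example values.
poke_context = {
    "external_dag_id": "partition_drift_test",
    "external_task_id": None,
    "timeout": 3600,
    "check_existence": False,
    "execution_date": execution_date.isoformat(),
}

encoded = encode_poke_context(poke_context)
decoded = json.loads(encoded)
restored = decode_execution_date(decoded["execution_date"])

print(encoded)
print("round trip ok:", restored == execution_date)
```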

Expected behavior: my sensor task should be registered with the sensor service, but the logs show it is never submitted to it, and I can't find the corresponding DAG in smart_sensor_group_shard.

Versions:

  • Airflow: 2.2.5
  • acryl-datahub-airflow-plugin: 0.9.2.4

JJJzheng, Nov 28 '22

By the way, SmartSensor works fine without the plugin installed, and I couldn't find any mention of an incompatibility between the two in the official documentation.

JJJzheng, Nov 28 '22

@JJJzheng we need to look into this; we weren't aware of any incompatibility with Smart Sensors. FYI, are you aware that Smart Sensors are deprecated in Airflow? See https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html

treff7es, Nov 28 '22

@treff7es Yep, I know Smart Sensors are deprecated and have been removed as of version 2.4. I should eventually migrate to Deferrable Operators, but I haven't found a smooth migration path (the DAGs have a lot of external dependencies).

JJJzheng, Nov 29 '22

@treff7es By the way, is the DataHub plugin compatible with Deferrable Operators? If it's confirmed that SmartSensor and DataHub are incompatible and no fixed version is planned, I may migrate to Deferrable Operators.

I may test this myself today.

JJJzheng, Nov 29 '22

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot], Jan 07 '23

Any progress here?

JJJzheng, Jan 10 '23

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot], Feb 12 '23

This issue was closed because it has been inactive for 30 days since being marked as stale.

github-actions[bot], Mar 15 '23