OpenMetadata
Failed to run Flink lineage pipeline
Affected module Flink Ingestion Framework
Describe the bug The Flink lineage pipeline fails to run with the following error message:
[2025-06-09T05:23:07.108+0000] {server_mixin.py:74} INFO - OpenMetadata client running with Server version [1.7.1] and Client version [1.7.1.4]
[2025-06-09T05:23:07.324+0000] {ingestion_pipeline_mixin.py:53} DEBUG - Created Pipeline Status for pipeline testflink.210e123c-1426-4a22-963e-d842898f7210: runId='01829f33-deab-4e30-b396-5450a7b709b1' pipelineState=<PipelineState.running: 'running'> startDate=Timestamp(root=1749446587087) timestamp=Timestamp(root=1749446587087) endDate=None status=None config={}
[2025-06-09T05:23:07.325+0000] {importer.py:129} DEBUG - Importing: metadata.ingestion.source.pipeline.flink.service_spec.ServiceSpec
[2025-06-09T05:23:07.439+0000] {importer.py:129} DEBUG - Importing: None
[2025-06-09T05:23:07.440+0000] {taskinstance.py:3313} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/openmetadata_managed_apis/workflows/ingestion/common.py", line 224, in metadata_ingestion_workflow
    workflow = MetadataWorkflow.create(config)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/workflow/ingestion.py", line 103, in create
    return cls(config)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/workflow/ingestion.py", line 80, in __init__
To Reproduce
Screenshots or steps to reproduce
Expected behavior A clear and concise description of what you expected to happen.
Version:
- OS: [e.g. CentOS 7.9]
- Python version: 3.10
- OpenMetadata version: [e.g. 1.7.1]
- OpenMetadata Ingestion package version: [e.g. openmetadata-ingestion[docker]==1.7.1]
Additional context
56c278bffa17 INFO - ::group::Log message source details
*** Found local files:
*** * /opt/airflow/logs/dag_id=210e123c-1426-4a22-963e-d842898f7210/run_id=manual__2025-06-09T05:23:03+00:00/task_id=lineage_task/attempt=1.log
INFO - ::endgroup::
[2025-06-09T05:23:06.527+0000] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2025-06-09T05:23:06.567+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: 210e123c-1426-4a22-963e-d842898f7210.lineage_task manual__2025-06-09T05:23:03+00:00 [queued]>
[2025-06-09T05:23:06.577+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: 210e123c-1426-4a22-963e-d842898f7210.lineage_task manual__2025-06-09T05:23:03+00:00 [queued]>
[2025-06-09T05:23:06.578+0000] {taskinstance.py:2867} INFO - Starting attempt 1 of 1
[2025-06-09T05:23:06.598+0000] {taskinstance.py:2890} INFO - Executing <Task(CustomPythonOperator): lineage_task> on 2025-06-09 05:23:03+00:00
[2025-06-09T05:23:06.625+0000] {standard_task_runner.py:72} INFO - Started process 37208 to run task
[2025-06-09T05:23:06.630+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', '210e123c-1426-4a22-963e-d842898f7210', 'lineage_task', 'manual__2025-06-09T05:23:03+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/210e123c-1426-4a22-963e-d842898f7210.py', '--cfg-path', '/tmp/tmp4gi8plb_']
[2025-06-09T05:23:06.631+0000] {standard_task_runner.py:105} INFO - Job 12: Subtask lineage_task
[2025-06-09T05:23:06.787+0000] {task_command.py:467} INFO - Running <TaskInstance: 210e123c-1426-4a22-963e-d842898f7210.lineage_task manual__2025-06-09T05:23:03+00:00 [running]> on host 56c278bffa17
[2025-06-09T05:23:07.043+0000] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='210e123c-1426-4a22-963e-d842898f7210' AIRFLOW_CTX_TASK_ID='lineage_task' AIRFLOW_CTX_EXECUTION_DATE='2025-06-09T05:23:03+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-06-09T05:23:03+00:00'
[2025-06-09T05:23:07.046+0000] {taskinstance.py:732} INFO - ::endgroup::
[2025-06-09T05:23:07.108+0000] {server_mixin.py:74} INFO - OpenMetadata client running with Server version [1.7.1] and Client version [1.7.1.4]
[2025-06-09T05:23:07.324+0000] {ingestion_pipeline_mixin.py:53} DEBUG - Created Pipeline Status for pipeline testflink.210e123c-1426-4a22-963e-d842898f7210: runId='01829f33-deab-4e30-b396-5450a7b709b1' pipelineState=<PipelineState.running: 'running'> startDate=Timestamp(root=1749446587087) timestamp=Timestamp(root=1749446587087) endDate=None status=None config={}
[2025-06-09T05:23:07.325+0000] {importer.py:129} DEBUG - Importing: metadata.ingestion.source.pipeline.flink.service_spec.ServiceSpec
[2025-06-09T05:23:07.439+0000] {importer.py:129} DEBUG - Importing: None
[2025-06-09T05:23:07.440+0000] {taskinstance.py:3313} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/openmetadata_managed_apis/workflows/ingestion/common.py", line 224, in metadata_ingestion_workflow
    workflow = MetadataWorkflow.create(config)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/workflow/ingestion.py", line 103, in create
    return cls(config)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/workflow/ingestion.py", line 80, in __init__
Flink version: 1.19.1. Run on YARN per session, detached.
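The log shows importer.py resolving metadata.ingestion.source.pipeline.flink.service_spec.ServiceSpec and then logging "Importing: None" right before the task fails. As a possible way to narrow this down, here is a minimal sketch that checks whether a dotted path resolves inside the same Python environment as the ingestion container. The helper name try_import is hypothetical and not part of OpenMetadata; it only uses the standard library, and it does not claim to reproduce what importer.py does internally.

```python
import importlib


def try_import(dotted_path):
    """Resolve 'package.module.Attr'; return (object, None) or (None, error text).

    Hypothetical diagnostic helper, not an OpenMetadata API.
    """
    # A value such as None (as seen in the "Importing: None" log line) is not resolvable.
    if not dotted_path or "." not in dotted_path:
        return None, f"not a dotted path: {dotted_path!r}"
    module_name, _, attr = dotted_path.rpartition(".")
    try:
        module = importlib.import_module(module_name)
        return getattr(module, attr), None
    except (ImportError, AttributeError) as exc:
        return None, f"{type(exc).__name__}: {exc}"


if __name__ == "__main__":
    # Dotted path copied from the importer.py DEBUG line in the log above.
    path = "metadata.ingestion.source.pipeline.flink.service_spec.ServiceSpec"
    obj, err = try_import(path)
    print(obj if err is None else err)
```

Running this in the Airflow worker (same site-packages as in the traceback) should print either the resolved class or the import error text, which may help tell a missing module apart from a configuration problem.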