[datahub migration] Fails when there is no schema registry
Describe the bug
I am currently attempting to migrate data from DataHub v0.14.1 to v1.1.0. I don't have a schema registry, but schema_registry_url is not an optional value.
To Reproduce
1. recipe.yaml:
pipeline_name: datahub_source_1
datahub_api:
  server: "http://datahub-datahub-gms:8080/"
  token: "eyJh***TUo"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql"
      host_port: "prerequisites-mysql:3306"
      username: "root"
      password: "datahub"
      database: "datahub"
    kafka_connection:
      bootstrap: "prerequisites-kafka:9092"
      ## schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/"
    stateful_ingestion:
      enabled: false
      ignore_old_state: true
    urn_pattern:
      deny:
        - ^denied.urn.*
      allow:
        - ^allowed.urn.*
    #flags:
    #  set_system_metadata: false # Replicate system metadata
sink:
  type: datahub-rest
  config:
    server: "http://10.9.27.26:31900/"
    token: "eyJh***iiYk"
2. datahub ingest run -c ./recipe.yaml
3. I got this error:
ERROR {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /schema-registry/api/schemas/ids/3 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f2da1048a00>: Failed to establish a new connection: [Errno 111] Connection refused'))"}
4. If I deploy a cp-schema-registry and set the schema_registry_url value:
schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/"
I get another error:
ERROR {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}
The error stack is:
[2025-06-04 08:31:33,503] INFO {datahub.ingestion.source.datahub.datahub_source:121} - Fetching timeseries aspects from kafka
[2025-06-04 08:31:37,070] ERROR {datahub.ingestion.run.pipeline:738} - Ingestion pipeline threw an uncaught exception
Traceback (most recent call last):
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/deserializing_consumer.py", line 110, in poll
value = self._value_deserializer(value, ctx)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/avro.py", line 422, in __call__
registered_schema = self._registry.get_schema(schema_id)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 435, in get_schema
response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 125, in get
return self.send_request(url, method='GET', query=query)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 175, in send_request
raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: HTTP 404 Not Found (HTTP status code 404, SR code 404)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/run/pipeline.py", line 462, in run
for wu in itertools.islice(
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 147, in auto_workunit_reporter
for wu in stream:
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_source.py", line 80, in get_workunits_internal
yield from self._get_kafka_workunits(from_offsets=state.kafka_offsets)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_source.py", line 128, in _get_kafka_workunits
for i, (mcl, offset) in enumerate(mcls):
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_kafka_reader.py", line 70, in get_mcls
yield from self._poll_partition(stop_time)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_kafka_reader.py", line 78, in _poll_partition
msg = self.consumer.poll(10)
File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/deserializing_consumer.py", line 112, in poll
raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}
[2025-06-04 08:31:37,074] ERROR {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}
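A note on the second error (an editorial observation, not confirmed anywhere in this thread): a stock Confluent Schema Registry serves its REST API at the root of port 8081, not under /schema-registry/api/, and a freshly deployed registry would in any case not hold the schema IDs that the existing MetadataChangeLog messages were serialized with; in a Helm deployment those normally come from GMS's built-in registry. Assuming both endpoints expose the standard GET /schemas/ids/{id} route (GMS's internal registry implements a subset of the Confluent API), a quick check from inside the actions pod for schema ID 3, the ID from the stack trace above:
# Stock Confluent registry answers at the root of port 8081
curl -sS http://prerequisites-cp-schema-registry:8081/schemas/ids/3
# GMS's built-in registry answers under /schema-registry/api/
curl -sS http://datahub-datahub-gms:8080/schema-registry/api/schemas/ids/3
If only the second call returns a schema, schema_registry_url needs to point at the GMS endpoint rather than at the standalone registry.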
Expected behavior
schema_registry_url should be allowed to be unset, because when I used DataHub 0.13 it was possible to perform the migration without a schema registry.
Desktop (please complete the following information):
- OS: ubuntu
- Browser: chrome
- Version: 22
Additional context
I deploy DataHub with Helm and copy the recipe.yaml file into the datahub-acryl-datahub-actions* pod to run the ingestion.
I tried setting schema_registry_url: "http://datahub-datahub-gms:8080/schema-registry/api/"
This time I got no error, but also no data was read from the source.
kubectl -n datahub exec -it datahub-acryl-datahub-actions-5457967d87-csslr -- bash
cat /tmp/recipe.migrate.yaml :
pipeline_name: datahub_source_2
datahub_api:
  server: "http://datahub-datahub-gms:8080"
  token: "ey**o"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql"
      host_port: "prerequisites-mysql:3306"
      username: "root"
      password: "**"
      database: "datahub"
    kafka_connection:
      bootstrap: "prerequisites-kafka:9092"
      #schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/"
      schema_registry_url: "http://datahub-datahub-gms:8080/schema-registry/api/"
    stateful_ingestion:
      enabled: true
      ignore_old_state: true
    urn_pattern:
      deny:
        # Ignores all datahub metadata where the urn matches the regex
        #- ^urn:li:dataFlow.*
        #- ^urn:li:dataJob.*
        - ^denied.urn.*
      allow:
        # Ingests all datahub metadata where the urn matches the regex.
        - ^allowed.urn.*
    #flags:
    #  set_system_metadata: false # Replicate system metadata
# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
sink:
  type: datahub-rest
  config:
    server: "http://10.9.27.26:31900"
    token: "ey**Yk"
datahub ingest run -c /tmp/recipe.migrate.yaml
[2025-06-04 10:40:27,784] INFO {datahub.cli.ingest_cli:149} - DataHub CLI version: 0.14.0.5
[2025-06-04 10:40:27,897] INFO {datahub.ingestion.run.pipeline:271} - Sink configured successfully. DataHubRestEmitter: configured to talk to http://10.9.27.26:31900 with token: eyJh**********iiYk
[2025-06-04 10:40:28,130] WARNING {datahub.ingestion.source.state.stateful_ingestion_base:276} - The 'ignore_old_state' config is True. The old checkpoint state will not be provided.
[2025-06-04 10:40:28,131] INFO {datahub.ingestion.run.pipeline:295} - Source configured successfully.
[2025-06-04 10:40:28,131] INFO {datahub.cli.ingest_cli:130} - Starting metadata ingestion
[2025-06-04 10:40:28,132] INFO {datahub.ingestion.source.datahub.datahub_source:63} - Ingesting DataHub metadata up until 2025-06-04 10:40:28.132703+00:00
[2025-06-04 10:40:28,132] INFO {datahub.ingestion.source.datahub.datahub_source:93} - Fetching database aspects starting from 1970-01-01 00:00:00+00:00
[2025-06-04 10:42:07,399] WARNING {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:07,399] INFO {datahub.ingestion.source.datahub.datahub_source:121} - Fetching timeseries aspects from kafka
[2025-06-04 10:42:20,487] WARNING {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:20,488] INFO {datahub.ingestion.run.pipeline:571} - Processing commit request for DatahubIngestionCheckpointingProvider. Commit policy = CommitPolicy.ALWAYS, has_errors=False, has_warnings=False
[2025-06-04 10:42:20,488] WARNING {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:20,488] INFO {datahub.ingestion.run.pipeline:591} - Successfully committed changes for DatahubIngestionCheckpointingProvider.
[2025-06-04 10:42:20,714] INFO {datahub.cli.ingest_cli:143} - Finished metadata ingestion
Cli report:
{'cli_version': '0.14.0.5',
'cli_entry_location': '/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/__init__.py',
'models_version': 'bundled',
'py_version': '3.10.12 (main, Jul 29 2024, 16:56:48) [GCC 11.4.0]',
'py_exec_path': '/datahub-ingestion/.venv/bin/python3',
'os_details': 'Linux-6.2.0-35-generic-x86_64-with-glibc2.35',
'mem_info': '256.31 MB',
'peak_memory_usage': '256.31 MB',
'disk_info': {'total': '15.21 TB', 'used': '32.39 GB', 'used_initally': '32.39 GB', 'free': '15.18 TB'},
'peak_disk_usage': '32.39 GB',
'thread_count': 2,
'peak_thread_count': 3}
Source (datahub) report:
{'events_produced': 0,
'events_produced_per_sec': 0,
'entities': {},
'aspects': {},
'aspect_urn_samples': {},
'stop_time': '2025-06-04 10:40:28.132703+00:00 (1 minute and 52.89 seconds ago)',
'num_database_aspects_ingested': 0,
'num_database_parse_errors': 0,
'database_parse_errors': {},
'num_kafka_aspects_ingested': 0,
'num_kafka_parse_errors': 0,
'kafka_parse_errors': {},
'num_timeseries_deletions_dropped': 0,
'start_time': '2025-06-04 10:40:28.131217 (1 minute and 52.89 seconds ago)',
'running_time': '1 minute and 52.89 seconds',
'failures': [],
'warnings': [],
'infos': []}
Sink (datahub-rest) report:
{'total_records_written': 3,
'records_written_per_second': 0,
'warnings': [],
'failures': [],
'start_time': '2025-06-04 10:40:27.842728 (1 minute and 53.18 seconds ago)',
'current_time': '2025-06-04 10:42:21.019704 (now)',
'total_duration_in_seconds': 113.18,
'max_threads': 15,
'gms_version': 'v1.1.0',
'pending_requests': 0,
'main_thread_blocking_timer': '0.224 seconds'}
Pipeline finished successfully; produced 3 events in 1 minute and 52.89 seconds.
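An editorial aside on the zero-event run above: num_database_aspects_ingested is also 0, and the database path never touches the schema registry, so the empty result is more likely the urn_pattern filter than the registry URL; an allow list of ^allowed.urn.* only admits URNs that literally start with "allowed.urn", which real urn:li:... URNs never do. A hedged way to test this, assuming yq v4 is available in the actions pod (otherwise edit the file by hand):
# Widen the allow pattern so real urn:li:... URNs are not filtered out, then re-run
yq -i '.source.config.urn_pattern.allow = [".*"]' /tmp/recipe.migrate.yaml
datahub ingest run -c /tmp/recipe.migrate.yaml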
The final DataHub migration was achieved by migrating the database and rebuilding the Elasticsearch indexes.
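For anyone landing here later: the "rebuilding the Elasticsearch indexes" part of that resolution is what the RestoreIndices upgrade in datahub-upgrade does, replaying aspects from the SQL store back into the search and graph indexes. A minimal sketch, reusing the in-cluster endpoints quoted elsewhere in this issue; the Elasticsearch host is not shown anywhere in this thread, so treat it and the other connection values as placeholders for your own deployment:
# Hedged sketch: rebuild search/graph indexes from the SQL store after the DB migration.
# All connection values below are assumptions to be replaced with your own.
docker run --network host \
  -e EBEAN_DATASOURCE_HOST=prerequisites-mysql:3306 \
  -e EBEAN_DATASOURCE_USERNAME=root \
  -e EBEAN_DATASOURCE_PASSWORD=datahub \
  -e EBEAN_DATASOURCE_URL='jdbc:mysql://prerequisites-mysql:3306/datahub' \
  -e EBEAN_DATASOURCE_DRIVER=com.mysql.cj.jdbc.Driver \
  -e KAFKA_BOOTSTRAP_SERVER=prerequisites-kafka:9092 \
  -e KAFKA_SCHEMAREGISTRY_URL=http://datahub-datahub-gms:8080/schema-registry/api/ \
  -e ELASTICSEARCH_HOST=elasticsearch-master \
  -e ELASTICSEARCH_PORT=9200 \
  acryldata/datahub-upgrade:v1.1.0 -u RestoreIndices
On Kubernetes the same upgrade is usually run through the Helm chart's restore-indices job (if your chart version ships one) rather than a raw docker run; the upgrade ID is the same.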
@lordk911, could you please explain the reasoning behind migrating the data from one version to another, rather than running the datahub upgrade module (latest version), which takes care of updating the schema in the persistent store as well as updating the indexes in Elasticsearch?