
[datahub migration] Failed if there is no schema registry

Open lordk911 opened this issue 6 months ago • 2 comments

Describe the bug
I am currently attempting to migrate data from DataHub v0.14.1 to v1.1.0. I don't have a schema registry, but schema_registry_url is not an optional value.

To Reproduce
1. recipe.yaml

pipeline_name: datahub_source_1
datahub_api:
  server: "http://datahub-datahub-gms:8080/" 
  token: "eyJh***TUo"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql" 
      host_port: "prerequisites-mysql:3306"
      username: "root"
      password: "datahub"
      database: "datahub"
    kafka_connection:
      bootstrap: "prerequisites-kafka:9092"
      ## schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/"
    stateful_ingestion:
      enabled: false
      ignore_old_state: true
    urn_pattern:
      deny:
        - ^denied.urn.*
      allow:
        - ^allowed.urn.*
#flags:
   #set_system_metadata: false # Replicate system metadata          
sink:
  type: datahub-rest
  config:
    server: "http://10.9.27.26:31900/"
    token: "eyJh***iiYk"

2. datahub ingest run -c ./recipe.yaml

3. I get this error:

ERROR {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /schema-registry/api/schemas/ids/3 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f2da1048a00>: Failed to establish a new connection: [Errno 111] Connection refused'))"}
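With schema_registry_url commented out, the Avro deserializer still needs a registry to resolve the schema IDs embedded in the Kafka messages, and it appears to fall back to localhost:8080 (as the connection-refused error above shows). One possible workaround, which the follow-up comment below also lands on, is pointing kafka_connection at the schema-registry API served by GMS itself. A minimal sketch, with the endpoint path taken from that later comment:

    kafka_connection:
      bootstrap: "prerequisites-kafka:9092"
      # assumption: GMS serves a schema-registry-compatible API under this path
      schema_registry_url: "http://datahub-datahub-gms:8080/schema-registry/api/"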

4. If I deploy a cp-schema-registry and set schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/", I get another error:

ERROR {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}

The error stack is:

|[2025-06-04 08:31:33,503] INFO     {datahub.ingestion.source.datahub.datahub_source:121} - Fetching timeseries aspects from kafka
-[2025-06-04 08:31:37,070] ERROR    {datahub.ingestion.run.pipeline:738} - Ingestion pipeline threw an uncaught exception
Traceback (most recent call last):
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/deserializing_consumer.py", line 110, in poll
    value = self._value_deserializer(value, ctx)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/avro.py", line 422, in __call__
    registered_schema = self._registry.get_schema(schema_id)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 435, in get_schema
    response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 125, in get
    return self.send_request(url, method='GET', query=query)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 175, in send_request
    raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: HTTP 404 Not Found (HTTP status code 404, SR code 404)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/run/pipeline.py", line 462, in run
    for wu in itertools.islice(
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 147, in auto_workunit_reporter
    for wu in stream:
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_source.py", line 80, in get_workunits_internal
    yield from self._get_kafka_workunits(from_offsets=state.kafka_offsets)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_source.py", line 128, in _get_kafka_workunits
    for i, (mcl, offset) in enumerate(mcls):
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_kafka_reader.py", line 70, in get_mcls
    yield from self._poll_partition(stop_time)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/ingestion/source/datahub/datahub_kafka_reader.py", line 78, in _poll_partition
    msg = self.consumer.poll(10)
  File "/datahub-ingestion/.venv/lib/python3.10/site-packages/confluent_kafka/deserializing_consumer.py", line 112, in poll
    raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}
[2025-06-04 08:31:37,074] ERROR    {datahub.ingestion.run.pipeline:739} - Pipeline Error: Ingestion pipeline raised an unexpected exception!Pipeline Error: Ingestion pipeline raised an unexpected exception!: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="HTTP 404 Not Found (HTTP status code 404, SR code 404)"}
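The 404 against the freshly deployed cp-schema-registry could come from two things: the /schema-registry/api/ path suffix in the configured URL (a stock Confluent Schema Registry serves its endpoints at the root), or the new registry simply not knowing the schema IDs already embedded in the existing messages (schemas/ids/3 in the first error), since those schemas were registered against DataHub's own registry rather than the new one. A quick, hedged sanity check using standard Confluent Schema Registry endpoints, with the host/port from the recipe above:

# list registered subjects and try to resolve the schema id from the error
curl -s http://prerequisites-cp-schema-registry:8081/subjects
curl -s http://prerequisites-cp-schema-registry:8081/schemas/ids/3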

Expected behavior
schema_registry_url should be allowed to be unset (none): when I used DataHub 0.13 before, it was possible to perform the migration without a schema registry.


Desktop (please complete the following information):

  • OS: ubuntu
  • Browser: chrome
  • Version: 22


lordk911 avatar Jun 04 '25 09:06 lordk911

I deployed DataHub with Helm and copied the recipe.yaml file into the datahub-acryl-datahub-actions* pod to run the ingestion. This time I tried setting schema_registry_url: "http://datahub-datahub-gms:8080/schema-registry/api/". I got no error, but no data was read from the source either.

kubectl -n datahub exec -it datahub-acryl-datahub-actions-5457967d87-csslr -- bash

cat /tmp/recipe.migrate.yaml :

pipeline_name: datahub_source_2
datahub_api:
  server: "http://datahub-datahub-gms:8080" 
  token: "ey**o"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql"  
      host_port: "prerequisites-mysql:3306"
      username: "root"
      password: "**"
      database: "datahub"
    kafka_connection:
      bootstrap: "prerequisites-kafka:9092"
        #schema_registry_url: "http://prerequisites-cp-schema-registry:8081/schema-registry/api/"
      schema_registry_url: "http://datahub-datahub-gms:8080/schema-registry/api/"
    stateful_ingestion:
      enabled: true
      ignore_old_state: true
    urn_pattern:
      deny:
        # Ignores all datahub metadata where the urn matches the regex
        #- ^urn:li:dataFlow.*
        #- ^urn:li:dataJob.*
        - ^denied.urn.*
      allow:
        # Ingests all datahub metadata where the urn matches the regex.
        - ^allowed.urn.*

#flags:
   #set_system_metadata: false # Replicate system metadata          
# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
sink:
  type: datahub-rest
  config:
    server: "http://10.9.27.26:31900"
    token: "ey**Yk"

datahub ingest run -c /tmp/recipe.migrate.yaml

[2025-06-04 10:40:27,784] INFO     {datahub.cli.ingest_cli:149} - DataHub CLI version: 0.14.0.5
[2025-06-04 10:40:27,897] INFO     {datahub.ingestion.run.pipeline:271} - Sink configured successfully. DataHubRestEmitter: configured to talk to http://10.9.27.26:31900 with token: eyJh**********iiYk
[2025-06-04 10:40:28,130] WARNING  {datahub.ingestion.source.state.stateful_ingestion_base:276} - The 'ignore_old_state' config is True. The old checkpoint state will not be provided.
[2025-06-04 10:40:28,131] INFO     {datahub.ingestion.run.pipeline:295} - Source configured successfully.
[2025-06-04 10:40:28,131] INFO     {datahub.cli.ingest_cli:130} - Starting metadata ingestion
-[2025-06-04 10:40:28,132] INFO     {datahub.ingestion.source.datahub.datahub_source:63} - Ingesting DataHub metadata up until 2025-06-04 10:40:28.132703+00:00
[2025-06-04 10:40:28,132] INFO     {datahub.ingestion.source.datahub.datahub_source:93} - Fetching database aspects starting from 1970-01-01 00:00:00+00:00
\[2025-06-04 10:42:07,399] WARNING  {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:07,399] INFO     {datahub.ingestion.source.datahub.datahub_source:121} - Fetching timeseries aspects from kafka
\[2025-06-04 10:42:20,487] WARNING  {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:20,488] INFO     {datahub.ingestion.run.pipeline:571} - Processing commit request for DatahubIngestionCheckpointingProvider. Commit policy = CommitPolicy.ALWAYS, has_errors=False, has_warnings=False
[2025-06-04 10:42:20,488] WARNING  {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
[2025-06-04 10:42:20,488] INFO     {datahub.ingestion.run.pipeline:591} - Successfully committed changes for DatahubIngestionCheckpointingProvider.
-[2025-06-04 10:42:20,714] INFO     {datahub.cli.ingest_cli:143} - Finished metadata ingestion
/
Cli report:
{'cli_version': '0.14.0.5',
 'cli_entry_location': '/datahub-ingestion/.venv/lib/python3.10/site-packages/datahub/__init__.py',
 'models_version': 'bundled',
 'py_version': '3.10.12 (main, Jul 29 2024, 16:56:48) [GCC 11.4.0]',
 'py_exec_path': '/datahub-ingestion/.venv/bin/python3',
 'os_details': 'Linux-6.2.0-35-generic-x86_64-with-glibc2.35',
 'mem_info': '256.31 MB',
 'peak_memory_usage': '256.31 MB',
 'disk_info': {'total': '15.21 TB', 'used': '32.39 GB', 'used_initally': '32.39 GB', 'free': '15.18 TB'},
 'peak_disk_usage': '32.39 GB',
 'thread_count': 2,
 'peak_thread_count': 3}
Source (datahub) report:
{'events_produced': 0,
 'events_produced_per_sec': 0,
 'entities': {},
 'aspects': {},
 'aspect_urn_samples': {},
 'stop_time': '2025-06-04 10:40:28.132703+00:00 (1 minute and 52.89 seconds ago)',
 'num_database_aspects_ingested': 0,
 'num_database_parse_errors': 0,
 'database_parse_errors': {},
 'num_kafka_aspects_ingested': 0,
 'num_kafka_parse_errors': 0,
 'kafka_parse_errors': {},
 'num_timeseries_deletions_dropped': 0,
 'start_time': '2025-06-04 10:40:28.131217 (1 minute and 52.89 seconds ago)',
 'running_time': '1 minute and 52.89 seconds',
 'failures': [],
 'warnings': [],
 'infos': []}
Sink (datahub-rest) report:
{'total_records_written': 3,
 'records_written_per_second': 0,
 'warnings': [],
 'failures': [],
 'start_time': '2025-06-04 10:40:27.842728 (1 minute and 53.18 seconds ago)',
 'current_time': '2025-06-04 10:42:21.019704 (now)',
 'total_duration_in_seconds': 113.18,
 'max_threads': 15,
 'gms_version': 'v1.1.0',
 'pending_requests': 0,
 'main_thread_blocking_timer': '0.224 seconds'}

Pipeline finished successfully; produced 3 events in 1 minute and 52.89 seconds.

lordk911 avatar Jun 04 '25 10:06 lordk911

The final DataHub migration was achieved by migrating the database and rebuilding the Elasticsearch indexes.
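For reference, the index-rebuild part is typically done with the datahub-upgrade RestoreIndices job. A hedged sketch for a Helm deployment (the cronjob template name below is assumed from the chart defaults and depends on the release name):

# create a one-off job from the chart's restore-indices cronjob template and follow its logs
kubectl -n datahub create job --from=cronjob/datahub-datahub-restore-indices-job-template restore-indices-adhoc
kubectl -n datahub logs -f job/restore-indices-adhoc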

lordk911 avatar Jun 05 '25 03:06 lordk911

@lordk911, could you please share the reasoning behind migrating the data from one version to another rather than running the datahub upgrade module (latest version), which takes care of updating the schema in the persistent store as well as updating the indexes in Elasticsearch?
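(For context, that in-place upgrade path usually amounts to a Helm upgrade to the newer chart, which runs the datahub-upgrade/system-update job before the new GMS comes up. A hedged sketch, with release and values-file names assumed:)

helm repo add datahub https://helm.datahubproject.io/
helm repo update
# the chart's system-update job (datahub-upgrade image) runs as part of this upgrade
helm upgrade --install datahub datahub/datahub --values values.yaml -n datahub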

deepgarg760 avatar Jun 23 '25 18:06 deepgarg760