
Schema validation error when using AVRO schema with union types

Open emad-eldeen opened this issue 2 years ago • 2 comments

Issue submitter TODO list

  • [X] I've looked up my issue in the FAQ
  • [X] I've searched for already existing issues here
  • [X] I've tried running the master-labeled docker image and the issue still persists there
  • [X] I'm running a supported version of the application which is listed here

Describe the bug (actual behavior)

I am trying to publish messages to the topic test with the SchemaRegistry value serde.

The following message is published successfully:

{"offerPrice": null,"customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}


However, the following message fails to publish:

{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}


Expected behavior

The message

{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}

should pass schema validation, since offerPrice is declared as the union ["null", "string"].

Your installation details

I am using the following docker-compose:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:53a6553765a806eda9905c43bfcfe09da6812035
    ports:
      - 8089:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: LOCAL
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "http://schema-registry:8081"
      KAFKA_CLUSTERS_0_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: '' # DISABLE COMMON NAME VERIFICATION
    depends_on:
      - broker
      - schema-registry

and the following Avro schema, registered under the subject test-value:

{
	"type": "record",
	"name": "test",
	"namespace": "com.test.model.avro",
	"fields": [
		{
			"name": "customerOrderId",
			"type": {
				"type": "string",
				"logicalType": "uuid"
			}
		},
		{
			"name": "offerPrice",
			"type": [
				"null",
				"string"
			],
			"default": null
		}
	]
}
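As context for the behavior reported here: per the Avro specification's JSON encoding, a non-null value of a union must be wrapped in a single-key JSON object naming the chosen branch (e.g. {"string": "teststring"}), while null is written as a bare JSON null. A bare "teststring" is therefore not a valid JSON encoding for the ["null", "string"] union. A minimal sketch of that wrapping rule (encode_union is a hypothetical helper for illustration, not part of kafka-ui or the Avro library):

```python
import json

def encode_union(value, branches):
    """Encode a value for an Avro union per the Avro JSON encoding rules."""
    if value is None:
        if "null" not in branches:
            raise ValueError("null is not an allowed branch of this union")
        return None  # null is encoded as a bare JSON null
    # Pick the first non-null branch; real Avro resolves the branch by
    # the runtime type of the value.
    branch = next(b for b in branches if b != "null")
    return {branch: value}

message = {
    "customerOrderId": "6b39d152-c94c-13c4-8116-7f64136f2fa5",
    "offerPrice": encode_union("teststring", ["null", "string"]),
}
# offerPrice is now {"string": "teststring"}, the form Avro's JSON
# decoder accepts for a ["null", "string"] union.
print(json.dumps(message))
```

Under that reading, the null message succeeds because bare null is a valid union encoding, while the bare string is not.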

Steps to reproduce

  • run the docker compose file with the docker-compose up command
  • go to http://localhost:8089/ui/clusters/LOCAL/schemas and create the schema test-value
  • go to http://localhost:8089/ui/clusters/LOCAL/all-topics?perPage=25&hideInternal=true and create a topic called test
  • go to http://localhost:8089/ui/clusters/LOCAL/all-topics/test/messages and produce a message with any key and a value:
{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}

Screenshots

No response

Logs

No response

Additional context

No response

emad-eldeen avatar Oct 11 '23 09:10 emad-eldeen

Hello there emad-eldeen! 👋

Thank you and congratulations 🎉 for opening your very first issue in this project! 💖

In case you want to claim this issue, please comment down below! We will try to get back to you as soon as we can. 👀

github-actions[bot] avatar Oct 11 '23 09:10 github-actions[bot]

I tried bypassing the frontend and sending a POST request directly to the backend.

I got an HTTP 500 error with the following response:

{
    "code": 5000,
    "message": "Failed to serialize record for topic test",
    "timestamp": 1697609387664,
    "requestId": "a61877db-220",
    "fieldsErrors": null,
    "stackTrace": "java.lang.RuntimeException: Failed to serialize record for topic test\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\tSuppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: \nError has been observed at the following site(s):\n\t*__checkpoint ⇢ Handler com.provectus.kafka.ui.controller.MessagesController#sendTopicMessages(String, String, Mono, ServerWebExchange) [DispatcherHandler]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.CorsGlobalConfiguration$$Lambda$1025/0x00000008016838c8 [DefaultWebFilterChain]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.CustomWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.ReadOnlyModeFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ AuthorizationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ LogoutWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t*__checkpoint ⇢ org.springframework.web.filter.reactive.ServerHttpObservationFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ HTTP POST \"/api/clusters/LOCAL/topics/test/messages\" [ExceptionHandlingWebHandler]\nOriginal Stack Trace:\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\t\tat 
com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\t\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\t\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\t\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\t\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.avro.AvroTypeException: Expected start-union. 
Got VALUE_STRING\n\tat org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)\n\tat org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:430)\n\tat org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)\n\tat org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)\n\tat io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:38)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\tat 
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"
}
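The root cause in the trace is org.apache.avro.AvroTypeException: Expected start-union. Got VALUE_STRING, raised in JsonDecoder.readIndex: the decoder sees a bare JSON string where it expects the start of a branch-naming union object. A hedged sketch of that branch-selection rule (read_union_index is a hypothetical helper that only mirrors the observed behavior, not the actual Avro code):

```python
def read_union_index(token, branches):
    """Mimic Avro JsonDecoder union branch selection for a decoded JSON token."""
    if token is None:
        # Bare null selects the "null" branch.
        return branches.index("null")
    if isinstance(token, dict) and len(token) == 1:
        # A single-key object like {"string": "..."} names the branch.
        branch = next(iter(token))
        if branch in branches:
            return branches.index(branch)
    # A bare string/number is not a valid union encoding in Avro JSON.
    raise ValueError(f"Expected start-union. Got {type(token).__name__}")

branches = ["null", "string"]
print(read_union_index(None, branches))                      # null branch
print(read_union_index({"string": "teststring"}, branches))  # string branch
# read_union_index("teststring", branches) raises, matching the 500 above
```

This suggests the request is rejected by Avro's JSON decoding rules rather than by the schema itself; the wrapped form {"offerPrice": {"string": "teststring"}, ...} would be the encoding those rules expect.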

emad-eldeen avatar Oct 18 '23 06:10 emad-eldeen