kafka-ui
kafka-ui copied to clipboard
Schema validation error when using AVRO schema with union types
Issue submitter TODO list
- [X] I've looked up my issue in FAQ
- [X] I've searched for an already existing issues here
- [X] I've tried running
master-labeled docker image and the issue still persists there - [X] I'm running a supported version of the application which is listed here
Describe the bug (actual behavior)
I am trying to publish messages in the topic test with Value Serde Schemaregistry.
The following message is published successfully:
{"offerPrice": null,"customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}
However, the following message is faild to be published:
{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}
Expected behavior
the message
{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}
should pass the schema validation
Your installation details
I am using the following docker-compose:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:53a6553765a806eda9905c43bfcfe09da6812035
ports:
- 8089:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_NAME: LOCAL
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "http://schema-registry:8081"
KAFKA_CLUSTERS_0_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: '' # DISABLE COMMON NAME VERIFICATION
depends_on:
- broker
- schema-registry
and the following AVRO schema test-value
{
"type": "record",
"name": "test",
"namespace": "com.test.model.avro",
"fields": [
{
"name": "customerOrderId",
"type": {
"type": "string",
"logicalType": "uuid"
}
},
{
"name": "offerPrice",
"type": [
"null",
"string"
],
"default": null
}
]
}
Steps to reproduce
- run the docker compose file with
docker-compose upcommand - go to http://localhost:8089/ui/clusters/LOCAL/schemas and create the schema
test-value - go to http://localhost:8089/ui/clusters/LOCAL/all-topics?perPage=25&hideInternal=true and create a topic called
test - go to http://localhost:8089/ui/clusters/LOCAL/all-topics/test/messages and produce a message with any key and a value:
{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}
Screenshots
No response
Logs
No response
Additional context
No response
Hello there emad-eldeen! 👋
Thank you and congratulations 🎉 for opening your very first issue in this project! 💖
In case you want to claim this issue, please comment down below! We will try to get back to you as soon as we can. 👀
I tried bypass the frontend and send a POST request to the backend.
I got error code 500 with the following response:
{
"code": 5000,
"message": "Failed to serialize record for topic test",
"timestamp": 1697609387664,
"requestId": "a61877db-220",
"fieldsErrors": null,
"stackTrace": "java.lang.RuntimeException: Failed to serialize record for topic test\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\tSuppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: \nError has been observed at the following site(s):\n\t*__checkpoint ⇢ Handler com.provectus.kafka.ui.controller.MessagesController#sendTopicMessages(String, String, Mono, ServerWebExchange) [DispatcherHandler]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.CorsGlobalConfiguration$$Lambda$1025/0x00000008016838c8 [DefaultWebFilterChain]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.CustomWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ com.provectus.kafka.ui.config.ReadOnlyModeFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ AuthorizationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ LogoutWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t*__checkpoint ⇢ org.springframework.web.filter.reactive.ServerHttpObservationFilter [DefaultWebFilterChain]\n\t*__checkpoint ⇢ HTTP POST \"/api/clusters/LOCAL/topics/test/messages\" [ExceptionHandlingWebHandler]\nOriginal Stack Trace:\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\t\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\t\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\t\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\t\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.avro.AvroTypeException: Expected start-union. Got VALUE_STRING\n\tat org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)\n\tat org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:430)\n\tat org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)\n\tat org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)\n\tat io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:38)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"
}