
schema-registry-confluent-avro-serializer: org.apache.avro.AvroRuntimeException: Not an enum:

Open · eskuai opened this issue on Feb 17, 2020 · 8 comments

As a user, deploying the schema-registry-confluent-avro-serializer demo, I got an exception after running curl -X POST http://localhost:9010/messages:


Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]}
        at org.apache.avro.Schema.getEnumSymbols(Schema.java:233)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
        ... 49 more

eskuai · Feb 17 '20 12:02

This issue has no supporting information on how to reproduce it, nor does it contain a stack trace that shows anything related to spring-cloud-stream. All I see is an Avro issue similar to https://stackoverflow.com/questions/48391847/avrotypeexception-not-an-enum-mobile-on-datafilewriter

Please provide detailed information as well as a description of a process that would allow us to reproduce it; otherwise this issue is not actionable and will be closed.

olegz · Feb 17 '20 12:02

Hi

Following the instructions from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/schema-registry-confluent-avro-serializer:

$ docker-compose up -d --build

[ec2-user@k8s-master ~]$ sudo docker ps -a
CONTAINER ID        IMAGE                                                                                                                                 COMMAND                  CREATED             STATUS                        PORTS
49caa7f6f6dc        wurstmeister/zookeeper                                                                                                                "/bin/sh -c '/usr/..."   18 minutes ago      Created
4a25c19a337d        confluentinc/cp-ksql-cli:5.2.1                                                                                                        "/bin/sh"                26 minutes ago      Up 26 minutes
4a08bb58577a        confluentinc/ksql-examples:5.1.2                                                                                                      "bash -c 'echo Wai..."   26 minutes ago      Up 26 minutes
d904fe722348        confluentinc/cp-enterprise-control-center:5.2.1                                                                                       "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 0.0.0.0:90
e5bc32a7f02c        confluentinc/cp-ksql-server:5.2.1                                                                                                     "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 0.0.0.0:80
81d0fa9bb6c3        confluentinc/kafka-connect-datagen:latest                                                                                             "bash -c 'if [ ! -..."   26 minutes ago      Up 26 minutes                 0.0.0.0:80
f6a708b4298f        confluentinc/cp-kafka-rest:5.2.1                                                                                                      "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 0.0.0.0:80
8d8ae2572940        confluentinc/cp-schema-registry:5.2.1                                                                                                 "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 0.0.0.0:80
74ef67334657        confluentinc/cp-enterprise-kafka:5.2.1                                                                                                "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 0.0.0.0:90
87282a119871        confluentinc/cp-zookeeper:5.2.1                                                                                                       "/etc/confluent/do..."   26 minutes ago      Up 26 minutes                 2888/tcp,

and then

[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ curl -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
{"compatibility":"NONE"}[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$

and the same curl again with verbose output:

[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$  curl -vvv  -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
* About to connect() to 127.0.0.1 port 8081 (#0)
*   Trying 127.0.0.1...
* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)
> PUT /config HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 127.0.0.1:8081
> Accept: */*
> Content-Type:application/json
> Content-Length: 25
>
* upload completely sent off: 25 out of 25 bytes
< HTTP/1.1 200 OK
< Date: Mon, 17 Feb 2020 12:19:00 GMT
< Content-Type: application/vnd.schemaregistry.v1+json
< Content-Length: 24
< Server: Jetty(9.4.14.v20181114)
<
* Connection #0 to host 127.0.0.1 left intact
{"compatibility":"NONE"}[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$

Starting the jars, consumer first:

[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ java -jar schema-registry-confluent-avro-serializer-consumer/target/schema-registry-confluent-avro-serializer-consumer-0.0.1-SNAPSHOT.jar &
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v2.2.0.BUILD-SNAPSHOT)


2020-02-17 13:14:18.589  INFO 16558 --- [           main] s.c.ConfluentAvroConsumerApplication     : Starting ConfluentAvroConsumerApplication v0.0.1-SNAPSHOT on k8s-master with PID 16558 (/home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer/schema-registry-confluent-avro-serializer-consumer/target/schema-registry-confluent-avro-serializer-consumer-0.0.1-SNAPSHOT.jar started by ec2-user in /home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer)
2020-02-17 13:14:18.592  INFO 16558 --- [           main] s.c.ConfluentAvroConsumerApplication     : No active profile set, falling back to default profiles: default
2020-02-17 13:14:19.349  INFO 16558 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-17 13:14:19.354  INFO 16558 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-17 13:14:19.362  INFO 16558 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-17 13:14:19.415  INFO 16558 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:19.430  INFO 16558 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:19.433  INFO 16558 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:20.060  INFO 16558 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function '' with acceptedOutputTypes: []
2020-02-17 13:14:20.073  INFO 16558 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.150  INFO 16558 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-02-17 13:14:20.185  INFO 16558 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.208  INFO 16558 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: [application/json]
2020-02-17 13:14:20.227  INFO 16558 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.process-in-0' has 1 subscriber(s).
2020-02-17 13:14:20.231  INFO 16558 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.333  INFO 16558 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-17 13:14:20.333  INFO 16558 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-17 13:14:20.333  INFO 16558 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-02-17 13:14:20.697  INFO 16558 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2020-02-17 13:14:20.797  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:14:20.799  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:20.799  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941660796
2020-02-17 13:14:21.101  INFO 16558 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = anonymous.15ada766-c659-4698-a7b5-465db0beffb3
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-02-17 13:14:21.140  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:14:21.140  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.140  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941661140
2020-02-17 13:14:21.186  INFO 16558 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 1 subscriber(s).
2020-02-17 13:14:21.186  INFO 16558 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 0 subscriber(s).
2020-02-17 13:14:21.186  INFO 16558 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 1 subscriber(s).
2020-02-17 13:14:21.186  INFO 16558 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 2 subscriber(s).
2020-02-17 13:14:21.191  INFO 16558 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2020-02-17 13:14:21.194  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:14:21.194  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.194  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941661194
2020-02-17 13:14:21.219  INFO 16558 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = anonymous.15ada766-c659-4698-a7b5-465db0beffb3
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-02-17 13:14:21.226  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:14:21.226  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.226  INFO 16558 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941661226
2020-02-17 13:14:21.227  INFO 16558 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Subscribed to topic(s): process-in-0
2020-02-17 13:14:21.228  INFO 16558 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-02-17 13:14:21.234  INFO 16558 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6bf0219d
2020-02-17 13:14:21.245  INFO 16558 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Cluster ID: y8Ab0_tTTd6yxVa9zCYHdQ
2020-02-17 13:14:21.246  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-02-17 13:14:21.248  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Revoking previously assigned partitions []
2020-02-17 13:14:21.248  INFO 16558 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : anonymous.15ada766-c659-4698-a7b5-465db0beffb3: partitions revoked: []
2020-02-17 13:14:21.248  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] (Re-)joining group
2020-02-17 13:14:21.250  INFO 16558 --- [           main] s.c.ConfluentAvroConsumerApplication     : Started ConfluentAvroConsumerApplication in 3.209 seconds (JVM running for 3.778)
2020-02-17 13:14:21.256  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] (Re-)joining group
2020-02-17 13:14:21.263  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Successfully joined group with generation 1
2020-02-17 13:14:21.267  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Setting newly assigned partitions: process-in-0-0
2020-02-17 13:14:21.275  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Found no committed offset for partition process-in-0-0
2020-02-17 13:14:21.286  INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Resetting offset for partition process-in-0-0 to offset 0.
2020-02-17 13:14:21.292  INFO 16558 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : anonymous.15ada766-c659-4698-a7b5-465db0beffb3: partitions assigned: [process-in-0-0]

and then the producer:


[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v2.2.0.BUILD-SNAPSHOT)

2020-02-17 13:15:37.032  INFO 17767 --- [           main] s.p.ConfluentAvroProducer2Application    : Starting ConfluentAvroProducer2Application v0.0.1-SNAPSHOT on k8s-master with PID 17767 (/home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer/schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar started by ec2-user in /home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer)
2020-02-17 13:15:37.036  INFO 17767 --- [           main] s.p.ConfluentAvroProducer2Application    : No active profile set, falling back to default profiles: default
2020-02-17 13:15:38.703  INFO 17767 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-17 13:15:38.708  INFO 17767 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-17 13:15:38.715  INFO 17767 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-17 13:15:38.777  INFO 17767 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:38.787  INFO 17767 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:38.790  INFO 17767 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:39.244  INFO 17767 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9010 (http)
2020-02-17 13:15:39.272  INFO 17767 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-02-17 13:15:39.273  INFO 17767 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.27]
2020-02-17 13:15:39.386  INFO 17767 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-02-17 13:15:39.386  INFO 17767 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 2256 ms
2020-02-17 13:15:39.889  INFO 17767 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-02-17 13:15:40.341  INFO 17767 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-17 13:15:40.657  INFO 17767 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function '' with acceptedOutputTypes: []
2020-02-17 13:15:40.672  INFO 17767 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.712  INFO 17767 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.721  INFO 17767 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.721  INFO 17767 --- [           main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: [application/json]
2020-02-17 13:15:41.049  INFO 17767 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-02-17 13:15:41.158  INFO 17767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-17 13:15:41.158  INFO 17767 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-17 13:15:41.159  INFO 17767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-02-17 13:15:41.159  INFO 17767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {router} as a subscriber to the 'supplier_integrationflow.channel#0' channel
2020-02-17 13:15:41.159  INFO 17767 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.supplier_integrationflow.channel#0' has 1 subscriber(s).
2020-02-17 13:15:41.159  INFO 17767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'supplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-02-17 13:15:41.668  INFO 17767 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: supplier-out-0
2020-02-17 13:15:41.673  INFO 17767 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2020-02-17 13:15:41.762  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:15:41.764  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:15:41.764  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941741760
2020-02-17 13:15:41.997  INFO 17767 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-02-17 13:15:42.027  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2020-02-17 13:15:42.028  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:15:42.029  INFO 17767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1581941742027
2020-02-17 13:15:42.041  INFO 17767 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: y8Ab0_tTTd6yxVa9zCYHdQ
2020-02-17 13:15:42.043  INFO 17767 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-02-17 13:15:42.065  INFO 17767 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.supplier-out-0' has 1 subscriber(s).
2020-02-17 13:15:42.068  INFO 17767 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'supplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'

and testing with curl:

curl -X POST http://localhost:9010/messages
ok, have fun with v2 payload![ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ 2020-02-17 13:16:11.596 ERROR 819 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
        at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:115)
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
        at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:286)
        at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69)
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
        at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
        at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626)
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569)
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.get(BeanFactoryAwareFunctionRegistry.java:474)
        at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:279)
        at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:263)
        at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:112)
        ... 18 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:353)
        at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:729)
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2795)
        at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:270)
        ... 41 more
Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]}
        at org.apache.avro.Schema.getEnumSymbols(Schema.java:233)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
        ... 49 more

Just following the readme instructions, no changes made.

Thanks

eskuai · Feb 17 '20 12:02

Your content type is application/json, which forces the framework to use ApplicationJsonMessageMarshallingConverter. Perhaps application/avro?
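
For example, something like this on the producer side (a minimal sketch, assuming the supplier-out-0 binding name that shows up in the producer logs above):

spring:
  cloud:
    stream:
      bindings:
        supplier-out-0:
          # an Avro content type keeps the JSON converter out of the picture
          contentType: application/*+avro
          producer:
            # let the configured KafkaAvroSerializer write the payload
            useNativeEncoding: true

With native encoding enabled, the Confluent KafkaAvroSerializer handles serialization and the framework's Jackson-based converter (the source of the "Not an enum" error) is bypassed.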

olegz · Feb 17 '20 12:02

Hi,

I don't know; I am just following the readme document.

Example:

curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages

[screenshot]

Why can't I get the same output as in the screenshot?

eskuai · Feb 17 '20 12:02

Tested with Avro 1.7.7 and 1.9.2, and both fail. It also fails with a pom.xml that includes <fieldVisibility>PRIVATE</fieldVisibility>.

eskuai · Feb 17 '20 13:02

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.4.0</version>
</dependency>

doesn't work either.

eskuai · Feb 17 '20 14:02

Does the consumer need this?


spring:
  cloud:
    stream:
      bindings:
        input:
          destination: sensor-topic
          consumer:
            useNativeDecoding: true
      kafka:
        bindings:
          output:
            content-type: application/*+avro
          input:
            consumer:
              configuration:
                key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: http://localhost:8081
                specific.avro.reader: true
server.port: 9999

And does the producer need this?


spring:
  cloud:
    stream:
      bindings:
        output:
          destination: sensor-topic
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          input:
            content-type: application/*+avro
          output:
            producer:
              configuration:
                key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9009

Still fails.

eskuai · Feb 17 '20 15:02

Hi, eskuai.

I think your producer configuration should use a key-serializer instead of a key-deserializer:


As in the configuration below:

      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
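
Applied to the producer YAML posted above, the corrected binder section might look like this (a sketch; note serializer rather than deserializer under the producer configuration):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                # serializers, not deserializers, on the producer side
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081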

Augusto11CB · Jul 12 '23 15:07