spring-cloud-stream-samples
schema-registry-confluent-avro-serializer: org.apache.avro.AvroRuntimeException: Not an enum:
As a user deploying the schema-registry-confluent-avro-serializer demo,
I got an exception after running:
curl -X POST http://localhost:9010/messages
Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]}
at org.apache.avro.Schema.getEnumSymbols(Schema.java:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
... 49 more
This issue has no supporting information on how to reproduce it, nor does it contain a stack trace that shows anything related to spring-cloud-stream. All I see is an Avro issue similar to https://stackoverflow.com/questions/48391847/avrotypeexception-not-an-enum-mobile-on-datafilewriter
Please provide detailed information as well as a description of a process that would allow us to reproduce it; otherwise this issue is not actionable and will be closed.
Hi
Following the instructions from
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/schema-registry-confluent-avro-serializer
$ docker-compose up -d --build
[ec2-user@k8s-master ~]$ sudo docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
49caa7f6f6dc wurstmeister/zookeeper "/bin/sh -c '/usr/..." 18 minutes ago Created
4a25c19a337d confluentinc/cp-ksql-cli:5.2.1 "/bin/sh" 26 minutes ago Up 26 minutes
4a08bb58577a confluentinc/ksql-examples:5.1.2 "bash -c 'echo Wai..." 26 minutes ago Up 26 minutes
d904fe722348 confluentinc/cp-enterprise-control-center:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 0.0.0.0:90
e5bc32a7f02c confluentinc/cp-ksql-server:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 0.0.0.0:80
81d0fa9bb6c3 confluentinc/kafka-connect-datagen:latest "bash -c 'if [ ! -..." 26 minutes ago Up 26 minutes 0.0.0.0:80
f6a708b4298f confluentinc/cp-kafka-rest:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 0.0.0.0:80
8d8ae2572940 confluentinc/cp-schema-registry:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 0.0.0.0:80
74ef67334657 confluentinc/cp-enterprise-kafka:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 0.0.0.0:90
87282a119871 confluentinc/cp-zookeeper:5.2.1 "/etc/confluent/do..." 26 minutes ago Up 26 minutes 2888/tcp,
and then
[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ curl -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
{"compatibility":"NONE"}[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$
and the same curl call with verbose output:
[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ curl -vvv -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
* About to connect() to 127.0.0.1 port 8081 (#0)
* Trying 127.0.0.1...
* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)
> PUT /config HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 127.0.0.1:8081
> Accept: */*
> Content-Type:application/json
> Content-Length: 25
>
* upload completely sent off: 25 out of 25 bytes
< HTTP/1.1 200 OK
< Date: Mon, 17 Feb 2020 12:19:00 GMT
< Content-Type: application/vnd.schemaregistry.v1+json
< Content-Length: 24
< Server: Jetty(9.4.14.v20181114)
<
* Connection #0 to host 127.0.0.1 left intact
{"compatibility":"NONE"}[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$
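For anyone scripting this step instead of using curl, here is a minimal sketch of the same `PUT /config` request built with the JDK 11+ `java.net.http` API. The class name `CompatibilityRequest` and the `build` helper are hypothetical names made up for this example; the URL, header, and body are taken from the curl call above. The request is only built, not sent, so it can be inspected without a running registry.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CompatibilityRequest {

    // Builds (but does not send) the PUT /config request from the curl call above.
    // "registryUrl" and "level" are illustrative parameter names.
    static HttpRequest build(String registryUrl, String level) {
        String body = "{\"compatibility\": \"" + level + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://127.0.0.1:8081", "NONE");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it against a running registry:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

The body built here is the same 25-byte payload that the verbose curl output reports as `Content-Length: 25`.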
Starting the jars, consumer first:
[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ java -jar schema-registry-confluent-avro-serializer-consumer/target/schema-registry-confluent-avro-serializer-consumer-0.0.1-SNAPSHOT.jar &
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.0.BUILD-SNAPSHOT)
2020-02-17 13:14:18.589 INFO 16558 --- [ main] s.c.ConfluentAvroConsumerApplication : Starting ConfluentAvroConsumerApplication v0.0.1-SNAPSHOT on k8s-master with PID 16558 (/home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer/schema-registry-confluent-avro-serializer-consumer/target/schema-registry-confluent-avro-serializer-consumer-0.0.1-SNAPSHOT.jar started by ec2-user in /home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer)
2020-02-17 13:14:18.592 INFO 16558 --- [ main] s.c.ConfluentAvroConsumerApplication : No active profile set, falling back to default profiles: default
2020-02-17 13:14:19.349 INFO 16558 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-17 13:14:19.354 INFO 16558 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-17 13:14:19.362 INFO 16558 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-17 13:14:19.415 INFO 16558 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:19.430 INFO 16558 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:19.433 INFO 16558 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:14:20.060 INFO 16558 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function '' with acceptedOutputTypes: []
2020-02-17 13:14:20.073 INFO 16558 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.150 INFO 16558 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-02-17 13:14:20.185 INFO 16558 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.208 INFO 16558 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: [application/json]
2020-02-17 13:14:20.227 INFO 16558 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.process-in-0' has 1 subscriber(s).
2020-02-17 13:14:20.231 INFO 16558 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'process' with acceptedOutputTypes: []
2020-02-17 13:14:20.333 INFO 16558 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-17 13:14:20.333 INFO 16558 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-17 13:14:20.333 INFO 16558 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-02-17 13:14:20.697 INFO 16558 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-02-17 13:14:20.797 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:14:20.799 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:20.799 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941660796
2020-02-17 13:14:21.101 INFO 16558 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = anonymous.15ada766-c659-4698-a7b5-465db0beffb3
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2020-02-17 13:14:21.140 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:14:21.140 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.140 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941661140
2020-02-17 13:14:21.186 INFO 16558 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 1 subscriber(s).
2020-02-17 13:14:21.186 INFO 16558 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 0 subscriber(s).
2020-02-17 13:14:21.186 INFO 16558 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 1 subscriber(s).
2020-02-17 13:14:21.186 INFO 16558 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'process-in-0.anonymous.15ada766-c659-4698-a7b5-465db0beffb3.errors' has 2 subscriber(s).
2020-02-17 13:14:21.191 INFO 16558 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-02-17 13:14:21.194 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:14:21.194 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.194 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941661194
2020-02-17 13:14:21.219 INFO 16558 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = anonymous.15ada766-c659-4698-a7b5-465db0beffb3
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2020-02-17 13:14:21.226 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:14:21.226 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:14:21.226 INFO 16558 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941661226
2020-02-17 13:14:21.227 INFO 16558 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Subscribed to topic(s): process-in-0
2020-02-17 13:14:21.228 INFO 16558 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-02-17 13:14:21.234 INFO 16558 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6bf0219d
2020-02-17 13:14:21.245 INFO 16558 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Cluster ID: y8Ab0_tTTd6yxVa9zCYHdQ
2020-02-17 13:14:21.246 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2020-02-17 13:14:21.248 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Revoking previously assigned partitions []
2020-02-17 13:14:21.248 INFO 16558 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : anonymous.15ada766-c659-4698-a7b5-465db0beffb3: partitions revoked: []
2020-02-17 13:14:21.248 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] (Re-)joining group
2020-02-17 13:14:21.250 INFO 16558 --- [ main] s.c.ConfluentAvroConsumerApplication : Started ConfluentAvroConsumerApplication in 3.209 seconds (JVM running for 3.778)
2020-02-17 13:14:21.256 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] (Re-)joining group
2020-02-17 13:14:21.263 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Successfully joined group with generation 1
2020-02-17 13:14:21.267 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Setting newly assigned partitions: process-in-0-0
2020-02-17 13:14:21.275 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Found no committed offset for partition process-in-0-0
2020-02-17 13:14:21.286 INFO 16558 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=anonymous.15ada766-c659-4698-a7b5-465db0beffb3] Resetting offset for partition process-in-0-0 to offset 0.
2020-02-17 13:14:21.292 INFO 16558 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : anonymous.15ada766-c659-4698-a7b5-465db0beffb3: partitions assigned: [process-in-0-0]
and one producer:
[ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.0.BUILD-SNAPSHOT)
2020-02-17 13:15:37.032 INFO 17767 --- [ main] s.p.ConfluentAvroProducer2Application : Starting ConfluentAvroProducer2Application v0.0.1-SNAPSHOT on k8s-master with PID 17767 (/home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer/schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar started by ec2-user in /home/ec2-user/spring-cloud-stream-samples/schema-registry-samples/schema-registry-confluent-avro-serializer)
2020-02-17 13:15:37.036 INFO 17767 --- [ main] s.p.ConfluentAvroProducer2Application : No active profile set, falling back to default profiles: default
2020-02-17 13:15:38.703 INFO 17767 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-17 13:15:38.708 INFO 17767 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-17 13:15:38.715 INFO 17767 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-17 13:15:38.777 INFO 17767 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:38.787 INFO 17767 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:38.790 INFO 17767 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-17 13:15:39.244 INFO 17767 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9010 (http)
2020-02-17 13:15:39.272 INFO 17767 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-02-17 13:15:39.273 INFO 17767 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.27]
2020-02-17 13:15:39.386 INFO 17767 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-02-17 13:15:39.386 INFO 17767 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 2256 ms
2020-02-17 13:15:39.889 INFO 17767 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-02-17 13:15:40.341 INFO 17767 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-17 13:15:40.657 INFO 17767 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function '' with acceptedOutputTypes: []
2020-02-17 13:15:40.672 INFO 17767 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.712 INFO 17767 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.721 INFO 17767 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: []
2020-02-17 13:15:40.721 INFO 17767 --- [ main] c.f.c.c.BeanFactoryAwareFunctionRegistry : Looking up function 'supplier' with acceptedOutputTypes: [application/json]
2020-02-17 13:15:41.049 INFO 17767 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-02-17 13:15:41.158 INFO 17767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-17 13:15:41.158 INFO 17767 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-17 13:15:41.159 INFO 17767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-02-17 13:15:41.159 INFO 17767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {router} as a subscriber to the 'supplier_integrationflow.channel#0' channel
2020-02-17 13:15:41.159 INFO 17767 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.supplier_integrationflow.channel#0' has 1 subscriber(s).
2020-02-17 13:15:41.159 INFO 17767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'supplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-02-17 13:15:41.668 INFO 17767 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: supplier-out-0
2020-02-17 13:15:41.673 INFO 17767 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-02-17 13:15:41.762 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:15:41.764 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:15:41.764 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941741760
2020-02-17 13:15:41.997 INFO 17767 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-02-17 13:15:42.027 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2020-02-17 13:15:42.028 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2020-02-17 13:15:42.029 INFO 17767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1581941742027
2020-02-17 13:15:42.041 INFO 17767 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: y8Ab0_tTTd6yxVa9zCYHdQ
2020-02-17 13:15:42.043 INFO 17767 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-02-17 13:15:42.065 INFO 17767 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.supplier-out-0' has 1 subscriber(s).
2020-02-17 13:15:42.068 INFO 17767 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'supplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
and testing with curl
curl -X POST http://localhost:9010/messages
ok, have fun with v2 payload![ec2-user@k8s-master schema-registry-confluent-avro-serializer]$ 2020-02-17 13:16:11.596 ERROR 819 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:115)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:286)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359)
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626)
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569)
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.get(BeanFactoryAwareFunctionRegistry.java:474)
at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:279)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:263)
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:112)
... 18 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:353)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:729)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2795)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:270)
... 41 more
Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]}
at org.apache.avro.Schema.getEnumSymbols(Schema.java:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
... 49 more
I am just following the README instructions.
No changes were made.
Thanks
Your content type is application/json, which forces the framework to use ApplicationJsonMessageMarshallingConverter. Should it be application/avro, perhaps?
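For reference, a minimal sketch of setting an Avro content type on the producer binding. The binding name `supplier-out-0` is an assumption taken from the startup log above (`Channel 'application.supplier-out-0'`), the destination is the one used in the configuration posted later in this thread, and `application/*+avro` is the value that configuration uses. It only helps if an Avro-capable message converter is registered in the application, which is not confirmed here.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # Assumption: binding name read from the startup log; adjust to your application.
        supplier-out-0:
          destination: sensor-topic
          # Asks the framework for an Avro converter instead of the JSON one.
          content-type: application/*+avro
```

With `application/json` in effect, Jackson walks every getter of the generated `Sensor` class, including `getSchema()`, which matches the reference chain shown in the stack trace.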
Hi,
I don't know.
I am just following the README document.
Example:
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages

Why can't I get the same output as in the screenshot?
I tested with Avro 1.7.7 and 1.9.2, and both fail.
Adding <fieldVisibility>PRIVATE</fieldVisibility> to pom.xml also fails.
Adding this dependency
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>5.4.0</version>
</dependency>
doesn't work either.
Does the consumer need this?
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: sensor-topic
          consumer:
            useNativeDecoding: true
      kafka:
        bindings:
          output:
            content-type: application/*+avro
          input:
            consumer:
              configuration:
                key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: http://localhost:8081
                specific.avro.reader: true
server.port: 9999
And does the producer need this?
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: sensor-topic
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          input:
            content-type: application/*+avro
          output:
            producer:
              configuration:
                key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9009
This fails too.
Hi, eskuai.
I think your producer configuration should use a key serializer instead of a key deserializer, as in the configuration below:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
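Applied to the producer configuration posted above, the suggestion might look like the sketch below. Two things here are assumptions rather than something confirmed in this thread. First, entries under `configuration` are handed to the Kafka client, whose own property names are dotted (`key.serializer`, `value.serializer`), so the dotted form is used instead of the hyphenated one. Second, the binding name `supplier-out-0` is taken from the startup log (`Channel 'application.supplier-out-0'`); the posted configuration uses `output`, so keep whichever name your application actually binds.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # Assumption: name from the startup log; the posted config uses "output".
        supplier-out-0:
          destination: sensor-topic
          producer:
            # Skip framework message conversion and let the Kafka serializer encode the payload.
            useNativeEncoding: true
      kafka:
        bindings:
          supplier-out-0:
            producer:
              configuration:
                # A producer needs serializers, not deserializers.
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9009
```

If the binding name in the configuration does not match the one the application creates, `useNativeEncoding` would not take effect and the framework would fall back to its default JSON conversion, which would be consistent with the `ApplicationJsonMessageMarshallingConverter` frames in the stack trace.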