gcs-connector-for-apache-kafka
[Avro workaround] Pulling Avro data but using an SMT to transform Avro to JSON
Hi, I know I have already raised an issue about support for sending Avro data to GCS with the GCS sink connector. I am trying a workaround, but I am getting a strange error, so I wanted to see if you could shed some light on it.
As I understand it, converters are separate from connectors, and on the consumer side the data flows through the deserialiser ---> SMT ---> connector (GCS sink connector) and then on to the external system (GCS).
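To make that ordering concrete, here is a toy model of the flow as I understand it. None of these functions are real Kafka Connect APIs; `toy_converter`, `toy_smt` and `run_sink_pipeline` are hypothetical names, and the converter parses JSON instead of Avro only to keep the sketch dependency-free.

```python
import json

# Toy stand-ins for the Kafka Connect stages. These are NOT real Connect APIs.

def toy_converter(raw: bytes) -> dict:
    """Stage 1: the converter turns raw Kafka bytes into a structured record."""
    return json.loads(raw.decode("utf-8"))

def toy_smt(record: dict) -> dict:
    """Stage 2: an SMT reshapes the already-deserialised record."""
    return {"payload": json.dumps(record, sort_keys=True)}

def run_sink_pipeline(raw_messages, converter, transforms, sink_put):
    """Apply converter, then each transform in order, then hand over to the sink."""
    for raw in raw_messages:
        record = converter(raw)  # a failure here never reaches the SMTs or the sink
        for transform in transforms:
            record = transform(record)
        sink_put(record)

delivered = []
run_sink_pipeline([b'{"id": 1}'], toy_converter, [toy_smt], delivered.append)
print(delivered)  # [{'payload': '{"id": 1}'}]
```

If this model is right, an exception raised inside the converter stops the record before any SMT or connector code sees it.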
With that in mind, I am still using the Avro deserialiser in the connector settings so that the data is properly deserialised, followed by an SMT that converts the records into JSON objects, which the GCS sink connector can understand. However, I get the error in the log below at the deserialisation step itself.
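For reference, the setup described above looks roughly like the following sketch, written as the JSON body one would send to the Connect REST API. The converter class, connector name and topic come from the log; everything else is an assumption. In particular, `com.example.ToJsonString` is a hypothetical SMT class, the URLs and bucket are placeholders, and the connector class and `gcs.bucket.name` key are written from memory and should be checked against the connector's documentation.

```python
import json

def build_sink_config(registry_url: str, bucket: str) -> dict:
    """Build an illustrative sink connector config (a sketch, not a verified one)."""
    return {
        "name": "cloudsql-proddb-gcs",
        "config": {
            # Assumed class name for the GCS sink connector; verify before use.
            "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
            "topics": "proddb.public.egg",
            # Converter runs first and needs to reach the Schema Registry.
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": registry_url,
            # Hypothetical SMT that turns the record into JSON.
            "transforms": "toJson",
            "transforms.toJson.type": "com.example.ToJsonString",
            # Assumed key name for the target bucket; verify before use.
            "gcs.bucket.name": bucket,
        },
    }

config = build_sink_config("https://schema-registry.example.com", "example-bucket")
print(json.dumps(config, indent=2))
```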
Do you know if the connector somehow interacts with the converter in a way that could lead to this outcome?
PS: If I use a BigQuery sink connector instead, it just works with the same Avro converter settings.
2023-01-25 16:37:28,737 ERROR [cloudsql-proddb-gcs|task-0] WorkerSinkTask{id=cloudsql-proddb-gcs-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-cloudsql-proddb-gcs-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic proddb.public.egg to Avro:
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:101)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
	... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 139
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:183)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:140)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
	... 17 more
Caused by: java.io.IOException: Error writing to server
	at java.base/sun.net.www.protocol.http.HttpURLConnection.writeRequests(HttpURLConnection.java:718)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.writeRequests(HttpURLConnection.java:730)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1613)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
	at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
	at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
	... 20 more
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jan 25, 2023 4:37:21 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Configuring Java heap: -Xms3629247365 -Xmx3629247365
+ exec /usr/bin/tini -w -e 143 -- /opt/kafka/bin/connect-distributed.sh /tmp/strimzi-connect.properties
consumer.sasl.jaas.config=[hidden]
consumer.sasl.mechanism=SCRAM-SHA-256
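Reading the trace, the innermost cause is an `IOException` raised while the Avro converter fetches schema id 139 from the Schema Registry over HTTPS, so the failure seems to happen before any SMT or connector code runs. To check that lookup outside of Connect, a minimal sketch could look like this. It assumes the usual Confluent framing (one zero magic byte followed by a 4-byte big-endian schema id) and the registry's `GET /schemas/ids/{id}` endpoint; the registry URL and the helper names are placeholders of mine.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message

def schema_id_from_message(raw: bytes) -> int:
    """Read the schema id from the 5-byte header of a Confluent-framed Avro message."""
    if len(raw) < 5:
        raise ValueError("message too short to carry a schema id")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id

def schema_lookup_url(registry_url: str, schema_id: int) -> str:
    """Build the URL a client would request to fetch a schema by id."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"

# Fake message carrying schema id 139, as in the log; the payload is a placeholder.
framed = b"\x00" + (139).to_bytes(4, "big") + b"avro-payload"
schema_id = schema_id_from_message(framed)
print(schema_lookup_url("https://schema-registry.example.com", schema_id))
# https://schema-registry.example.com/schemas/ids/139
```

Requesting that URL from inside the Connect pod, with the same credentials and TLS settings the converter uses, should show whether the registry is reachable at all from this worker.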