snowflake-kafka-connector
Error Code: 5016 SinkRecord.value and SinkRecord.valueSchema cannot be null
According to the docs, the default setting of `behavior.on.null.values` is `DEFAULT`, which means that when the Kafka connector encounters a tombstone record, it inserts an empty JSON string into the content column.
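For reference, this is roughly how the property appears in a sink connector config (the connector name and topic here are placeholders, not from our setup):

```json
{
  "name": "snowflake-sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "my-topic",
    "behavior.on.null.values": "DEFAULT"
  }
}
```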
But in our case we receive the error below instead. We use protobuf serialization with sink connector v1.5.5. Can someone clarify, please?
```
[SF_KAFKA_CONNECTOR] Exception: Invalid SinkRecord received
[SF_KAFKA_CONNECTOR] Error Code: 5016
[SF_KAFKA_CONNECTOR] Detail: SinkRecord.value and SinkRecord.valueSchema cannot be null (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Invalid SinkRecord received
[SF_KAFKA_CONNECTOR] Error Code: 5016
[SF_KAFKA_CONNECTOR] Detail: SinkRecord.value and SinkRecord.valueSchema cannot be null
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:281)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:248)
at com.snowflake.kafka.connector.records.RecordService.processRecord(RecordService.java:91)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext$PartitionBuffer.insert(SnowflakeSinkServiceV1.java:982)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext$PartitionBuffer.access$700(SnowflakeSinkServiceV1.java:950)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:591)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:335)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:180)
at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:91)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
```
The same issue occurs in v1.7.1.
@alexnikitchuk could you share your config settings for the converters?
@sfc-gh-japatel any idea about this?
I've found the root cause: the docs contain the note "Use this property with the Snowflake converters only", but we use the Confluent ProtobufConverter.
@sfc-gh-tzhang are there any plans to add a SnowflakeProtobufConverter?
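For context, the converter setting in question would look something like this (the `ProtobufConverter` class name is from the Confluent docs; the schema registry URL is a placeholder). The tombstone handling described above applies only when a Snowflake converter such as `com.snowflake.kafka.connector.records.SnowflakeJsonConverter` is configured instead:

```json
{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```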
Thanks @alexnikitchuk for the confirmation. There are no plans for a SnowflakeProtobufConverter, since it is a Confluent converter. Can you try https://docs.confluent.io/platform/current/connect/transforms/tombstonehandler.html? The SMT should work for non-Snowflake converters.
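A minimal sketch of that SMT in a connector config, based on the linked Confluent docs (the transform alias `tombstoneHandler` is arbitrary):

```json
{
  "transforms": "tombstoneHandler",
  "transforms.tombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneHandler.behavior": "ignore"
}
```

Per the docs, `behavior` also accepts `fail` and `warn`; with `ignore`, tombstone records are silently dropped rather than written to the sink.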
@sfc-gh-japatel I've tried it, but the only option there is to ignore tombstones completely.
Did anyone find a solution for this issue? I am facing the same problem after upgrading cp-kafka-connect-base to the latest version. I did not have any issue with cp-kafka-connect-base:5.5. I am using the JSON converter.
This should filter tombstones and fix the issue:

```json
"transforms": "FilterEmptyMessages",
"transforms.FilterEmptyMessages.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.FilterEmptyMessages.predicate": "FilterEmpty",
"predicates": "FilterEmpty",
"predicates.FilterEmpty.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
```