snowflake-kafka-connector

Error Code: 5016 SinkRecord.value and SinkRecord.valueSchema cannot be null

Open alexnikitchuk opened this issue 2 years ago • 7 comments

According to the docs, the default setting of behavior.on.null.values is DEFAULT, which means that when the Kafka connector encounters a tombstone record, it inserts an empty JSON string into the content column. In our case, however, we receive the error below instead. We use Protobuf serialization with sink connector v1.5.5. Can someone clarify, please?

[SF_KAFKA_CONNECTOR] Exception: Invalid SinkRecord received
[SF_KAFKA_CONNECTOR] Error Code: 5016
[SF_KAFKA_CONNECTOR] Detail: SinkRecord.value and SinkRecord.valueSchema cannot be null (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Invalid SinkRecord received
[SF_KAFKA_CONNECTOR] Error Code: 5016
[SF_KAFKA_CONNECTOR] Detail: SinkRecord.value and SinkRecord.valueSchema cannot be null
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:281)
	at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:248)
	at com.snowflake.kafka.connector.records.RecordService.processRecord(RecordService.java:91)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext$PartitionBuffer.insert(SnowflakeSinkServiceV1.java:982)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext$PartitionBuffer.access$700(SnowflakeSinkServiceV1.java:950)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:591)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$300(SnowflakeSinkServiceV1.java:335)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:180)
	at com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:91)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

alexnikitchuk avatar Mar 25 '22 11:03 alexnikitchuk
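For context, the setup described above amounts to a connector config along these lines (a minimal sketch, not the reporter's actual config; the converter class and schema registry URL are assumptions based on "Confluent ProtobufConverter" mentioned later in the thread):

```json
{
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "behavior.on.null.values": "DEFAULT",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```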

The same issue occurs in v1.7.1.

alexnikitchuk avatar Mar 26 '22 13:03 alexnikitchuk

@alexnikitchuk could you share your config settings for the converters?

@sfc-gh-japatel any idea about this?

sfc-gh-tzhang avatar Mar 30 '22 00:03 sfc-gh-tzhang

I've found the root cause: the docs contain the note "Use this property with the Snowflake converters only", but we use the Confluent ProtobufConverter. @sfc-gh-tzhang are there any plans to add a SnowflakeProtobufConverter?

alexnikitchuk avatar Mar 30 '22 06:03 alexnikitchuk

Thanks @alexnikitchuk for confirming. There are no plans for a SnowflakeProtobufConverter, since ProtobufConverter is a Confluent converter. Can you try https://docs.confluent.io/platform/current/connect/transforms/tombstonehandler.html? That SMT should work for non-Snowflake converters.

sfc-gh-japatel avatar Mar 30 '22 17:03 sfc-gh-japatel
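The suggested SMT could be configured roughly as follows (a sketch based on the linked Confluent TombstoneHandler docs; the transform alias `tombstoneHandler` is arbitrary):

```json
{
  "transforms": "tombstoneHandler",
  "transforms.tombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneHandler.behavior": "ignore"
}
```

Note that with `behavior` set to `ignore`, tombstone records are dropped entirely rather than written to Snowflake, which is the limitation raised in the next comment.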

@sfc-gh-japatel I've tried it, but the only option there is to ignore tombstones completely.

alexnikitchuk avatar Mar 31 '22 11:03 alexnikitchuk

Did anyone find a solution to this issue? I am facing the same problem after upgrading cp-kafka-connect-base to the latest version. I did not have any issues with cp-kafka-connect-base:5.5. I am using the JSON converter.

rishisoft1 avatar Apr 18 '23 10:04 rishisoft1

This should filter tombstones and fix the issue:

```json
"transforms": "FilterEmptyMessages",
"transforms.FilterEmptyMessages.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.FilterEmptyMessages.predicate": "FilterEmpty",
"predicates": "FilterEmpty",
"predicates.FilterEmpty.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
```

vascoferraz avatar Nov 16 '23 12:11 vascoferraz