
INSERT INTO queries fail when the source record's map is missing keys that are inserted into the target record's map for a PROTOBUF schema

Open · SamiShaikh opened this issue on Jul 21 '22 · 2 comments

Describe the bug
INSERT INTO queries fail when the source record's map is missing keys that are inserted into the target record's map for a PROTOBUF schema. The query throws an NPE.

To Reproduce
Run the following queries:

```sql
CREATE STREAM TEST123 (ID STRING KEY, MYMAP MAP<STRING, STRING>)
  WITH (KAFKA_TOPIC='maptest', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='PROTOBUF');

create stream toStr2(id string key, mystruct struct<id string, mymap map<string,string>>)
  with (kafka_topic='output2', value_format='protobuf', partitions=1);

insert into tostr2
  select id,
         struct(id := s.myMap['key0'],
                mymap := map('key1' := s.myMap['key1'], 'key2' := s.myMap['key2'])) myStruct
  from test123 s emit changes;
```

Any record without key1 and key2 throws an NPE:

```sql
ksql> insert into test123(id, myMap) values('1', MAP('key1':='value1'));                   -- throws NPE
ksql> insert into test123(id, myMap) values('1', MAP('key1':='value1', 'key2':='value2')); -- works
```

Expected behavior
The query is expected to handle null map values.

Actual behaviour

```
Exception in thread "_confluent-ksql-71ksqlquery_INSERTQUERY_55-642e8040-db8e-444e-ab3e-1e869bda32a9-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic output3 for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: output3. null
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:177)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
	at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:61)
	at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
	at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:61)
	at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: output3. null
	at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
	at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
	at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:186)
	at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:149)
	at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
	at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
	... 27 more
Caused by: java.lang.NullPointerException
	at com.google.protobuf.DynamicMessage$Builder.setField(DynamicMessage.java:540)
	at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:460)
	at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:494)
	at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:337)
	at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:85)
	at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
	... 34 more
```

Additional context
The insert fails even if I move the map outside the struct (it doesn't matter whether the map is nested or not); a sketch of that shape follows below. It works fine if I add a null check like this:

```sql
insert into tostr
  select id,
         struct(id := s.myMap['key0'],
                mymap := map('key1' := IFNULL(s.myMap['key1'], ''),
                             'key2' := IFNULL(s.myMap['key2'], ''))) myStruct
  from test123 s emit changes;
```
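For completeness, a minimal sketch of the non-nested shape referred to above, assuming a flat target stream (the stream and topic names `tostr_flat` and `output_flat` are hypothetical, not from the report); it fails the same way when a source key is missing:

```sql
-- Hypothetical flat target: the map is a top-level column rather than a struct field.
create stream tostr_flat(id string key, mymap map<string, string>)
  with (kafka_topic='output_flat', value_format='protobuf', partitions=1);

-- Same projection without the surrounding struct; a record missing key1 or key2
-- still produces a null map value and hits the same serialization NPE.
insert into tostr_flat
  select id,
         map('key1' := s.myMap['key1'], 'key2' := s.myMap['key2']) mymap
  from test123 s emit changes;
```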

— SamiShaikh, Jul 21 '22

@SamiShaikh protobuf doesn't support nulls, so this is somewhat expected behavior. Perhaps we can make the error message more instructive though!

— agavra, Jul 21 '22
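For illustration, a sketch of the same sink declared with a null-tolerant value format (JSON here; the stream and topic names `tostr2_json` and `output2_json` are hypothetical). If the limitation is indeed in the Protobuf converter, this variant should accept records with missing keys without the NPE:

```sql
-- Hypothetical JSON-format sink: JSON can represent null map values,
-- so the same projection is expected not to hit the Protobuf-specific NPE.
create stream tostr2_json(id string key, mystruct struct<id string, mymap map<string, string>>)
  with (kafka_topic='output2_json', value_format='json', partitions=1);

insert into tostr2_json
  select id,
         struct(id := s.myMap['key0'],
                mymap := map('key1' := s.myMap['key1'], 'key2' := s.myMap['key2'])) myStruct
  from test123 s emit changes;
```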

> protobuf doesn't support nulls, so this is somewhat expected behavior. Perhaps we can make the error message more instructive though!

@agavra Since the issue is solved, improving the error message is not needed any more.

— aliehsaeedii, Sep 09 '22