INSERT INTO queries fail for Protobuf schemas when the source record's map is missing keys that are inserted into the target record's map
Describe the bug
INSERT INTO queries fail when the source record's map is missing keys that are inserted into the target record's map and the value format is Protobuf. The query throws a NullPointerException (NPE).
To Reproduce
Run the following queries
CREATE STREAM TEST123 (ID STRING KEY, MYMAP MAP<STRING, STRING>) WITH (KAFKA_TOPIC='maptest', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='PROTOBUF');
create stream toStr2(id string key, mystruct struct<id string, mymap map<string,string>>) with (kafka_topic='output2',value_format='protobuf',partitions=1);
insert into tostr2 select id, struct(id := s.myMap['key0'], mymap:= map('key1' :=s.myMap['key1'], 'key2':=s.myMap['key2'])) myStruct from test123 s emit changes;
Any record that is missing `key1` or `key2` throws an NPE:
`ksql> insert into test123(id, myMap) values('1', MAP('key1':='value1')); --throws NPE`
`ksql> insert into test123(id, myMap) values('1', MAP('key1':='value1', 'key2':='value2')); --works`
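The failing and working inserts differ only in whether the lookups `s.myMap['key1']` and `s.myMap['key2']` find a value. The Java sketch below models that projection with plain `java.util` maps, assuming (consistent with the reported NPE) that a lookup on an absent key yields null and that null is copied into the output map as-is. `MapProjection`, `project` and `nullValuedKeys` are hypothetical names; this is not ksqlDB's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the projection
//   map('key1' := s.myMap['key1'], 'key2' := s.myMap['key2'])
// using plain Java maps. Not ksqlDB code.
final class MapProjection {

    // Builds the target map the way the query does: one entry per requested key,
    // copying whatever the source lookup returns (null when the key is absent).
    static Map<String, String> project(Map<String, String> source, List<String> keys) {
        Map<String, String> target = new LinkedHashMap<>(); // keeps key order stable
        for (String key : keys) {
            target.put(key, source.get(key)); // LinkedHashMap permits null values
        }
        return target;
    }

    // Returns, in insertion order, the keys whose projected value is null.
    // These are the entries that, per the reported NPE, the Protobuf serializer rejects.
    static List<String> nullValuedKeys(Map<String, String> target) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : target.entrySet()) {
            if (e.getValue() == null) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

For the first insert (`MAP('key1':='value1')`), `nullValuedKeys(project(source, List.of("key1", "key2")))` reports `key2`; for the second insert it reports nothing, which matches the observed failure and success.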
Expected behavior
The query is expected to handle null map values instead of failing.
Actual behaviour
Exception in thread "_confluent-ksql-71ksqlquery_INSERTQUERY_55-642e8040-db8e-444e-ab3e-1e869bda32a9-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic output3 for task 0_0 due to: org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: output3. null
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:177)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:61)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:61)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: output3. null
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:186)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:149)
    at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
    at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
    ... 27 more
Caused by: java.lang.NullPointerException
    at com.google.protobuf.DynamicMessage$Builder.setField(DynamicMessage.java:540)
    at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:460)
    at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:494)
    at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:337)
    at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:85)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
    ... 34 more
Additional context
The insert fails even if I move the map outside the struct; it makes no difference whether the map is nested or not.
It works if I add a null check with `IFNULL`, as below:
insert into tostr select id,
  struct(id := s.myMap['key0'],
         mymap := map('key1' := IFNULL(s.myMap['key1'], ''), 'key2' := IFNULL(s.myMap['key2'], ''))) myStruct
from test123 s emit changes;
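The `IFNULL` workaround replaces each missing lookup with an empty string before the map is built, so no entry ever carries a null value. A minimal Java sketch of that substitution, assuming the same empty-string default as the query; `IfNullProjection` and `withDefaults` are hypothetical names, not ksqlDB functions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the IFNULL(s.myMap['keyN'], '') workaround:
// every requested key gets a non-null value, falling back to a default.
final class IfNullProjection {

    static Map<String, String> withDefaults(Map<String, String> source,
                                            List<String> keys,
                                            String fallback) {
        Objects.requireNonNull(fallback, "fallback must not be null");
        Map<String, String> target = new LinkedHashMap<>();
        for (String key : keys) {
            // Map.getOrDefault alone is not enough: for a key that is present but
            // mapped to null it still returns null, whereas IFNULL covers both cases.
            String value = source.get(key);
            target.put(key, value != null ? value : fallback);
        }
        return target;
    }
}
```

The trade-off of this approach is that the output can no longer distinguish a key that was missing from one that was explicitly set to the empty string.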
@SamiShaikh Protobuf doesn't support null values, so this is somewhat expected behavior. Perhaps we can make the error message more instructive, though!
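One way to make the error more instructive would be to check for null map values before the record reaches the Protobuf serializer and to name the offending key. The sketch below is hypothetical: `ProtobufNullCheck`, `checkNoNullValues` and the message wording are not part of ksqlDB or the Confluent Protobuf converter.

```java
import java.util.Map;

// Hypothetical pre-serialization check: reject null map values with a message
// that names the field and key, instead of surfacing a bare NullPointerException.
final class ProtobufNullCheck {

    static void checkNoNullValues(String fieldName, Map<String, ?> map) {
        if (map == null) {
            return; // a null map itself is a separate case and is not checked here
        }
        for (Map.Entry<String, ?> e : map.entrySet()) {
            if (e.getValue() == null) {
                throw new IllegalArgumentException(
                    "Field '" + fieldName + "' has a null value for map key '" + e.getKey()
                    + "'. Protobuf maps cannot hold null values; consider wrapping the"
                    + " value in IFNULL.");
            }
        }
    }
}
```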
@agavra Since the issue has been solved, improving the error message is no longer needed.