kafka-connect-storage-common
RecordField w/ timestamp.field should support nested timestamps inside maps
Similar to https://github.com/confluentinc/kafka-connect-storage-common/issues/47, I have a nested timestamp field. However, mine is nested inside a map, for example:
Schema schema = SchemaBuilder.struct()
    .field("headers", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
    .field("data", Schema.BYTES_SCHEMA)
    .build();
So, when timestamp.field is set to headers.timestamp, it fails with:
org.apache.kafka.connect.errors.DataException: Unable to get field 'headers.timestamp' from schema Schema{STRUCT}.
    at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:87)
    at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:282)
    at io.confluent.connect.storage.partitioner.TimeBasedPartitioner.encodePartition(TimeBasedPartitioner.java:162)
    at io.confluent.connect.storage.partitioner.TimeBasedPartitionerTest.testNestedRecordFieldTimeExtractor2(TimeBasedPartitionerTest.java:553)
    ...
Caused by: org.apache.kafka.connect.errors.DataException: Cannot look up fields on non-struct type
    at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:349)
    at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:82)
    ... 28 more
Reason: DataUtils.getNestedField expects only Structs, not Maps. One option is to add a similar helper method that returns a Schema instead of a Field and checks valueSchema() first. Any thoughts? Happy to prepare a PR.
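To make the proposal concrete, here is a rough sketch of what such a helper might look like. It is not the code from DataUtils or from any PR. To keep it compilable without the connect-api jar, it uses a hypothetical stand-in class, SketchSchema, that models only three accessors named after the Kafka Connect Schema ones (type(), field(), valueSchema()); note that the stand-in's field() returns a schema directly, whereas the real one returns a Field. The helper name getNestedSchema is also an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NestedSchemaSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.data.Schema.
    // Only the accessors needed by the sketch are modeled.
    static final class SketchSchema {
        enum Type { STRUCT, MAP, STRING, BYTES }

        private final Type type;
        private final Map<String, SketchSchema> fields; // set for STRUCT only
        private final SketchSchema valueSchema;          // set for MAP only

        private SketchSchema(Type type, Map<String, SketchSchema> fields, SketchSchema valueSchema) {
            this.type = type;
            this.fields = fields;
            this.valueSchema = valueSchema;
        }

        static SketchSchema primitive(Type type) {
            return new SketchSchema(type, null, null);
        }

        static SketchSchema mapOf(SketchSchema valueSchema) {
            return new SketchSchema(Type.MAP, null, valueSchema);
        }

        static SketchSchema struct(Map<String, SketchSchema> fields) {
            return new SketchSchema(Type.STRUCT, new LinkedHashMap<>(fields), null);
        }

        Type type() { return type; }
        SketchSchema field(String name) { return fields == null ? null : fields.get(name); }
        SketchSchema valueSchema() { return valueSchema; }
    }

    // Walks a dotted path such as "headers.timestamp" and returns the schema
    // of the last path element instead of a Field.
    static SketchSchema getNestedSchema(SketchSchema root, String path) {
        SketchSchema current = root;
        for (String name : path.split("\\.")) {
            switch (current.type()) {
                case MAP: {
                    // Map keys are not part of the schema, so any key
                    // resolves to the map's value schema.
                    current = current.valueSchema();
                    break;
                }
                case STRUCT: {
                    SketchSchema next = current.field(name);
                    if (next == null) {
                        throw new IllegalArgumentException(
                            "Unable to get field '" + name + "' in path '" + path + "'");
                    }
                    current = next;
                    break;
                }
                default:
                    throw new IllegalArgumentException(
                        "Cannot descend into " + current.type() + " at '" + name + "'");
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Same shape as the schema in this issue: a struct holding a string map.
        Map<String, SketchSchema> fields = new LinkedHashMap<>();
        fields.put("headers", SketchSchema.mapOf(SketchSchema.primitive(SketchSchema.Type.STRING)));
        fields.put("data", SketchSchema.primitive(SketchSchema.Type.BYTES));
        SketchSchema schema = SketchSchema.struct(fields);

        System.out.println(getNestedSchema(schema, "headers.timestamp").type()); // prints STRING
    }
}
```

Because a map schema cannot know which keys exist, this only resolves the type; whether the timestamp key is actually present would still have to be checked against the record value at extraction time.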
~It should support maps... getNestedField calls getField, which checks for Maps~
~https://github.com/confluentinc/kafka-connect-storage-common/blob/master/core/src/main/java/io/confluent/connect/storage/util/DataUtils.java#L36~
EDIT: Oh... I see. This is a Struct containing a Map, whereas the current implementation expects nested Structs only. My bad!
@cricket007 Thanks for the comments on https://github.com/sap1ens/kafka-connect-storage-common/commit/66b2af7dc30f2b178412a17a0d4084dfa064bd20#commitcomment-35110823! Do you want me to address your feedback and open a PR here? Thanks
Yes please 😊