
RecordField w/ timestamp.field should support nested timestamps inside maps

Open sap1ens opened this issue 6 years ago • 3 comments

Similar to https://github.com/confluentinc/kafka-connect-storage-common/issues/47, I have a nested timestamp field. However, it is nested inside a map, for example:

Schema schema = SchemaBuilder.struct()
    .field("headers", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
    .field("data", Schema.BYTES_SCHEMA)
    .build();

So, with timestamp.field set to headers.timestamp, it fails with:

org.apache.kafka.connect.errors.DataException: Unable to get field 'headers.timestamp' from schema Schema{STRUCT}.

	at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:87)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:282)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitioner.encodePartition(TimeBasedPartitioner.java:162)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitionerTest.testNestedRecordFieldTimeExtractor2(TimeBasedPartitionerTest.java:553)
        ...
Caused by: org.apache.kafka.connect.errors.DataException: Cannot look up fields on non-struct type
	at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:349)
	at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:82)
	... 28 more

Reason: DataUtils.getNestedField expects only Structs, not Maps. One possible fix is a similar helper method that returns a Schema instead of a Field and checks valueSchema() first. Any thoughts? Happy to prepare a PR.
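
To make the idea concrete, here is a minimal sketch of how such a helper could walk a dotted path. It deliberately avoids the Connect classes so it runs on its own: Node and Kind are hypothetical stand-ins for Schema and Schema.Type, and getNestedSchema is an illustrative name, not an existing DataUtils method. It also assumes that a path segment following a map is a map key, so it resolves to the map's value schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NestedSchemaSketch {

    // Hypothetical stand-in for Connect's Schema.Type; only the kinds needed here.
    enum Kind { STRUCT, MAP, STRING, BYTES }

    // Hypothetical, simplified stand-in for org.apache.kafka.connect.data.Schema.
    static final class Node {
        final Kind kind;
        final Map<String, Node> fields = new LinkedHashMap<>(); // used by STRUCT only
        final Node valueSchema;                                 // used by MAP only

        private Node(Kind kind, Node valueSchema) {
            this.kind = kind;
            this.valueSchema = valueSchema;
        }

        static Node primitive(Kind kind) { return new Node(kind, null); }
        static Node mapOf(Node valueSchema) { return new Node(Kind.MAP, valueSchema); }
        static Node struct() { return new Node(Kind.STRUCT, null); }

        // Adds a named field to a struct node and returns the node for chaining.
        Node field(String name, Node schema) {
            fields.put(name, schema);
            return this;
        }
    }

    // Resolves a dotted path to a schema node, stepping through structs and maps.
    static Node getNestedSchema(Node root, String path) {
        Node current = root;
        for (String part : path.split("\\.")) {
            switch (current.kind) {
                case STRUCT:
                    Node child = current.fields.get(part);
                    if (child == null) {
                        throw new IllegalArgumentException(
                            "No field '" + part + "' while resolving '" + path + "'");
                    }
                    current = child;
                    break;
                case MAP:
                    // The segment is treated as a map key, so the schema is the value schema.
                    current = current.valueSchema;
                    break;
                default:
                    throw new IllegalArgumentException(
                        "Cannot descend into " + current.kind + " at '" + part + "'");
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the schema above: a struct holding a string-to-string map.
        Node schema = Node.struct()
            .field("headers", Node.mapOf(Node.primitive(Kind.STRING)))
            .field("data", Node.primitive(Kind.BYTES));
        System.out.println(getNestedSchema(schema, "headers.timestamp").kind); // prints STRING
    }
}
```

A real version would operate on Schema directly and would also need the matching lookup on the record value (Struct.get for structs, Map.get for maps), which this sketch leaves out.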

sap1ens, Jun 24 '19 01:06

~It should support maps... getNestedField calls getField, which checks for Maps~

~https://github.com/confluentinc/kafka-connect-storage-common/blob/master/core/src/main/java/io/confluent/connect/storage/util/DataUtils.java#L36~

EDIT: Oh... I see. This is a Struct containing a map, whereas the current implementation expects nested Structs only. My bad!

OneCricketeer, Sep 17 '19 07:09

@cricket007 Thanks for the comments on https://github.com/sap1ens/kafka-connect-storage-common/commit/66b2af7dc30f2b178412a17a0d4084dfa064bd20#commitcomment-35110823! Do you want me to address your feedback and open a PR here? Thanks

sap1ens, Sep 19 '19 05:09

Yes please 😊

OneCricketeer, Sep 19 '19 12:09