kafka-connect-elasticsearch

Option to disable the auto-generated document IDs

Open panda87 opened this issue 7 years ago • 3 comments

Hi

We started using this connector in production. Before that we used Logstash / Filebeat without manually creating document IDs.

Since switching to this connector, we see very high CPU time (load) and disk IOPS. We understand the concepts behind this connector, one of which is the exactly-once guarantee, but I think it would be useful to let the client decide whether to use this feature.

Can you please add a flag that lets ES generate its IDs instead of the connector?

Thanks D

panda87 avatar Nov 01 '17 21:11 panda87

+1. Will gladly send a PR if this has any chance of making it in.

synhershko avatar Jan 14 '18 09:01 synhershko

I was running into this same problem. I figured out a simple fix to the code, and after making it I noticed a significant overall performance improvement. I'll share the change, which touched only one file, in case someone else is interested. The change is to io.confluent.connect.elasticsearch.DataConverter.convertRecord().

Replaced line 161:

    id = convertKey(record.keySchema(), record.key());

with:

    // Add support for auto generating ids in Elasticsearch.
    // If the provider doesn't provide us a key and doesn't want to ignore that fact,
    // grant their wish and don't set _id and let Elasticsearch auto generate it.
    if (record.key() == null) {
      id = null;
    } else {
      id = convertKey(record.keySchema(), record.key());
    }

Replaced return statement:

    return new IndexableRecord(new Key(index, type, id), payload, version);

with:

    return new IndexableRecord(new Key(index, type, id), payload, (id == null ? null : version));

I also commented out a couple of unit test cases in ElasticsearchWriterTest.java that were no longer valid, since they expected a failure when the record key was null.
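
For anyone who wants to try the idea outside the connector, here is a minimal stand-alone sketch of the same decision. It is not the connector's code: `Indexable` and the one-argument `convertKey` are simplified, hypothetical stand-ins for `IndexableRecord` and `convertKey(schema, key)`, and it assumes the version is the record's Kafka offset.

```java
/**
 * Stand-alone sketch of the null-key handling described above.
 * All types here are simplified stand-ins, not the connector's real classes.
 */
public class NullKeySketch {

    /** Hypothetical stand-in for the connector's IndexableRecord. */
    static final class Indexable {
        final String id;      // null means "let Elasticsearch generate the _id"
        final Long version;   // null means "send no version with the request"
        final String payload;

        Indexable(String id, Long version, String payload) {
            this.id = id;
            this.version = version;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return "Indexable{id=" + id + ", version=" + version + "}";
        }
    }

    /** Hypothetical stand-in for convertKey(); the real method also takes a schema. */
    static String convertKey(Object key) {
        return String.valueOf(key);
    }

    /**
     * Mirrors the patch: a null record key yields a null id, and the version
     * is dropped whenever the id is null.
     * Assumption: the version is the record's Kafka offset.
     */
    static Indexable convert(Object key, String payload, long offset) {
        String id = (key == null) ? null : convertKey(key);
        Long version = (id == null) ? null : Long.valueOf(offset);
        return new Indexable(id, version, payload);
    }

    public static void main(String[] args) {
        // Record without a key: neither id nor version is set.
        System.out.println(convert(null, "{\"msg\":\"hello\"}", 42L));
        // Record with a key: id and version are both set.
        System.out.println(convert("user-1", "{\"msg\":\"hello\"}", 43L));
    }
}
```

The point of the second change is that the id and the version travel together: when no id is set, no version is sent either.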

kelbyloden avatar Feb 08 '19 14:02 kelbyloden

First you should set "key.ignore":"false" and then define a transformation. As well as ValueToKey, you need ExtractField to convert the key from an object to a plain field:

    transforms=InsertKey,ExtractId
    transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields=id_field
    transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.ExtractId.field=id_field
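
To show what these two transforms do to a record, here is a rough plain-Java sketch that imitates them on a `Map`. It does not use the Kafka Connect API; `valueToKey` and `extractField` are hypothetical helper names, and the sample record is made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Imitates the effect of ValueToKey followed by ExtractField$Key on a map-shaped record. */
public class KeyTransformSketch {

    /** Like ValueToKey: build a struct-like key from the named value fields. */
    static Map<String, Object> valueToKey(Map<String, Object> value, List<String> fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : fields) {
            key.put(field, value.get(field));
        }
        return key;
    }

    /** Like ExtractField$Key: replace the struct-like key with one of its fields. */
    static Object extractField(Map<String, Object> key, String field) {
        return key.get(field);
    }

    public static void main(String[] args) {
        // Made-up record value for illustration only.
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("id_field", "order-17");
        value.put("amount", 99);

        // After the first step the key is still an object, not a plain value.
        Map<String, Object> structKey = valueToKey(value, Arrays.asList("id_field"));
        System.out.println(structKey);  // prints {id_field=order-17}

        // After the second step the key is the plain field value.
        Object plainKey = extractField(structKey, "id_field");
        System.out.println(plainKey);   // prints order-17
    }
}
```

Without the second step the key would remain an object wrapping id_field, which is why both transforms are chained.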

shenzgang avatar Jul 14 '20 09:07 shenzgang