kafka-connect-elasticsearch
geo_point support; questions so I can work toward submitting my PR
Up until now, I have been able to support geo_point in Elasticsearch by defining the index myself. However, I recently came across a domain where I have Avro structures using the decimal logical type, whose underlying type is byte[]. If I do not let the Elasticsearch sink define the indexes, logical types are not applied (so all of my decimals go into Elasticsearch as byte arrays).
So, in order for the Elasticsearch sink to correctly convert decimals into numerical values, I have to let it define the indexes. But without support for geo_point, all of my locations go in as text, and I have found no way to convert them within Elasticsearch (I tried to update the index to do a field copy, but copying a text field into a geo_point field doesn't seem to be an option).
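For anyone unfamiliar with why the decimals show up as byte arrays: the Avro decimal logical type stores the unscaled value as a big-endian two's-complement byte array, and the scale lives in the schema. A minimal sketch of the decoding that has to happen somewhere before the value is numeric is below. The class and method names (`DecimalBytes`, `toBigDecimal`) are made up for illustration and are not part of the connector.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper, for illustration only (not connector code).
class DecimalBytes {

    // Interprets the bytes as a big-endian two's-complement unscaled integer
    // and applies the scale taken from the schema.
    static BigDecimal toBigDecimal(byte[] unscaled, int scale) {
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // 12345 as unscaled bytes with scale 2 represents 123.45
        byte[] raw = new BigInteger("12345").toByteArray();
        System.out.println(toBigDecimal(raw, 2));
    }
}
```

If this step is skipped, the raw bytes are all that reaches the index, which matches what I see when I define the index myself.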
So, I forked the repo and updated the Mapping code so I could mark a field as a geo_point and have elastic create the index correctly.
I am looking for advice on the best way to approach this so a PR could be accepted, as I think my convention would require updating the Connect API to add GeoPoint. Also, nowhere does the Avro specification seem to support custom logical types. So, while it works, it isn't aligned with their spec.
{
  "name": "location",
  "type": [
    "null",
    {
      "type": "string",
      "logicalType": "geopoint",
      "connect.version": 1,
      "connect.name": "org.apache.kafka.connect.data.GeoPoint"
    }
  ],
  "default": null,
  "doc": "Geolocation for inventory, pulled up from customer."
}
Highlights from my personal POC getting this working:
private static JsonNode inferLogicalMapping(Schema schema) {
  String schemaName = schema.name();
  Object defaultValue = schema.defaultValue();
  if (schemaName == null) {
    return null;
  }
  switch (schemaName) {
    case Date.LOGICAL_NAME:
    case Time.LOGICAL_NAME:
    case Timestamp.LOGICAL_NAME:
      return inferPrimitive(ElasticsearchSinkConnectorConstants.DATE_TYPE, defaultValue);
    case Decimal.LOGICAL_NAME:
      return inferPrimitive(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, defaultValue);
    case ElasticsearchSinkConnectorConstants.GEOPOINT_LOGICAL_TYPE:
      return inferPrimitive(ElasticsearchSinkConnectorConstants.GEOPOINT_TYPE, defaultValue);
    default:
      // User-defined type or unknown built-in
      return null;
  }
}
inferPrimitive() handles geopoint as a string case, since string is the physical type I used for my POC.
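Since the POC carries the location as a plain string, the values themselves have to be in a form Elasticsearch accepts for a geo_point field; one accepted form is a "lat,lon" string. A rough sketch of a sanity check for that form is below. `GeoPointStrings` and `isLatLon` are hypothetical names used only for illustration, and the check deliberately ignores the other accepted forms (such as geohash or WKT).

```java
// Hypothetical helper, for illustration only (not connector code).
class GeoPointStrings {

    // Returns true when the value looks like "lat,lon" with both numbers
    // inside the valid latitude and longitude ranges.
    static boolean isLatLon(String value) {
        if (value == null) {
            return false;
        }
        String[] parts = value.split(",", -1);
        if (parts.length != 2) {
            return false;
        }
        try {
            double lat = Double.parseDouble(parts[0].trim());
            double lon = Double.parseDouble(parts[1].trim());
            // NaN fails every comparison, so it is rejected here as well.
            return lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isLatLon("41.12,-71.34"));
        System.out.println(isLatLon("not a point"));
    }
}
```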
@nbuesing to begin with, changing the Connect API is outside of the scope of this connector and all changes need to be made respecting the current API.
That being said, GeoPoint has been a requested feature for a while, so it would be great if we could get some sort of PR out on it. I encourage you to give it a go.
At first glance, and if I understand this correctly, your case sounds pretty niche, so I am not sure I see much sense in accommodating it. It might make more sense to restructure your data in a way that will work with the given connector, i.e., not assigning type byte[] to the logical types.