kafka-connect-elasticsearch
ES Ingest Pipeline support
Elasticsearch has support for Ingest Pipelines to perform transformations on data before it is indexed. The indexing API requires a parameter to be passed to use a pipeline:
```
PUT /myindex/type/1?pipeline=monthlyindex
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
```
Would it be possible to add this as an option to the Kafka Connect Elasticsearch connector?
I think the blocker is mostly Jest for now, as the client does not support that.
Hello, we also need this feature. Since this Elasticsearch connector is based on the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html), maybe it's possible to add an option in the configuration file to pass user-defined URL parameters? For example, when sending data to Elasticsearch, the connector would call http://<elasticsearch_url>/_bulk?pipeline=some_pipeline, where the parameters come from an option defined in the properties file: bulk.url.parameters=?pipeline=some_pipeline. By default, this option would be empty.
That's the basic idea; the parameters might be better defined as a map of key-value pairs.
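To make the proposal concrete, a sink configuration using the suggested option might look like the sketch below. Note that bulk.url.parameters is the hypothetical option proposed in this comment, not an existing connector setting; the other properties are standard connector configuration with placeholder values.

```
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=es-logs
connection.url=http://localhost:9200
type.name=kafka-connect
# Hypothetical option proposed in this thread; empty by default
bulk.url.parameters=?pipeline=some_pipeline
```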
Is there any update on this enhancement?
Is there any update on this feature? We need it in our production environment too, thanks.
Any updates on this issue?
I see a few commits on this issue and also code that provides a pipeline config parameter. Does it work? I'm trying to specify a pipeline in my sink config file but nothing happens.
We also require this option in production. The only other viable workaround is to ditch this connector and use Logstash to consume messages from Apache Kafka, adding another potential point of failure and making our Elasticsearch deployment more complicated at scale.
Pipelines are an integral part of processing data using Elasticsearch, please consider merging the branches above.
There was a merge request (almost a year ago), but it seems like master doesn't contain any related changes in the config code: ElasticsearchSinkConnectorConfig.java
That's strange, because this feature has been requested by many users for a long time.
I'm also looking for this support, as it's going to dramatically improve the Elasticsearch queries that we are currently running.
Why would passing the pipeline as a parameter be the desired approach rather than setting index.default_pipeline on the ES side of things?
You might need to apply a different pipeline based on the type of logs you're trying to index. If I have, say, a topic "es-logs" that I use to index documents into the index "es-prod-logs" through the connector, and it contains both system and Apache logs, I would like Apache logs to be parsed by the default Filebeat Apache pipeline, while system logs are parsed by the default Filebeat syslog pipeline.
That would be useful as Filebeat has a metadata field which specifies the ingest pipeline to use (https://www.elastic.co/guide/en/logstash/current/use-ingest-pipelines.html)
As a workaround you might want to have Filebeat conditionally send logs to different topics. If logs come from the Apache log folder, send them to the "elk_apache_logs" topic, which the connector then writes to the "elk_apache_logs" index, which has the "index.default_pipeline" setting pointed at the Filebeat Apache pipeline. The same would apply for system logs and so on.
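Assuming that per-topic routing, the default pipeline only has to be set once per index on the ES side. A sketch of that setting (the pipeline name here is illustrative; a real Filebeat pipeline would have a version-specific name):

```
PUT /elk_apache_logs/_settings
{
  "index.default_pipeline": "filebeat-apache-pipeline"
}
```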
Given that starting point, I can see the use case.
The lack of this feature is a deciding factor in choosing Logstash over kafka-connect-elasticsearch.
As mentioned by agi0rgi, being able to apply a pipeline at runtime based on the topic, key, or message attributes is a common need for ingest processes.