
ES Ingest Pipeline support

Open mtak opened this issue 7 years ago • 13 comments

Elasticsearch has support for Ingest Pipelines to perform transformations on data before it is indexed. The indexing API accepts a pipeline parameter to select which pipeline to use:

PUT /myindex/type/1?pipeline=monthlyindex
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
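
For reference, the pipeline itself is created up front through the ingest API. A minimal sketch of a pipeline like the monthlyindex one above, using the date_index_name processor to route documents into monthly indices (the exact processors depend on the use case):

PUT _ingest/pipeline/monthlyindex
{
  "description": "route documents into monthly indices based on date1",
  "processors": [
    {
      "date_index_name": {
        "field": "date1",
        "index_name_prefix": "myindex-",
        "date_rounding": "M"
      }
    }
  ]
}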

Would it be possible to add this as an option to the Kafka Connect Elasticsearch connector?

mtak avatar Mar 09 '17 15:03 mtak

I think the blocker right now is mostly Jest, which does not support that.

evilezh avatar May 27 '17 00:05 evilezh

Hello, we also need this feature. Since this Elasticsearch connector is based on the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html), maybe it's possible to add an option in the configuration file to pass user-defined URL parameters?

For example, when writing data to Elasticsearch, the connector would call http://<elasticsearch_url>/_bulk?pipeline=some_pipeline, with the parameters coming from an option defined in the properties file: bulk.url.parameters=?pipeline=some_pipeline. By default, this option would be empty.

That's the basic idea; the parameters could perhaps be defined more cleanly as a map of key-value pairs.
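
A rough sketch of how that could look in the sink properties file (bulk.url.parameters is only the proposed name here, not an existing connector option):

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=my-topic
connection.url=http://elasticsearch:9200
type.name=kafka-connect
# proposed option, empty by default; appended as-is to the _bulk request URL
bulk.url.parameters=?pipeline=some_pipeline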

yuancheng-p avatar Jul 11 '17 14:07 yuancheng-p

Is there any update on this enhancement?

pratikshya avatar Oct 05 '17 05:10 pratikshya

Is there any update on this feature? We need it in our production environment too, thanks.

amonhuang avatar Dec 20 '18 06:12 amonhuang

Any updates on this issue?

Nayruden avatar Jun 12 '19 19:06 Nayruden

I see a few commits on this issue and also code that provides a pipeline config parameter. Does it work? I'm trying to specify a pipeline in my sink config file, but nothing happens.

zboinek avatar Feb 03 '20 16:02 zboinek

We also require this option in production. The only other viable workaround is to ditch this connector and use Logstash to consume messages from Apache Kafka, which adds another potential point of failure and makes our Elasticsearch deployment more complicated at scale.

Pipelines are an integral part of processing data with Elasticsearch; please consider merging the branches above.

taythebot avatar Mar 06 '20 09:03 taythebot

I see a few commits on this issue and also code that provides a pipeline config parameter. Does it work? I'm trying to specify a pipeline in my sink config file, but nothing happens.

There was a merge request (almost a year ago), but it seems like master doesn't contain any related changes in the config code: ElasticsearchSinkConnectorConfig.java

And that's strange, because this feature has been requested by many users for a long time.

pavelpe avatar Mar 09 '20 09:03 pavelpe

I'm also looking for this support, as it's going to dramatically improve the Elasticsearch queries we are currently running.

briward avatar Mar 25 '20 10:03 briward

Why would passing the pipeline as a parameter be the desired approach rather than setting index.default_pipeline on the ES side of things?
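
For reference, on the ES side that is just an index settings update, something like the following (index.default_pipeline needs a reasonably recent Elasticsearch; it was added in 6.5 as far as I know):

PUT /myindex/_settings
{
  "index.default_pipeline": "monthlyindex"
}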

windowsrefund avatar Sep 17 '20 16:09 windowsrefund

Why would passing the pipeline as a parameter be the desired approach rather than setting index.default_pipeline on the ES side of things?

You might need to apply a different pipeline depending on the type of logs you're trying to index. Say I have a topic "es-logs" that I use to index documents into the "es-prod-logs" index through the connector, and it contains both system and Apache logs: I'd like the Apache logs to be parsed by the default Filebeat Apache pipeline, while the system logs are parsed by the default Filebeat syslog/system pipeline.

That would be useful, as Filebeat has a metadata field that specifies the ingest pipeline to use (https://www.elastic.co/guide/en/logstash/current/use-ingest-pipelines.html).

As a workaround, you might have Filebeat conditionally send logs to different topics: if the logs come from the Apache log directory, send them to the "elk_apache_logs" topic, which the connector then writes to the "elk_apache_logs" index, whose "index.default_pipeline" setting points at the Filebeat Apache pipeline. The same would apply for system logs and so on.
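
The Filebeat side of that routing could look roughly like this (just a sketch; the exact field and value to match on depend on the Filebeat version and inputs):

output.kafka:
  hosts: ["kafka:9092"]
  # default topic for everything else, e.g. system logs
  topic: "elk_system_logs"
  topics:
    # route Apache logs to their own topic, and thus their own index and pipeline
    - topic: "elk_apache_logs"
      when.contains:
        log.file.path: "apache"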

agi0rgi avatar Sep 17 '20 19:09 agi0rgi

Given that starting point, I can see the use case.

windowsrefund avatar Sep 17 '20 19:09 windowsrefund

The lack of this feature is a deciding factor in choosing Logstash over kafka-connect-elasticsearch.

As mentioned by agi0rgi, being able to apply a pipeline at runtime based on the topic, key, or message attributes is a common need for ingest processes.

brsolomon-deloitte avatar Apr 13 '22 13:04 brsolomon-deloitte