kafka-connect-elasticsearch

Support for elastic 8.0

Open pmanvi opened this issue 2 years ago • 25 comments

When running against an Elasticsearch 8.0 cluster, _bulk API calls fail with the following error.

Caused by: java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)

Please let us know what the plan is:

  1. Existing connector compatible with elastic 8.0 (7.x backward compatible)
  2. New connector with newer APIs

pmanvi avatar Feb 24 '22 05:02 pmanvi

Same error here; it doesn't work with Elasticsearch 8.x.

Grabber avatar Mar 17 '22 00:03 Grabber

Is it still an issue with the latest Kafka Connector tag?

zamazan4ik avatar Mar 28 '22 20:03 zamazan4ik

Yes, same issue here after upgrading to ES 8.

erkdahl avatar Apr 04 '22 07:04 erkdahl

Based on the Elastic documentation, the code must migrate to the Java API Client (see "Migrating from the High Level Rest Client"). However, it is possible to use the latest 7.17 client (the version currently used here is 7.9.3) with a compatibility option that has to be activated on the RestHighLevelClient.

fludo avatar Apr 19 '22 08:04 fludo

@fludo Did you manage to get it working with ES 8.x?

foboni avatar May 26 '22 14:05 foboni

Elasticsearch High Level REST Client 7.16 has a compatibility mode that allows it to work with Elasticsearch 8.x.

This has to be enabled in io.confluent.connect.elasticsearch.ElasticsearchClient (possibly from an additional setting in ElasticsearchSinkConnectorConfig?)

More information in the "Compatibility with Elasticsearch 8.x" section at https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html

Note that the new Java API client doesn't yet have a replacement for the BulkProcessor that is used in ElasticsearchClient, so it is currently not suitable for this connector.

swallez avatar May 26 '22 17:05 swallez

@swallez @cjolivier01

Elasticsearch High Level REST Client 7.16 has a compatibility mode that allows it to work with Elasticsearch 8.x.

Without changing any dependencies, I tried changing the following lines in ElasticsearchClient

    this.client = new RestHighLevelClient(
        RestClient
            .builder(
                config.connectionUrls()
                    .stream()
                    .map(HttpHost::create)
                    .collect(toList())
                    .toArray(new HttpHost[config.connectionUrls().size()])
            )
            .setHttpClientConfigCallback(configCallbackHandler)
    );

with

    RestClient httpClient = RestClient.builder(
          config.connectionUrls()
              .stream()
              .map(HttpHost::create)
              .collect(toList())
              .toArray(new HttpHost[config.connectionUrls().size()])
        ).setHttpClientConfigCallback(configCallbackHandler).build();

    this.client = new RestHighLevelClientBuilder(httpClient).setApiCompatibilityMode(true).build();

But that didn't work. Any idea why?

Connector Version: 13.0.0
Elasticsearch Version: 8.0.1

foboni avatar May 27 '22 14:05 foboni

Getting the following stacktrace when running version 13.0.0 of the connector against Elastic 8.2.0 via docker-compose:

[2022-05-27 13:22:38,402] ERROR WorkerSinkTask{id=Elasticsearch_Sink_Connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
        at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:398)
        at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
        at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
        at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
        at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
        at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:175)
        ... 5 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://elasticsearch:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)
        ... 5 more
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://elasticsearch:9200, response=HTTP/1.1 200 OK}
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2190)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
        at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
        ... 8 more
Caused by: java.lang.NullPointerException
        at java.base/java.util.Objects.requireNonNull(Objects.java:221)
        at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:116)
        at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:43)
        at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:28)
        at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:96)
        at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:93)
        at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:148)
        at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)
        at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
        at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)

jimbethancourt avatar May 27 '22 14:05 jimbethancourt

@jimbethancourt This error is caused by the compatibility issues discussed above.
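
A minimal sketch of one plausible reading of the NullPointerException above. It assumes the null value is the `_type` field, which Elasticsearch 8.x no longer includes in bulk item responses unless it is asked to behave like 7.x. `BulkItemParseSketch` and `describe` are hypothetical stand-ins for illustration, not the real `DocWriteResponse` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for response parsing in an older 7.x client.
// It is NOT the real Elasticsearch class; it only mirrors the
// requireNonNull frame visible in the stack trace.
final class BulkItemParseSketch {

    // Builds "index/type/id" and insists that every field is present,
    // the way a constructor guarded by Objects.requireNonNull would.
    static String describe(Map<String, String> item) {
        String index = Objects.requireNonNull(item.get("_index"), "_index");
        String type = Objects.requireNonNull(item.get("_type"), "_type");
        String id = Objects.requireNonNull(item.get("_id"), "_id");
        return index + "/" + type + "/" + id;
    }

    public static void main(String[] args) {
        // A 7.x-style bulk item: all three fields are present.
        Map<String, String> v7Item = new HashMap<>();
        v7Item.put("_index", "test_index");
        v7Item.put("_type", "_doc");
        v7Item.put("_id", "1");
        System.out.println(describe(v7Item));

        // Assumed 8.x-style bulk item: same data, but without "_type".
        Map<String, String> v8Item = new HashMap<>(v7Item);
        v8Item.remove("_type");
        try {
            describe(v8Item);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: " + e.getMessage());
        }
    }
}
```

If this assumption holds, the HTTP call itself succeeds (hence the `200 OK` in the trace) and the failure happens only while the client turns the response body into objects.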

foboni avatar May 30 '22 04:05 foboni

HTTP Logs intercepted from ES Client node

pom.xml

  <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>7.17.4</version>
  </dependency>
  <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>7.17.4</version>
  </dependency>

I also enabled compatibility mode by modifying RestHighLevelClientBuilder

GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

HEAD / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549


GET / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

HEAD / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549


GET / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}


GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

GET / HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549

{
  "name" : "elk-client",
  "cluster_name" : "es-cluster",
  "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
  "version" : {
    "number" : "8.0.1",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
    "build_date" : "2022-02-24T13:55:40.601285296Z",
    "build_snapshot" : false,
    "lucene_version" : "9.0.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

HEAD /test_index HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)


HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 659


POST /_bulk?timeout=1m HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 215616
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)

...data....

HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 65152

{"took":16,"ingest_took":3,"errors":false,....

POST /_bulk?timeout=1m HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 215616
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)

...data...

HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 65152

Based on the logs, it is clear that the connector is sending the compatibility headers in its requests and that the server is responding according to those headers.

Still, the connector fails with the same error: Unable to parse response body for Response
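
For anyone checking their own captured traffic the same way, here is a small sketch of a helper that extracts the `compatible-with` parameter from a `Content-Type` or `Accept` value like the ones in the logs above. `CompatHeaderCheck` and `compatibleWith` are hypothetical names used only for illustration; they are not part of the connector or of the Elasticsearch client.

```java
import java.util.Locale;
import java.util.OptionalInt;

// Hypothetical log-checking helper; not part of any Elasticsearch library.
final class CompatHeaderCheck {

    private static final String PARAM = "compatible-with=";

    // Returns the major version named by the compatible-with parameter,
    // or an empty OptionalInt if the parameter is absent or not a number.
    static OptionalInt compatibleWith(String headerValue) {
        String[] parts = headerValue.split(";");
        // parts[0] is the media type itself; parameters start at index 1.
        for (int i = 1; i < parts.length; i++) {
            String param = parts[i].trim().toLowerCase(Locale.ROOT);
            if (param.startsWith(PARAM)) {
                try {
                    return OptionalInt.of(Integer.parseInt(param.substring(PARAM.length()).trim()));
                } catch (NumberFormatException e) {
                    return OptionalInt.empty();
                }
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Request header as sent by the client in the logs (space after ';').
        System.out.println(compatibleWith("application/vnd.elasticsearch+json; compatible-with=7"));
        // Response header as returned by the server (no space after ';').
        System.out.println(compatibleWith("application/vnd.elasticsearch+json;compatible-with=7"));
        // Plain JSON response, as seen on the requests sent without the compatibility headers.
        System.out.println(compatibleWith("application/json"));
    }
}
```

A check like this only confirms that both sides negotiated 7.x compatibility; it says nothing about whether the response body can then be parsed by the client.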

foboni avatar May 30 '22 14:05 foboni

@kpatelatwork @snehashisp

foboni avatar Jun 01 '22 12:06 foboni

@foboni Thanks for reporting this. We are discussing internally how to prioritize 8.0 support.

copying @rajat-agarwal

kpatelatwork avatar Jun 01 '22 14:06 kpatelatwork

According to the docs,

The HLRC version 7.17 can be used with Elasticsearch version 8.x by enabling HLRC’s compatibility mode (see code sample below). In this mode HLRC sends additional headers that instruct Elasticsearch 8.x to behave like a 7.x server.

I'm using 7.17 and have enabled compatibility mode. Even then, it fails with the same error: Unable to parse response body for Response.

@kpatelatwork Will there be any workaround for this before we migrate to the new Java API Client?

Also, are there any docs with instructions on how to build this connector from source? Currently I'm not able to run mvn clean install because several test cases fail, but mvn clean package builds successfully.

foboni avatar Jun 02 '22 04:06 foboni

@foboni We have asked @snehashisp to evaluate this, and we will get back to you once we know more.

kpatelatwork avatar Jun 02 '22 17:06 kpatelatwork

@foboni I was able to get this to work with ES client version 7.17.3 in compatibility mode against ES 8.1.1 running locally. The ES client was running without any security, but that should not matter for functional compatibility. The changes are the same as the ones you made to the client, but you can pull the es-8 branch and run mvn clean package for a quick build.

snehashisp avatar Jun 06 '22 11:06 snehashisp

Thanks @snehashisp

It worked with 8.1.1, but I'm not sure why it didn't work with 8.0.1.

foboni avatar Jun 06 '22 13:06 foboni

@snehashisp what is the anticipated timeline for merging changes from the es-8 branch into master? Thanks!

jimbethancourt avatar Jun 06 '22 16:06 jimbethancourt

I got the following error when invoking the mvn clean package command on the es-8 branch. Please help me fix this issue.

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3: Failed to read artifact descriptor for io.confluent:build-tools:jar:6.2.3: Could not transfer artifact io.confluent:build-tools:pom:6.2.3 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3
    at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:109)

Hi guys, do you have an update?

electriquo avatar Jul 28 '22 10:07 electriquo

Hi guys, do you have an update?

ES 8.x support should be available sometime late in Q4.

There is an open pull request for this.

Sup3r-Us3r avatar Jul 29 '22 00:07 Sup3r-Us3r

I got the following error when invoking the mvn clean package command on the es-8 branch. Please help me fix this issue.

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3: Failed to read artifact descriptor for io.confluent:build-tools:jar:6.2.3: Could not transfer artifact io.confluent:build-tools:pom:6.2.3 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3
	at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306)
	at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
	at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
	at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
	at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
	at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
	at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
	at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
	at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
	at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
	at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
	at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
	at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke (Method.java:498)
	at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
	at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
	at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
	at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3
	at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:109)

Try to build with maven:3.6-openjdk-8

kafka-connect-elasticsearch git:(es-8) ✗  docker run --rm -v "$(pwd)":/opt/maven -e "-Dmaven.repo.local=/opt/maven/.m2/repository" -w /opt/maven maven:3.6-openjdk-8 mvn clean package -Dmaven.repo.local=/opt/maven/.m2/repository

It works with ES 8.3.2
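
For context, the "maven-default-http-blocker" message in the quoted log is most likely the default behaviour of Maven 3.8.1 and later, which refuse to download from plain http:// repositories; that would explain why the older maven:3.6 image gets past it. Below is a minimal sketch for checking which side of that cutoff a given Maven version falls on. The function name maven_blocks_http is made up for illustration, and the comparison assumes a sort that supports -V (GNU coreutils).

```shell
#!/bin/sh
# Hypothetical helper (not part of Maven or the connector): returns success
# when the given Maven version is 3.8.1 or newer, i.e. a version that is
# assumed to block external http:// repositories by default.
maven_blocks_http() {
  version="$1"
  # sort -V orders version strings; if 3.8.1 sorts first (or the two are
  # equal), the given version is at least 3.8.1.
  lowest=$(printf '%s\n%s\n' "3.8.1" "$version" | sort -V | head -n 1)
  [ "$lowest" = "3.8.1" ]
}

# Example: classify a version string passed in by hand.
if maven_blocks_http "3.6.3"; then
  echo "3.6.3: http repositories are blocked by default"
else
  echo "3.6.3: http repositories are allowed"
fi
```

To check a local installation, the version string printed by mvn -v can be passed to the function instead of the hard-coded example value.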

gricuk avatar Aug 02 '22 09:08 gricuk

Hi guys, do you have an update?

ES 8.x support should be available sometime late in Q4.

There is an open pull request for this.

I see that the pull request was merged a couple of weeks ago. Is there an ETA for the official release?

samcarpentier avatar Jan 05 '23 16:01 samcarpentier

Still broken for me.

[2023-12-19 20:01:06,561] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
	at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:443)
	at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
	at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
	at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
	at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
	at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
	at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:216)
	... 5 more

ES 8.32, kafka-connect-elasticsearch v14.0.12

suityou01 avatar Dec 19 '23 20:12 suityou01

This line requires the "monitoring_user" role, or an equivalent, to be granted to your ES connector user:

curl --cacert /tmp/ca.crt -u elastic:kb4gtb4NaAjbYsnmwwBK -X POST "https://localhost:9200/_security/user/es_sink_connector_user?pretty" -H 'Content-Type: application/json' -d'
{
  "password" : "c6HHxnucdmvUHUTEKb7q",
  "roles" : [ "es_sink_connector_role", "monitoring_user" ]
}'

I think the documentation needs updating accordingly for 8.x users
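
A minimal sketch of the same request with the JSON body built by a small helper, so that the passwords do not have to be typed inline. The helper name user_payload and the environment variable names in the comment are assumptions for illustration only, and the helper assumes that the password and role names contain no characters that need JSON escaping.

```shell
#!/bin/sh
# Hypothetical helper: prints the JSON body for the create-or-update user
# request, given a password followed by one or more role names.
# Assumes the values contain no characters that need JSON escaping.
user_payload() {
  password="$1"
  shift
  roles=""
  for role in "$@"; do
    # Prefix every entry after the first with a comma separator.
    roles="${roles:+$roles, }\"$role\""
  done
  printf '{ "password": "%s", "roles": [ %s ] }\n' "$password" "$roles"
}

# Example: print the body for the same two roles as in the curl call above.
user_payload "changeme" es_sink_connector_role monitoring_user

# Possible use against a cluster (not executed here; ES_SINK_PASSWORD and
# ELASTIC_PASSWORD are assumed environment variables):
#   user_payload "$ES_SINK_PASSWORD" es_sink_connector_role monitoring_user |
#     curl --cacert /tmp/ca.crt -u "elastic:$ELASTIC_PASSWORD" -X POST \
#       "https://localhost:9200/_security/user/es_sink_connector_user?pretty" \
#       -H 'Content-Type: application/json' -d @-
```

Passing -d @- makes curl read the request body from standard input, which is what lets the helper's output be piped straight into the request.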

suityou01 avatar Dec 20 '23 10:12 suityou01

Nice find @suityou01 - this worked for me as well. I looked around, but it looks like we can't open a PR against the docs. Another code snippet for context.

theblinkingusb avatar Jan 11 '24 01:01 theblinkingusb