kafka-connect-elasticsearch
kafka-connect-elasticsearch copied to clipboard
Support for elastic 8.0
While running with elastic 8.0 cluster, _bulk API calls fail with following error.
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.elasticsearch.action.DocWriteResponse.
Lets know what's the plan
- Existing connector compatible with elastic 8.0 (7.x backward compatible)
- New connector with newer APIs
Same error here, it doesn't work with ElasticSearch 8.x.
Is it still an issue with the latest Kafka Connector tag?
Yes same issue here after upgrade to ES8
Based on Elastic documentation, the code must migrate to the Java API Client: Migrating from the High Level Rest Client However, it is possible to use the latest 7.17 client (current version used here is 7.9.3...) with a compatibility option to activate on the RestHighLevelClient.
@fludo Did you managed to get it working with ES 8.x ?
Elasticsearch High Level REST Client 7.16 has a compatibility mode that allows it to work with Elasticsearch 8.x.
This has to be enabled in io.confluent.connect.elasticsearch.ElasticsearchClient
(possibly from an additional setting in ElasticsearchSinkConnectorConfig
?)
More information in the "Compatibility with Elasticsearch 8.x" section at https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html
Note that the new Java API client doesn't yet have a replacement for the BulkProcessor
that is used in ElasticsearchClient
, so is currently not suitable for this connector.
@swallez @cjolivier01
Elasticsearch High Level REST Client 7.16 has a compatibility mode that allows it to work with Elasticsearch 8.x.
Without changing any dependencies, I tried changing the following lines in ElasticsearchClient
this.client = new RestHighLevelClient(
RestClient
.builder(
config.connectionUrls()
.stream()
.map(HttpHost::create)
.collect(toList())
.toArray(new HttpHost[config.connectionUrls().size()])
)
.setHttpClientConfigCallback(configCallbackHandler)
);
with
RestClient httpClient = RestClient.builder(
config.connectionUrls()
.stream()
.map(HttpHost::create)
.collect(toList())
.toArray(new HttpHost[config.connectionUrls().size()])
).setHttpClientConfigCallback(configCallbackHandler).build();
this.client = new RestHighLevelClientBuilder(httpClient).setApiCompatibilityMode(true).build();
But that didn't work. Any idea why ?
Connector Version: 13.0.0 Elasticsearch Version: 8.0.1
Getting the following stacktrace when running version 13.0.0 of the connector against Elastic 8.2.0 via docker-compose:
[2022-05-27 13:22:38,402] ERROR WorkerSinkTask{id=Elasticsearch_Sink_Connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:398)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:175)
... 5 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://elasticsearch:9200, response=HTTP/1.1 200 OK}' after 6 attempt(s)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)
... 5 more
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://elasticsearch:9200, response=HTTP/1.1 200 OK}
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2190)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
... 8 more
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:116)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:43)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:28)
at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:96)
at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:93)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:148)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)
@jimbethancourt This error is because of the compatibility issues.
HTTP Logs intercepted from ES Client node
pom.xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.17.4</version>
</dependency>
I also enabled compatibility mode by modifying RestHighLevelClientBuilder
GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
HEAD / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549
GET / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
HEAD / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549
GET / HTTP/1.1
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
GET / HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
GET / HTTP/1.1
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 0
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 549
{
"name" : "elk-client",
"cluster_name" : "es-cluster",
"cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A",
"version" : {
"number" : "8.0.1",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372",
"build_date" : "2022-02-24T13:55:40.601285296Z",
"build_snapshot" : false,
"lucene_version" : "9.0.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
HEAD /test_index HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 659
POST /_bulk?timeout=1m HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 215616
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
...data....
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 65152
{"took":16,"ingest_took":3,"errors":false,....
POST /_bulk?timeout=1m HTTP/1.1
Content-Type: application/vnd.elasticsearch+json; compatible-with=7
Accept: application/vnd.elasticsearch+json; compatible-with=7
X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13
Content-Length: 215616
Host: 172.16.216.201:33333
Connection: Keep-Alive
User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15)
...data...
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/vnd.elasticsearch+json;compatible-with=7
content-length: 65152
Based on the logs, it is clear that connector is sending compatibility headers in its requests and server is responding based on the headers.
Still connector fails throwing same error Unable to parse response body for Response
@kpatelatwork @snehashisp
HTTP Logs intercepted from ES Client node
pom.xml
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.17.4</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.17.4</version> </dependency>
I also enabled compatibility mode by modifying RestHighLevelClientBuilder
GET / HTTP/1.1 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/json content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } HEAD / HTTP/1.1 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 549 GET / HTTP/1.1 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } GET / HTTP/1.1 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/json content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } HEAD / HTTP/1.1 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 549 GET / HTTP/1.1 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } GET / HTTP/1.1 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/json content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } GET / HTTP/1.1 Content-Type: application/vnd.elasticsearch+json; compatible-with=7 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } GET / HTTP/1.1 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 0 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/json content-length: 549 { "name" : "elk-client", "cluster_name" : "es-cluster", "cluster_uuid" : "YtCh8qMoTC68BJkld-yh_A", "version" : { "number" : "8.0.1", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "801d9ccc7c2ee0f2cb121bbe22ab5af77a902372", "build_date" : "2022-02-24T13:55:40.601285296Z", "build_snapshot" : false, "lucene_version" : "9.0.0", "minimum_wire_compatibility_version" : "7.17.0", "minimum_index_compatibility_version" : "7.0.0" }, "tagline" : "You Know, for Search" } HEAD /test_index HTTP/1.1 Content-Type: application/vnd.elasticsearch+json; compatible-with=7 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 659 POST /_bulk?timeout=1m HTTP/1.1 Content-Type: application/vnd.elasticsearch+json; compatible-with=7 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 215616 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) ...data.... HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 65152 {"took":16,"ingest_took":3,"errors":false,.... POST /_bulk?timeout=1m HTTP/1.1 Content-Type: application/vnd.elasticsearch+json; compatible-with=7 Accept: application/vnd.elasticsearch+json; compatible-with=7 X-Elastic-Client-Meta: es=7.17.4p,jv=11,t=7.17.4p,hc=4.1.4,kt=1.4,sc=2.13 Content-Length: 215616 Host: 172.16.216.201:33333 Connection: Keep-Alive User-Agent: elasticsearch-java/7.17.4-SNAPSHOT (Java/11.0.15) ...data... HTTP/1.1 200 OK X-elastic-product: Elasticsearch content-type: application/vnd.elasticsearch+json;compatible-with=7 content-length: 65152
Based on the logs, it is clear that connector is sending compatibility headers in its requests and server is responding based on the headers.
Still connector fails throwing same error
Unable to parse response body for Response
@foboni Thanks for reporting this, we are discussing internally how to prioritize 8.0 support.
copying @rajat-agarwal
According to the docs,
The HLRC version 7.17 can be used with Elasticsearch version 8.x by enabling HLRC’s compatibility mode (see code sample below). In this mode HLRC sends additional headers that instruct Elasticsearch 8.x to behave like a 7.x server.
I'm using 7.17 and have enabled compatibility mode. Even then it fails throwing the same error Unable to parse response body for Response
.
@kpatelatwork Will there be any workaround for this before we migrate to new Java Client API ?
Also do we have any docs available with instructions on how to build this connector from source ? Currently I'm not able to run mvn clean install
as it fails in several test cases. But mvn clean package
is passing the build.
@foboni we have asked @snehashisp to evaluate this and we will get back to you as we know more.
@foboni I was able to get this to work with es client version 7.17.3 in compatibility mode, to es 8.1.1 running locally. The es client was running without any security but that should not matter for functional compatibility. The changes are the same as what you had made to the client but you can pull the es-8
branch and mvn clean package
for a quick build.
Thanks @snehashisp
It worked with 8.1.1 But not sure why it didn't work with 8.0.1
@snehashisp what is the anticipated timeline for merging changes from the es-8
branch into master
? Thanks!
I got the following error when invoke the mvn clean package command for es-8 branch. Please help me to fix this issues
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3: Failed to read artifact descriptor for io.confluent:build-tools:jar:6.2.3: Could not transfer artifact io.confluent:build-tools:pom:6.2.3 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3 at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:498) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282) at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406) at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347) Caused by: org.apache.maven.plugin.PluginExecutionException: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3 at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:109)
Hi guys, do you have an update?
Hi guys, do you have an update?
ES 8.x support should be available sometime late in Q4.
There is an pull request opening for this deal.
I got the following error when invoke the mvn clean package command for es-8 branch. Please help me to fix this issues
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3: Failed to read artifact descriptor for io.confluent:build-tools:jar:6.2.3: Could not transfer artifact io.confluent:build-tools:pom:6.2.3 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check (validate) on project kafka-connect-elasticsearch: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3 at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:498) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282) at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406) at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347) Caused by: org.apache.maven.plugin.PluginExecutionException: Execution validate of goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1:check failed: Plugin org.apache.maven.plugins:maven-checkstyle-plugin:3.1.1 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.maven.plugins:maven-checkstyle-plugin:jar:3.1.1 -> io.confluent:build-tools:jar:6.2.3 at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:109)
Try to build with maven:3.6-openjdk-8
kafka-connect-elasticsearch git:(es-8) ✗ docker run --rm -v "$(pwd)":/opt/maven -e "-Dmaven.repo.local=/opt/maven/.m2/repository" -w /opt/maven maven:3.6-openjdk-8 mvn clean package -Dmaven.repo.local=/opt/maven/.m2/repository
It works with ES 8.3.2
Hi guys, do you have an update?
ES 8.x support should be available sometime late in Q4.
There is an pull request opening for this deal.
I see that the pull request has been merged a couple of weeks ago. Is there any ETA for the official version to be released?
Still broken for me.
[2023-12-19 20:01:06,561] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:443)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:216)
... 5 more
ES 8.32 Kafka-connect-elasticsearch v14.0.12
This line requires the user has the "monitoring_user" role or equivalent added to your es connector user.
curl --cacert /tmp/ca.crt -u elastic:kb4gtb4NaAjbYsnmwwBK -X POST "https://localhost:9200/_security/user/es_sink_connector_user?pretty" -H 'Content-Type: application/json' -d'
{
"password" : "c6HHxnucdmvUHUTEKb7q",
"roles" : [ "es_sink_connector_role", "monitoring_user" ]
}'
I think the documentation needs updating accordingly for 8.x users
Nice find @suityou01 - this worked for me as well. I looked around but looks like we can't PR the docs. Other code snippet for context.