elasticsearch-java
Is there a replacement for the BulkProcessor?
With RHLC, one could use the BulkProcessor API to batch IndexRequests and DeleteRequests. Is there a recommended replacement for BulkProcessor when migrating to elasticsearch-java?
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
There's no equivalent for now, but we're working on a replacement that will land in the near future.
Is there any update on this feature?
Any update on BulkProcessors for the new ES java api?
If there is no equivalent for BulkProcessor in the new elasticsearch-java API client, how are we supposed to migrate BulkProcessor code? Are the new client's BulkRequest, BulkOperation and BulkResponse objects enough?
Is there a sample just to point us in the right direction?
any update would be much appreciated @swallez
Sorry, no update yet. I'll raise this internally as a topic that needs to be prioritized.
Hi folks - thanks for the already open issue.
We've recently migrated from the now-deprecated ES7 client to the new API client in v8 and "stumbled" the very hard way over bulk requests. Elasticsearch rejects our bulk requests because http.max_content_length is exceeded, which is plausible, as the client's bulk() no longer does any batching.
Instead of the "old" way (also the way of the clients in, e.g., Python), where the ES client splits the bulk request into smaller chunks based on byte size or document count, we now have to resort to splitting the data manually ourselves. Seems a step backward to me.
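A minimal sketch of such manual splitting, chunking by document count only (the CHUNK_SIZE value and the method name are illustrative; a real implementation would also cap the byte size):

import java.io.IOException;
import java.util.List;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

public class ManualBulkChunker {
    // Illustrative threshold; tune so each request stays under http.max_content_length.
    private static final int CHUNK_SIZE = 1000;

    static void bulkInChunks(ElasticsearchClient client, List<BulkOperation> operations)
            throws IOException {
        for (int from = 0; from < operations.size(); from += CHUNK_SIZE) {
            List<BulkOperation> chunk =
                operations.subList(from, Math.min(from + CHUNK_SIZE, operations.size()));
            // One bulk request per chunk instead of one oversized request.
            client.bulk(new BulkRequest.Builder().operations(chunk).build());
        }
    }
}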
Is there a way to speed up the development of a "new" BulkProcessor, aside from "implement it yourselves"? ("buy Enterprise support", "send us chocolate", "talk to and bribe a member of the Evangelist team", ...)
Thanks! :)
Best, Anton
Do we have any timeline for an update on this issue? Is any documentation coming soon?
Any reason why this is taking so long? Why even introduce a new API without this core component for ingestion? This issue should be at the top of the list of things to solve in my opinion. An update and preferably timeline would be greatly appreciated.
Thanks and best regards, Frank
It seems that I just opened a duplicate of this here: https://github.com/elastic/elasticsearch-java/issues/425 I also asked for a solution on the official board: https://discuss.elastic.co/t/new-java-client-how-to-estimate-size-of-bulkrequest/316211
It's hard to believe that you are supposed to migrate to the new client before switching from ES 7 to ES 8 when no such feature is present.
Well, I started migrating to the new Java API client and along the way realised the BulkProcessor was not available there. It has been almost a year now and still nothing seems to be moving on the Elastic front. I guess only paying customers get support these days...
I don't understand why this issue is given zero priority. Why introduce a new API if it is not usable in real-life bulk processing situations? Any update? Any deadline?
I stumbled upon this issue yesterday; something like this seems to be working (but I have yet to do extensive testing):
In the classpath:
- co.elastic.clients:elasticsearch-java:8.5.0
- org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.3
Then, in my indexing logic that relies on BulkProcessor, I have:
RequestOptions requestOptions;
if (esVersion.getMajor() >= 8) {
    // REST API compatibility mode: ask ES 8 to accept and emit 7.x-style bodies.
    requestOptions = RequestOptions.DEFAULT.toBuilder()
        .addHeader("Content-Type", "application/vnd.elasticsearch+json; compatible-with=7")
        .addHeader("Accept", "application/vnd.elasticsearch+json; compatible-with=7")
        .build();
} else {
    requestOptions = RequestOptions.DEFAULT;
}
Then I'm using it like this:
Request request = new Request(HttpPost.METHOD_NAME, "/_bulk");
request.setOptions(requestOptions);
With that I no longer get NPEs, and indexing seems to work properly on ES 6, 7 and 8.
Hopefully I'll not have classpath issues due to having 2 versions of the java client.
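Hooked into the 7.17 BulkProcessor, this looks roughly as follows (a sketch; esHighLevelClient and bulkProcessorListener are assumed to exist, as in the listener snippet further down):

// Route every bulk call through the compatibility-mode request options.
BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) -> esHighLevelClient.bulkAsync(request, requestOptions, bulkListener),
        bulkProcessorListener)
    .build();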
Hi Anthony,
It's not that there is no bulk functionality; it's the data chunking and the listener that are missing. We used to be able to do things like:
// Create bulk processor listener.
BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        // Handle stuff before bulk execution...
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
            // Handle item responses...
        }
        if (bulkResponse.hasFailures()) {
            logger.error(bulkResponse.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
        for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
            // Handle failures...
        }
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) -> esHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        bulkProcessorListener)
    // Only make use of the byte size to force a flush.
    .setBulkActions(-1)
    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
    .setConcurrentRequests(0)
    .build();
...
Below is some code where I actually use the new bulk processor. The events used in the code are documents that need to be stored, updated, and so on. I keep track of the Elasticsearch index in the event itself, because updates could otherwise go horribly wrong when there is an index rollover (not everyone uses Elasticsearch purely for a logging use case). We also use Elasticsearch as our database for master data, unlike most other users. I've been using Elasticsearch since version 0.1.4... Hopefully it is of some help to you.
Best regards, Frank Montyne
// Create bulk request builder.
BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder()
    .waitForActiveShards(asBuilder -> asBuilder.count(1))
    .refresh(Refresh.True);

// Needed as a temporary workaround until the withJson() bug is fixed!
JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();

try {
    // Add events to the bulk request.
    List<BulkOperation> bulkOperations = new ArrayList<>();
    for (E event : indexSetEvents) {
        switch (event.action()) {
            case create:
                bulkOperations.add(new CreateOperation.Builder<JsonData>()
                    .index(event.esIndex())
                    .id(event.id())
                    // Temporary workaround until the withJson() bug is fixed!
                    .document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
                    //.withJson(event.toESJsonReader())
                    .build()
                    ._toBulkOperation());
                break;
            case update:
                bulkOperations.add(new IndexOperation.Builder<JsonData>()
                    .index(event.esIndex())
                    .id(event.id())
                    // Temporary workaround until the withJson() bug is fixed!
                    .document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
                    //.withJson(event.toESJsonReader())
                    .build()
                    ._toBulkOperation());
                break;
            case delete:
                // Soft delete event.
                event.deleted(true);
                bulkOperations.add(new IndexOperation.Builder<JsonData>()
                    .index(event.esIndex())
                    .id(event.id())
                    // Temporary workaround until the withJson() bug is fixed!
                    .document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
                    //.withJson(event.toESJsonReader())
                    .build()
                    ._toBulkOperation());
                break;
            case undelete:
                // Soft undelete event.
                event.deleted(false);
                bulkOperations.add(new IndexOperation.Builder<JsonData>()
                    .index(event.esIndex())
                    .id(event.id())
                    // Temporary workaround until the withJson() bug is fixed!
                    .document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
                    //.withJson(event.toESJsonReader())
                    .build()
                    ._toBulkOperation());
                break;
            case purge:
                // Real physical delete.
                bulkOperations.add(new DeleteOperation.Builder()
                    .index(event.esIndex())
                    .id(event.id())
                    .build()
                    ._toBulkOperation());
                break;
            default:
                // Should not get here. Log anyway.
                logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
                break;
        }
    }
    bulkRequestBuilder.operations(bulkOperations);

    // Perform bulk request.
    BulkResponse bulkResponse = esClient.bulk(bulkRequestBuilder.build());
    for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {
        // Bulk request API not fully fleshed out yet. There is no way to get to the source.
        // bulkResponseItem.get().source() fails since bulkResponseItem.get() returns null!
        // Process response item...
    }
} catch (IOException e) {
    // Handle transport failure.
    logger.error("Bulk request failed", e);
}
@frank-montyne In this case I cannot help you. I still have the listener because I'm still relying on the 7.x client with ES8 in compatibility mode.
What you are using here is just the Bulk API of ES, and it's far (like, very far) from what BulkProcessor actually does.
With BulkProcessor you can configure:
- the concurrency
- the max size of bulk requests (by document count and/or by byte size)
- the back-off policy
You can then "stream" as many documents as you want through it, and it will:
- batch documents into bulk requests using the configured thresholds
- handle N requests in parallel (N being the configured concurrency), each request being the .bulk call on the ES client, i.e. what you are doing in your snippet
- handle retries of failed documents within a bulk request, following the back-off policy
Exactly my point.
Is adding a new bulk processor to the 8.x Java client jars being worked on? My application relies on the BulkProcessor's functionality, and trying to code my own bulk processor seems like a big risk not worth taking. I would rather wait for the Elasticsearch dev team to come up with a new one. For now I will continue to use the 7.17.3 jars.
That is the point I am interested in as well (see https://github.com/elastic/elasticsearch-java/issues/425).
I would be glad to know the recommended way to write large amounts of data into Elasticsearch. You cannot know the size of your request by only inspecting your POJOs, so there must be something for this in the library, right?
Maybe @swallez can clear things up?
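One possible workaround while waiting for an official answer: serialize each document through the client's JsonpMapper and count the bytes. A sketch (estimateSizeInBytes is a made-up helper name):

import java.io.ByteArrayOutputStream;
import co.elastic.clients.json.JsonpMapper;
import jakarta.json.stream.JsonGenerator;

static long estimateSizeInBytes(JsonpMapper mapper, Object document) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Serialize exactly as the client would, then measure the result.
    JsonGenerator generator = mapper.jsonProvider().createGenerator(out);
    mapper.serialize(document, generator);
    generator.close();
    return out.size();
}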
@swallez It does not look like the new BulkIngester retries a failed operation like BulkProcessor did; am I reading it correctly? It does not seem to look at the actual response at all.
Previously the BulkProcessor would create a new BulkRequest with only the failed operations for the next batch.
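Reproducing that behaviour by hand with the plain bulk API would look something like the sketch below; it assumes the response items come back in the same order as the operations that were sent, which the _bulk API guarantees. (Note that the old BulkProcessor only retried rejections, i.e. HTTP 429, so a faithful port would also check item.status().)

import java.util.ArrayList;
import java.util.List;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;

static List<BulkOperation> failedOperations(List<BulkOperation> sent, BulkResponse response) {
    List<BulkOperation> toRetry = new ArrayList<>();
    for (int i = 0; i < response.items().size(); i++) {
        BulkResponseItem item = response.items().get(i);
        // A non-null error means this item failed; queue it for the next batch.
        if (item.error() != null) {
            toRetry.add(sent.get(i));
        }
    }
    return toRetry;
}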
@panthony the new BulkIngester indeed doesn't have retries for now. There are a few shortcomings in the way BulkProcessor handles retries that would need to be cleared out before adding retries to BulkIngester.
I've opened #478 to outline the issues and a way to implement this. Please continue the discussion on retries there.
@swallez The BulkIngester helper doesn't seem to be present in the 7.17.8 release. Is that correct? If so to which 7.17.x release will it be added?
Thanks Frank
@frank-montyne that's correct. It will be included in 7.17.9 which should be released at the end of this month, and in 8.7.0 that is currently planned somewhere in March.
Thanks!
I don't know if it can help someone, but I created a class to mimic the BulkProcessor. I have not benchmarked it or tested it extensively.
Function sending the data to an insertion thread:
public void send(Object record) {
    try {
        // Round-robin records across the insertion threads' queues.
        queue[currentInsertionThread].put(record);
        currentInsertionThread = (currentInsertionThread + 1) % bulkConcurrentRequests;
    } catch (InterruptedException e) {
        LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
    }
}
Class managing insertions:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

public class ElasticsearchInsertionThread implements Runnable {

    private static final Logger LOG = LogManager.getLogger(ElasticsearchInsertionThread.class);

    // List containing bulk operations.
    List<BulkOperation> listBulkOperation = new ArrayList<>();
    // Maximum number of actions per bulk.
    int bulkMaxActions;
    // Index name.
    String indexName;
    // Elasticsearch client.
    ElasticsearchClient client;
    // Queue used to share records with the main thread.
    private BlockingQueue<Object> queue;
    // State of the thread.
    boolean running;

    public ElasticsearchInsertionThread(ElasticsearchClient client, BlockingQueue<Object> queue) {
        this.client = client;
        this.queue = queue;
        this.indexName = System.getenv().get("es_index_name");
        this.bulkMaxActions = Integer.valueOf(System.getenv().get("es_bulk_max_actions"));
        this.running = false;
    }

    /**
     * Get the state of the thread.
     * @return true if the thread is running, otherwise false
     */
    public boolean isRunning() {
        return running;
    }

    @Override
    public void run() {
        running = true;
        try {
            Object record;
            while ((record = queue.take()) != null) {
                @SuppressWarnings("unchecked")
                Map<String, Object> esValues = (Map<String, Object>) record;
                if (esValues.isEmpty()) {
                    // An empty value is the signal to exit the thread.
                    // Flush documents not yet inserted before leaving.
                    if (listBulkOperation.size() > 0) {
                        insertIntoElasticsearch();
                    }
                    break;
                }
                // Derive the document id from the record (assumption: records carry an "id" field).
                String docId = String.valueOf(esValues.get("id"));
                // Create bulk operation.
                listBulkOperation.add(BulkOperation.of(op -> op
                    .create(c -> c
                        .index(indexName)
                        .id(docId)
                        .document(esValues))));
                if (listBulkOperation.size() == bulkMaxActions) {
                    // Send the bulk operations once the maximum size has been reached.
                    insertIntoElasticsearch();
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Error getting data from queue " + e.getMessage());
        }
        running = false;
    }

    /**
     * Insert the documents in the bulk into Elasticsearch.
     */
    void insertIntoElasticsearch() {
        try {
            BulkResponse response = client.bulk(b -> b.operations(listBulkOperation));
            if (response.errors()) {
                LOG.error("Bulk Response has failures");
            }
        } catch (IOException | ElasticsearchException e) {
            LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
        }
        listBulkOperation.clear();
    }
}
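For completeness, wiring up the class above might look like this (a sketch; the thread count, queue capacity, and names are illustrative; the empty map is the poison pill the run() loop checks for):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import co.elastic.clients.elasticsearch.ElasticsearchClient;

// Start one insertion thread per concurrent request and return its queue.
static List<BlockingQueue<Object>> startInsertionThreads(ElasticsearchClient client, int bulkConcurrentRequests) {
    List<BlockingQueue<Object>> queues = new ArrayList<>();
    for (int i = 0; i < bulkConcurrentRequests; i++) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1_000);
        queues.add(queue);
        new Thread(new ElasticsearchInsertionThread(client, queue), "es-insert-" + i).start();
    }
    return queues;
}

// Shutdown: send the empty-map poison pill so each thread flushes and exits.
static void stopInsertionThreads(List<BlockingQueue<Object>> queues) throws InterruptedException {
    for (BlockingQueue<Object> queue : queues) {
        queue.put(Map.of());
    }
}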
@nicolasm35 as mentioned previously, an implementation will be part of the next release. See PR #474 and https://github.com/elastic/elasticsearch-java/tree/main/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk
If you can't wait for the next release, I suggest you copy that code instead of using this more limited implementation.
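For reference, basic usage of the BulkIngester helper looks roughly like this (a sketch against the helper linked above; esClient, the index name, and myDocument are placeholders, and the exact builder methods should be checked against the sources):

import java.util.concurrent.TimeUnit;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;

// Batches by count, byte size and time, with bounded concurrency,
// flushing automatically as thresholds are crossed.
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(1000)                // flush after 1000 operations...
    .maxSize(5L * 1024 * 1024)          // ...or 5 MiB of payload
    .maxConcurrentRequests(2)           // at most 2 bulk requests in flight
    .flushInterval(1, TimeUnit.SECONDS) // ...or 1 second of inactivity
);

ingester.add(op -> op.index(idx -> idx
    .index("my-index")
    .id("doc-1")
    .document(myDocument)));

// Flushes pending operations and waits for in-flight requests.
ingester.close();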