elasticsearch-java

Is there a replacement for the BulkProcessor?

Open: tony-hizzle opened this issue 2 years ago • 11 comments

With the RHLC (Java REST High Level Client), one could use the BulkProcessor API to batch IndexRequests and DeleteRequests. Is there a recommended replacement for BulkProcessor when migrating to elasticsearch-java?

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

tony-hizzle, Jan 14 '22

There's no equivalent for now, but we're working on a replacement that will land in the near future.

swallez, Jan 14 '22

Is there any update on this feature?

jerryguowei, Mar 16 '22

Any update on a BulkProcessor for the new ES Java API client?

jetpack1116, Jul 15 '22

If there is no equivalent for BulkProcessor in the new Elasticsearch Java API client, how should we migrate BulkProcessor code? Are the new client's BulkRequest, BulkOperation and BulkResponse objects enough?

Is there a sample, just to point us in the right direction?

jetpack1116, Jul 25 '22

Any update would be much appreciated, @swallez.

divadpoc, Aug 04 '22

Sorry, no update yet. I'll raise this internally as a topic that needs to be prioritized.

swallez, Aug 04 '22

Hi folks - thanks for the already open issue.

We've recently migrated from the now-deprecated ES7 client to the new API Client in v8 and stumbled over bulk requests the hard way. Elasticsearch rejects our bulk requests because they exceed max_content_length, which makes sense, as the new client's bulk() does no batching at all.

Instead of the "old" way (also used by clients in other languages, e.g. Python), where the ES client splits the bulk request into smaller chunks based on byte size or document count, we now have to resort to splitting the data manually ourselves. That seems like a step backward to me.

Is there a way to speed up the development of a "new" BulkProcessor, aside from "implement it yourselves"? ("buy Enterprise support", "send us chocolate", "talk to and bribe a member of the Evangelist team", ...)

Thanks! :)

Best, Anton

antondollmaier, Aug 08 '22
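The manual splitting Anton describes can be sketched roughly as follows. This is a minimal, hypothetical helper (`BulkChunker.split` is not part of elasticsearch-java). It assumes you can estimate each operation's serialized size in bytes, and it closes a batch as soon as adding the next operation would exceed either the operation-count limit or the byte budget. Each resulting batch would then be sent as its own bulk request, with a byte budget chosen safely below the cluster's `http.max_content_length`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical helper: splits items into batches bounded by item count and by bytes. */
final class BulkChunker {

    private BulkChunker() {}

    /**
     * @param items    items to send, in order
     * @param sizeOf   estimated serialized size of one item, in bytes
     * @param maxItems maximum number of items per batch (must be > 0)
     * @param maxBytes byte budget per batch (must be > 0); an item larger than
     *                 this still gets a batch of its own
     */
    static <T> List<List<T>> split(List<T> items, ToLongFunction<T> sizeOf,
                                   int maxItems, long maxBytes) {
        if (maxItems <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentBytes = 0;
        for (T item : items) {
            long size = sizeOf.applyAsLong(item);
            // Close the current batch if adding this item would break either limit.
            boolean full = current.size() >= maxItems
                    || (!current.isEmpty() && currentBytes + size > maxBytes);
            if (full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(item);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the remainder
        }
        return batches;
    }
}
```

An oversized item is kept alone in its own batch rather than dropped. Whether the server then accepts that request is up to its own limit.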

Is there any timeline for this issue? Is any documentation coming soon?

sat245, Aug 11 '22

Any reason why this is taking so long? Why even introduce a new API without this core component for ingestion? In my opinion, this issue should be at the top of the list of things to solve. An update, and preferably a timeline, would be greatly appreciated.

Thanks and best regards, Frank

frank-montyne, Oct 17 '22

It seems that I just opened a duplicate of this issue here: https://github.com/elastic/elasticsearch-java/issues/425 I also asked for a solution on the official forum: https://discuss.elastic.co/t/new-java-client-how-to-estimate-size-of-bulkrequest/316211

It's hard to believe that you are supposed to migrate to the new client before switching from ES 7 to ES 8 when this feature is missing.

fabian-froehlich, Oct 19 '22

Well, I started migrating to the new Java API client and realised along the way that BulkProcessor was not available there. It has been almost a year now and still nothing seems to be moving on the Elastic front. I guess only paying customers get support these days...

frank-montyne, Oct 19 '22

I don't understand why this issue is given zero priority. Why introduce a new API if it isn't usable for bulk processing in real-life situations? Any update? Any deadline?

frank-montyne, Nov 16 '22

I stumbled upon this issue yesterday. Something like the following seems to work (but I have yet to do extensive testing):

On the classpath:

  • co.elastic.clients:elasticsearch-java:8.5.0
  • org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.3

Then in my indexing logic that relies on BulkProcessor I have:

        if (esVersion.getMajor() >= 8) {
            requestOptions = RequestOptions.DEFAULT
                .toBuilder()
                .addHeader("Content-Type", "application/vnd.elasticsearch+json; compatible-with=7")
                .addHeader("Accept", "application/vnd.elasticsearch+json; compatible-with=7")
                .build();
        } else {
            requestOptions = RequestOptions.DEFAULT;
        }

And I use it like this:

        Request request = new Request(HttpPost.METHOD_NAME, "/_bulk");
        request.setOptions(requestOptions);

With that I no longer get an NPE, and indexing seems to work properly on ES 6, 7 and 8.

Hopefully I won't run into classpath issues from having two versions of the Java client.

panthony, Nov 16 '22

Hi Anthony,

It's not that there is no bulk functionality; it's the data chunking and the listener that are missing. We used to be able to do things like:

// Create bulk processor.
BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() {
	@Override
	public void beforeBulk(long executionId, BulkRequest bulkRequest) {
		// Handle stuff before bulk execution...
	}

	@Override
	public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
		for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
			// Handle item responses...
		}

		if (bulkResponse.hasFailures()) {
			logger.error(bulkResponse.buildFailureMessage());
		}
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
		List<DocWriteRequest<?>> docWriteRequests = bulkRequest.requests();
		for (DocWriteRequest docWriteRequest : docWriteRequests) {
			// Handle failures...
		}
	}
};

BulkProcessor bulkProcessor = BulkProcessor.builder((request, bulkListener) -> esHighLevelClient.
	bulkAsync(request, RequestOptions.DEFAULT, bulkListener), bulkProcessorListener)
	// Only make use of the byte size to force a flush.
	.setBulkActions(-1)
	.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
	.setConcurrentRequests(0)
	.build(); 
...

Below is some code where I actually use the new bulk processor. The events in the code are documents that need to be stored, updated, and so on. I keep track of the Elasticsearch index in the event itself, because updates could otherwise go horribly wrong after an index rollover (not everyone uses Elasticsearch purely for logging :). Unlike most other users, we also use Elasticsearch as our database for master data. I've been using Elasticsearch since version 0.1.4... Hopefully this is of some help to you.

Best regards, Frank Montyne

// Create bulk processor.
BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder()
		.waitForActiveShards(asBuilder -> asBuilder.count(1))
		.refresh(Refresh.True);

// Needed as a temporary workaround until withJson() bug is fixed! 
JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();

try {
	List<BulkOperation> bulkOperations = new ArrayList<>();

	// Add events to bulk processor.
	for (E event : indexSetEvents) {
		// Add request to bulk processor.
		switch (event.action()) {
			case create:
				bulkOperations.add(new CreateOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case update:
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case delete:
				// Soft delete event.
				event.deleted(true);
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case undelete:
				// Soft undelete event.
				event.deleted(false);
				bulkOperations.add(new IndexOperation.Builder<JsonData>()
						.index(event.esIndex())
						.id(event.id())
						// Temporary workaround until withJson() bug is fixed! 
						.document(JsonData.from(jsonProvider.createParser(event.toESJsonReader()), jsonpMapper))
						//.withJson(event.toESJsonReader())
						.build()
						._toBulkOperation());					
				break;

			case purge:
				// Real physical delete.
				bulkOperations.add(new DeleteOperation.Builder()
						.index(event.esIndex())
						.id(event.id())
						.build()
						._toBulkOperation());					
				break;

			default:
				// Should not get here. Log anyway.
				logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
				break;
		}
	}

	bulkRequestBuilder.operations(bulkOperations);
	
	// Perform bulk request.
	BulkResponse bulkResponse = esClient.bulk(bulkRequestBuilder.build());
	for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {
		// Bulk request API not fully fleshed out yet. There is no way to get to the source.
		// The bulkResponseItem.get().source() fails since bulkResponseItem.get() returns null!
		// Process response item...
	}
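} catch (IOException e) {
	// esClient.bulk() throws IOException on transport-level failures.
	logger.error("Bulk request failed", e);
}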

frank-montyne, Nov 17 '22

@frank-montyne In this case I cannot help you.

I still have the listener because I'm still relying on the 7.X client with ES8 in compatibility mode.

What you are using here is just the Bulk API of ES and it's far (like very far) from what BulkProcessor actually does.

With BulkProcessor you can configure:

  • the concurrency
  • the max size of bulk requests (by document count and/or by size)
  • the back-off policy

You can then "stream" as many documents as you want through it and it will:

  • batch documents into bulk requests using the configured thresholds
  • handle N requests in parallel (N being the configured concurrency)
    • each request being a .bulk call on the ES client, which is what you are doing in your snippet
  • retry failed documents within that bulk request following the back-off policy

panthony, Nov 17 '22
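To make the difference panthony describes more concrete, here is a much-simplified, hypothetical sketch of two of those features: a count threshold and bounded concurrency. `MiniBulkProcessor` is illustrative only and is not a real class. The `sender` callback stands in for a call such as `client.bulk(...)`, and there is no byte-size threshold, back-off or listener. A `Semaphore` caps the number of batches in flight, so `add()` blocks rather than piling up unbounded work when the cluster is slow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical, much-simplified BulkProcessor-like buffer: count threshold + bounded concurrency. */
final class MiniBulkProcessor<T> implements AutoCloseable {

    private final int maxActions;
    private final Semaphore permits;        // one permit per batch allowed in flight
    private final ExecutorService executor;
    private final Consumer<List<T>> sender; // stands in for the real bulk call
    private List<T> buffer = new ArrayList<>();

    MiniBulkProcessor(int maxActions, int maxConcurrent, Consumer<List<T>> sender) {
        this.maxActions = maxActions;
        this.permits = new Semaphore(maxConcurrent);
        this.executor = Executors.newFixedThreadPool(maxConcurrent);
        this.sender = sender;
    }

    /** Buffers an item and flushes once maxActions items are buffered. */
    synchronized void add(T item) throws InterruptedException {
        buffer.add(item);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    /** Hands the buffered items to a worker, waiting first for a free concurrency slot. */
    synchronized void flush() throws InterruptedException {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        permits.acquire(); // back-pressure: blocks while maxConcurrent batches are in flight
        executor.execute(() -> {
            try {
                sender.accept(batch);
            } finally {
                permits.release();
            }
        });
    }

    /** Flushes the remainder and waits for in-flight batches to complete. */
    @Override
    public synchronized void close() throws InterruptedException {
        flush();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Waiting for a permit while holding the lock is deliberate in this sketch: it throttles all producers at once. A usable version would also need a byte-size threshold, retries and error reporting.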

Exactly my point.


frank-montyne, Nov 17 '22

Is a new BulkProcessor for the 8.x Java client being worked on? My application relies on BulkProcessor's functionality, and writing my own bulk processor seems like a big risk that isn't worth taking. I would rather wait for the Elasticsearch dev team to come up with a new one. For now I will keep using the 7.17.3 jars.

jcr0ss, Dec 07 '22

That is the point I am interested in as well (see https://github.com/elastic/elasticsearch-java/issues/425).

I would be glad to know the recommended way to write large amounts of data into Elasticsearch. You cannot know the size of your request just by inspecting your POJOs, so there must be something in this library, right?

Maybe @swallez can clear things up?

fabian-froehlich, Dec 13 '22
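On fabian-froehlich's size question: what counts toward the request limit is the serialized body, not the POJO. A `_bulk` body is NDJSON. Each entry is an action/metadata line, followed (except for deletes) by a source line, and every line ends in a newline. A size estimate therefore has to be taken after serialization and measured in UTF-8 bytes, not with `String.length()`. The sketch assumes you already have both lines as JSON strings, for example produced by your own JSON mapper. `BulkEntrySize` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: bytes one entry adds to an NDJSON _bulk request body. */
final class BulkEntrySize {

    private BulkEntrySize() {}

    /**
     * @param actionJson the action/metadata line, e.g. {"index":{"_index":"my-index"}}
     * @param sourceJson the serialized document, or null for actions without a body (delete)
     */
    static long bytes(String actionJson, String sourceJson) {
        // Each NDJSON line is counted in UTF-8 bytes, plus 1 for its trailing '\n'.
        long total = actionJson.getBytes(StandardCharsets.UTF_8).length + 1;
        if (sourceJson != null) {
            total += sourceJson.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return total;
    }
}
```

Non-ASCII text is where `String.length()` undercounts. For example, `{"n":"é"}` has 9 characters but takes 10 bytes in UTF-8.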

@swallez It doesn't look like the new BulkIngester retries failed operations the way BulkProcessor did; am I reading that correctly? It doesn't seem to look at the actual response at all.

Previously the BulkProcessor would create a new BulkRequest with only the failed operations for the next batch.

panthony, Jan 04 '23
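For readers unfamiliar with the behaviour panthony mentions, this is roughly what "retry only the failed operations" means. It is a hypothetical sketch, not the actual code of BulkProcessor or BulkIngester. The `sendBatch` callback is an assumption: it sends one bulk request and returns the operations whose items failed. Only those operations are resent, after an exponentially growing delay. In practice you would probably retry only transient failures, such as HTTP 429 rejections, and not, for example, mapping errors.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: resend only failed operations, with exponential back-off. */
final class BackoffRetry {

    private BackoffRetry() {}

    /**
     * @param ops            operations to send
     * @param sendBatch      sends one batch and returns the operations that failed (empty = all succeeded)
     * @param maxRetries     number of retries after the first attempt
     * @param initialDelayMs delay before the first retry; doubled after each retry
     * @return operations that still failed after the last attempt
     */
    static <T> List<T> sendWithRetries(List<T> ops, UnaryOperator<List<T>> sendBatch,
                                       int maxRetries, long initialDelayMs) throws InterruptedException {
        List<T> pending = sendBatch.apply(ops);
        long delay = initialDelayMs;
        for (int retry = 0; retry < maxRetries && !pending.isEmpty(); retry++) {
            Thread.sleep(delay);
            delay *= 2;                          // exponential back-off
            pending = sendBatch.apply(pending);  // the next request holds only the failures
        }
        return pending;
    }
}
```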

@panthony the new BulkIngester indeed doesn't have retries for now. There are a few shortcomings in the way BulkProcessor handles retries that would need to be cleared out before adding retries to BulkIngester.

I've opened #478 to outline the issues and a way to implement this. Please continue the discussion on retries there.

swallez, Jan 04 '23

@swallez The BulkIngester helper doesn't seem to be present in the 7.17.8 release. Is that correct? If so, in which 7.17.x release will it be added?

Thanks, Frank

frank-montyne, Jan 16 '23

@frank-montyne that's correct. It will be included in 7.17.9 which should be released at the end of this month, and in 8.7.0 that is currently planned somewhere in March.

swallez, Jan 16 '23

Thanks

frank-montyne, Jan 16 '23

I don't know if it helps anyone, but I created a class that mimics BulkProcessor. I haven't benchmarked it or tested it extensively.

Method that sends the data to an insertion thread (round-robin):

    public void send(Object record) {
    	try {
			queue[currentInsertionThread].put(record);
			currentInsertionThread = (currentInsertionThread + 1) % bulkConcurrentRequests;
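		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can see the thread was interrupted.
			Thread.currentThread().interrupt();
			LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
		}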
    }

Class managing insertions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

public class ElasticsearchInsertionThread implements Runnable {

	private static final Logger LOG = LogManager.getLogger(ElasticsearchInsertionThread.class);
	
	// List containing bulk operations
	List<BulkOperation> listBulkOperation = new ArrayList<>();
		
	// Maximum number of actions per bulk 
	int bulkMaxActions;
	
	// Index name
	String indexName;
	
	// Elasticsearch client
	ElasticsearchClient client;
	
	// Queue used to share records with main thread
	private BlockingQueue<Object> queue;
	
	// State of thread (volatile: read from other threads via isRunning())
	volatile boolean running;
	
    public ElasticsearchInsertionThread(ElasticsearchClient client, BlockingQueue<Object> queue) {
    	this.client = client;
    	this.queue = queue;
    	this.indexName = System.getenv().get("es_index_name");
    	this.bulkMaxActions = Integer.valueOf(System.getenv().get("es_bulk_max_actions"));
    	this.running = false;
    }
    
    /**
     * Get state of thread
     * @return true if the thread is running, otherwise false
     */
    public boolean isRunning() {
    	return running;
    }

    @Override
    public void run() {
    	running = true;
    	try {
			Object record;
			while ((record = queue.take()) != null) {
				@SuppressWarnings("unchecked")
				Map<String, Object> esValues = (Map<String, Object>)record;
				
				if (esValues.isEmpty()) {
					// empty value to exit the thread
					// Flush documents not inserted before leaving
					if (listBulkOperation.size() > 0) {
						insertIntoElasticsearch();
					}
					break;
				}
				
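				// Create bulk operation. docId was never defined in this snippet, so no ID
				// is set and Elasticsearch generates one; add .id(...) if your records
				// carry their own IDs.
				listBulkOperation.add(BulkOperation.of(_0 -> _0
						.create(_1 -> _1
								.index(indexName)
								.document(esValues))));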
				
				if (listBulkOperation.size() == bulkMaxActions) {
					// Send the bulk operations once maximum size has been reached
					insertIntoElasticsearch();
				}
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // preserve the interrupt status
			LOG.error("Error getting data from queue " + e.getMessage());
		}
    	running = false;
    }
    
    /**
     * Insert documents in the bulk into Elasticsearch
     */
    void insertIntoElasticsearch() {
		try {
			BulkResponse response = client.bulk(_0 -> _0
					.operations(listBulkOperation));
			if (response.errors()) {
				LOG.error("Bulk Response has failures");
			}
		} catch (IOException | ElasticsearchException e) {
			LOG.error("Error while sending records to Elasticsearch: " + e.getMessage());
		}

	    listBulkOperation.clear();
    }
}

nicolasm35, Jan 18 '23
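One caveat about nicolasm35's `send()` method: `currentInsertionThread` is read and updated without synchronization. Two producer threads can therefore pick the same queue, or skip one. If `send()` may be called from several threads, a hypothetical variant based on an `AtomicInteger` could look like the sketch below. The names are illustrative, and the bounded queue capacity is an assumption, since the original queue type isn't shown. Worker threads would still `take()` from `queue(i)` the way `ElasticsearchInsertionThread` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread-safe round-robin dispatch of records to worker queues. */
final class RoundRobinDispatcher<T> {

    private final List<BlockingQueue<T>> queues = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatcher(int workers, int queueCapacity) {
        for (int i = 0; i < workers; i++) {
            // Bounded queues give back-pressure: put() blocks when a worker falls behind.
            queues.add(new LinkedBlockingQueue<>(queueCapacity));
        }
    }

    /** Queue that worker i should take() records from. */
    BlockingQueue<T> queue(int i) {
        return queues.get(i);
    }

    /** Hands a record to the next worker queue, blocking while that queue is full. */
    void send(T record) throws InterruptedException {
        // getAndIncrement() is atomic; floorMod keeps the index valid after int overflow.
        int i = Math.floorMod(next.getAndIncrement(), queues.size());
        queues.get(i).put(record);
    }
}
```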

@nicolasm35 as mentioned previously, an implementation will be part of the next release. See PR #474 and https://github.com/elastic/elasticsearch-java/tree/main/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk

If you can't wait for the next release, I suggest you copy that code instead of using this more limited implementation.

swallez, Jan 24 '23