
Thread lock when unable to insert in Elasticsearch


Java API client version

8.15.1

Java version

openjdk version "21.0.5" 2024-10-15 LTS

Elasticsearch version

8.15.1

Problem description

Inserting data into Elasticsearch through a BulkIngester.

Important parameters:

- throughput = 50 records/s
- bulk concurrent requests = 2
- bulk max actions = 10
- bulk max size = 90 MB (never reached at this throughput, so size never triggers a flush)

BulkIngester

        bulkIngester = BulkIngester.of(b -> b
                .client(client)
                .maxOperations(bulkMaxActions)                 // flush after 10 operations
                .maxConcurrentRequests(bulkConcurrentRequests) // up to 2 bulk requests in flight
                .maxSize(bulkMaxSize * 1024L * 1024)           // 90 MB, never reached here
                .listener(esBulkListener));
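For completeness, a self-contained client and ingester setup consistent with the snippet above could look like the following; the endpoint, the Jackson mapper, and the wiring are assumptions, since the report only shows the builder call:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;

    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
    import co.elastic.clients.json.jackson.JacksonJsonpMapper;
    import co.elastic.clients.transport.rest_client.RestClientTransport;

    public class IngesterSetup {

        public static BulkIngester<byte[]> build(EsBulkListener esBulkListener) {
            // Assumed endpoint; replace with the real cluster address.
            RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
            ElasticsearchClient client = new ElasticsearchClient(
                    new RestClientTransport(restClient, new JacksonJsonpMapper()));

            // Same settings as in the report: 10 ops per flush, 2 in-flight
            // requests, 90 MB size cap (never reached at 50 records/s).
            return BulkIngester.of(b -> b
                    .client(client)
                    .maxOperations(10)
                    .maxConcurrentRequests(2)
                    .maxSize(90L * 1024 * 1024)
                    .listener(esBulkListener));
        }
    }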

BulkListener

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

public class EsBulkListener implements BulkListener<byte[]> {

    private static final Logger LOG = LogManager.getLogger(EsBulkListener.class);

    private final AckCounter ackCounter;

    private final boolean acknowledgeRecords;

    public EsBulkListener(boolean acknowledgeRecords) {
        this.ackCounter = new AckCounter();
        this.acknowledgeRecords = acknowledgeRecords;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<byte[]> contexts) {
        //LOG.info("BulkIngester Execution (" + executionId + ") - About to execute new bulk insert composed of " + request.operations().size() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<byte[]> contexts, BulkResponse response) {
        if (response.errors()) {
            LOG.error("BulkIngester Bulk Response has failures: " + response.errors());
            if (acknowledgeRecords) {
                response.items().forEach(i -> {
                    if (i.error() != null) {
                        ackCounter.updateFailure(1L);
                    } else {
                        ackCounter.updateSuccess(1L);
                    }
                });
            }
        } else {
            //LOG.info("BulkIngester Execution (" + executionId + ") - Bulk insert composed of " + request.operations().size() +" actions, took " + response.took() + " ms");
            if (acknowledgeRecords) {
                ackCounter.updateSuccess(request.operations().size());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<byte[]> contexts, Throwable failure) {
        LOG.error("BulkIngester (" + executionId + ") - Execution of bulk request failed: " + failure.getMessage());
        if (acknowledgeRecords) {
            ackCounter.updateFailure(request.operations().size());
        }
    }

    AckCounter getAckCounter() {
        return ackCounter;
    }
}
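The AckCounter class is not included in the report. A minimal thread-safe sketch that is consistent with how the listener uses it above (hypothetical; the real implementation may differ) could be:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the AckCounter referenced above, inferred from usage.
    public class AckCounter {

        private final AtomicLong successes = new AtomicLong();
        private final AtomicLong failures = new AtomicLong();

        // afterBulk may run on ingester threads, so the counters are atomic.
        public void updateSuccess(long count) {
            successes.addAndGet(count);
        }

        public void updateFailure(long count) {
            failures.addAndGet(count);
        }

        public long successCount() {
            return successes.get();
        }

        public long failureCount() {
            return failures.get();
        }
    }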

Insertion:

    public void send(Object record) {
        @SuppressWarnings("unchecked")
        Map<String, Object> esValues = (Map<String, Object>) record;
        String docId = (String) esValues.remove(DOC_ID);
        bulkIngester.add(op -> op
                .index(ind -> ind
                        .id(docId)
                        .index(indexName)
                        .document(esValues)));
    }
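For context, a minimal driver at the reported rate might look like this; the DOC_ID value, the payload shape, and the Consumer wrapper are assumptions, since the report does not show the calling code:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.function.Consumer;

    public class SendDriver {

        // Assumed key; the actual DOC_ID constant is not shown in the report.
        private static final String DOC_ID = "docId";

        // 'send' stands for the send(Object) method of the class above.
        public static void drive(Consumer<Object> send) throws InterruptedException {
            while (true) {
                Map<String, Object> record = new HashMap<>();
                record.put(DOC_ID, UUID.randomUUID().toString());
                record.put("value", System.nanoTime());
                send.accept(record);
                Thread.sleep(20); // ~50 records/s, matching the reported throughput
            }
        }
    }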

To reproduce, take the Elasticsearch master down so that insertions into Elasticsearch fail.
You get errors such as "bulk request failed: Connection refused". Then bring the master back up so that insertions can resume: insertions remain blocked.

With bulk concurrent requests = 1 there is no issue.
Note that the same test does not fail with the deprecated BulkProcessor.
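Until the root cause is found, that observation suggests a workaround: cap the ingester at a single in-flight request, which serializes flushes at the cost of parallelism. A sketch, reusing the builder from the report:

    // Workaround sketch based on the report's own observation: with a single
    // in-flight request the hang does not reproduce, at the cost of parallelism.
    bulkIngester = BulkIngester.of(b -> b
            .client(client)
            .maxOperations(bulkMaxActions)
            .maxConcurrentRequests(1) // instead of 2
            .maxSize(bulkMaxSize * 1024L * 1024)
            .listener(esBulkListener));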

nicolasm35, Jan 17 '25