Description
Java API client version: 7.17.9 and 7.17.10-SNAPSHOT
Java version: Java 19
Elasticsearch version: 7.17.9
Problem description
When using the BulkIngester with a BulkListener, calling close() on the BulkIngester returns before all afterBulk() callbacks of the BulkListener have finished. Below is a snippet of code that uses the BulkIngester; if you add a few logging statements after close() and inside afterBulk(), you will see that the afterBulk() callbacks are still running after close() has returned.
That should not be the case: when close() returns, the bulk requests should have been handled completely, including the listener callbacks.
Thanks for looking into the problem.
```java
private void handleBulk(Collection<E> events) {
long start = DateUtils.nowMillis();
// Create bulk listener.
BulkListener<String> bulkListener = new BulkListener<String>() {
@Override
public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {
if (bulkResponseItem.error() != null) {
logger.error("Event with id '%s' has error %s".formatted(bulkResponseItem.id(), bulkResponseItem.error()));
}
}
// Check for errors.
if (bulkResponse.errors()) {
logger.error("Bulk processing of %d events has some failures".formatted(events.size()));
}
else {
logger.info("Bulk processed %d events in %d ms".formatted(events.size(), DateUtils.diffMillis(start)));
}
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
// All event processing failed, so we can skip adding the individual event indexes to the set of indexes to refresh after the bulk request completes.
logger.error("Bulk processing %d events failed completely. %s".formatted(events.size(), ExceptionUtils.getMessage(failure)));
}
};
// Create bulk ingester.
BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder ->
ingesterBuilder
.client(elasticSearch.getESClient())
// Accumulate the next batch while the current batch is being processed.
// Do not set this to 0: bulkIngester.flush() will hang (separate bug filed, not yet fixed in 7.17.10-SNAPSHOT).
.maxConcurrentRequests(1)
// Flush the current batch after 10,000 operations have been added...
.maxOperations(10000)
// ...or after 5 MB of data has been added...
.maxSize(5 * 1024 * 1024)
// ...or after 1 second has elapsed.
.flushInterval(1, TimeUnit.SECONDS)
.globalSettings(gsBuilder ->
gsBuilder
.waitForActiveShards(asBuilder -> asBuilder.count(1))
.refresh(Refresh.False))
// The listener is configured on the ingester builder, not on the global settings builder.
.listener(bulkListener)
);
try {
// Add events to bulk ingester.
for (E event : events) {
switch (event.action()) {
case create:
bulkIngester.add(new CreateOperation.Builder<BinaryData>()
.index(event.esIndex())
.id(event.id())
.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
.build()
._toBulkOperation());
break;
case update:
// Full update of document.
bulkIngester.add(new IndexOperation.Builder<BinaryData>()
.index(event.esIndex())
.id(event.id())
.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
.build()
._toBulkOperation());
break;
case purge:
// Real physical delete of document.
bulkIngester.add(new DeleteOperation.Builder()
.index(event.esIndex())
.id(event.id())
.build()
._toBulkOperation());
break;
default:
// Should not get here. Log anyway.
logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
break;
}
}
// Elasticsearch bug: close() returns before the listener has finished handling the afterBulk() callbacks.
bulkIngester.close();
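// Illustrating the report above: this log line typically appears before the
// "Bulk processed ..." line logged by afterBulk(), even though close() has already returned.
logger.info("bulkIngester.close() returned after %d ms".formatted(DateUtils.diffMillis(start)));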
}
catch (Exception e) {
logger.error("Failed to process %d events. %s".formatted(events.size(), e.getMessage()));
}
}
```
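As a temporary workaround, the number of in-flight requests can be tracked in the listener itself and drained explicitly after close(). The sketch below is my own illustrative code, not part of the client API (DrainingBulkListener and awaitDrained() are made-up names); it assumes beforeBulk() has already fired for every pending request by the time close() returns.
```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

// Hypothetical wrapper (not part of the client API): counts requests up in
// beforeBulk() and counts them down again once afterBulk() has run.
final class DrainingBulkListener<C> implements BulkListener<C> {

    private final BulkListener<C> delegate;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final Object done = new Object();

    DrainingBulkListener(BulkListener<C> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<C> contexts) {
        inFlight.incrementAndGet();
        delegate.beforeBulk(executionId, request, contexts);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<C> contexts, BulkResponse response) {
        try {
            delegate.afterBulk(executionId, request, contexts, response);
        } finally {
            finishOne();
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<C> contexts, Throwable failure) {
        try {
            delegate.afterBulk(executionId, request, contexts, failure);
        } finally {
            finishOne();
        }
    }

    private void finishOne() {
        synchronized (done) {
            inFlight.decrementAndGet();
            done.notifyAll();
        }
    }

    // Blocks until every request seen in beforeBulk() has finished its afterBulk().
    void awaitDrained() throws InterruptedException {
        synchronized (done) {
            while (inFlight.get() > 0) {
                done.wait();
            }
        }
    }
}
```
Wrapping the original listener (new DrainingBulkListener<>(bulkListener)), passing the wrapper to .listener(...), and calling awaitDrained() right after bulkIngester.close() then blocks the calling thread until the last afterBulk() has run.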