
BulkIngester close() does not wait for listener to finish #559

Closed
@frank-montyne

Description


Java API client version

7.17.9 and 7.17.10-SNAPSHOT

Java version

Java 19

Elasticsearch Version

7.17.9

Problem description

When using the BulkIngester with a BulkListener, calling close() on the BulkIngester returns before all afterBulk() BulkListener callbacks have finished. Below is a snippet of code that uses the BulkIngester. If you simply add a few logging statements after the close() and in the afterBulk() you will see that the afterBulk() callbacks are still busy after close() returns.
That should not be the case: when close() returns, the bulk should have been handled completely.

Thanks for looking into the problem.

```java

private void handleBulk(Collection<E> events) {
	long start = DateUtils.nowMillis();

	// Create bulk listener.
	BulkListener<String> bulkListener = new BulkListener<String>() {
		@Override
		public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
		}

		@Override
		public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
			for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {

				if (bulkResponseItem.error() != null) {
					logger.error("Event with id '%s' has error %s".formatted(bulkResponseItem.id(), bulkResponseItem.error()));
				}
			}
			// Check for errors.
			if (bulkResponse.errors()) {
				logger.error("Bulk processing of %d events has some failures".formatted(events.size()));
			}
			else {
				logger.info("Bulk processed %d events in %d ms".formatted(events.size(), DateUtils.diffMillis(start)));
			}
		}

		@Override
		public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
			// Since processing of all events failed, we can skip adding the specific event indexes to the set of indexes to refresh after the bulk request completes.

			logger.error("Bulk processing %d events failed completely. %s".formatted(events.size(), ExceptionUtils.getMessage(failure)));
		}
	};

	// Create bulk ingester.
	BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder ->
		ingesterBuilder
			.client(elasticSearch.getESClient())
			// Accumulate one next batch while processing the current batch. Do not set to 0: bulkIngester.flush() will hang (bug filed, not fixed yet in 7.17.10-SNAPSHOT).
			.maxConcurrentRequests(1)
			// Process the current batch after every 10,000 operations added.
			.maxOperations(10000)
			// Or process the current batch after 5 MB of data was added.
			.maxSize(5 * 1024 * 1024)
			// Or process the current batch when 1 second has elapsed.
			.flushInterval(1, TimeUnit.SECONDS)
			.globalSettings(gsBuilder ->
				gsBuilder
					.waitForActiveShards(asBuilder -> asBuilder.count(1))
					.refresh(Refresh.False))
			.listener(bulkListener)
		);

	try {
		// Add events to bulk ingester.
		for (E event : events) {
			switch (event.action()) {
				case create:
					bulkIngester.add(new CreateOperation.Builder<BinaryData>()
							.index(event.esIndex())
							.id(event.id())
							.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
							.build()
							._toBulkOperation());
					break;

				case update:
					// Full update of document.
					bulkIngester.add(new IndexOperation.Builder<BinaryData>()
							.index(event.esIndex())
							.id(event.id())
							.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
							.build()
							._toBulkOperation());
					break;

				case purge:
					// Real physical delete of document.
					bulkIngester.add(new DeleteOperation.Builder()
							.index(event.esIndex())
							.id(event.id())
							.build()
							._toBulkOperation());
					break;

				default:
					// Should not get here. Log anyway.
					logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
					break;
			}
		}
		// ElasticSearch bug: the call to close() does not wait until the listener has handled the afterBulk() callback.
		bulkIngester.close();
	}
	catch (Exception e) {
		logger.error("Failed to process %d events. %s".formatted(events.size(), e.getMessage()));
	}
}
```
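Until this is fixed in the client, a caller can wait for the outstanding callbacks itself. Below is a hypothetical workaround sketch (ListenerBarrier is not part of the Elasticsearch client API): a small Phaser-based barrier that beforeBulk() would register on and afterBulk() would arrive on, so that after bulkIngester.close() returns, awaitCallbacks() blocks until every pending afterBulk() has run. An executor stands in for the ingester's request threads to keep the example self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class ListenerBarrier {
	// Phaser tracks in-flight afterBulk() callbacks; party 0 is the caller's thread.
	private final Phaser inFlight = new Phaser(1);

	// Call from beforeBulk(): one more callback is pending.
	public void callbackStarted() { inFlight.register(); }

	// Call at the end of both afterBulk() overloads: callback done.
	public void callbackFinished() { inFlight.arriveAndDeregister(); }

	// Call after bulkIngester.close(): blocks until every pending callback has run.
	public void awaitCallbacks() { inFlight.arriveAndAwaitAdvance(); }

	public static void main(String[] args) throws Exception {
		ListenerBarrier barrier = new ListenerBarrier();
		ExecutorService pool = Executors.newFixedThreadPool(2);
		StringBuilder log = new StringBuilder();

		// Simulate two afterBulk() callbacks still running when close() returns.
		for (int i = 0; i < 2; i++) {
			barrier.callbackStarted();
			pool.submit(() -> {
				try { Thread.sleep(100); } catch (InterruptedException ignored) {}
				synchronized (log) { log.append("afterBulk "); }
				barrier.callbackFinished();
			});
		}

		barrier.awaitCallbacks(); // returns only after both callbacks have finished
		synchronized (log) { log.append("done"); }
		pool.shutdown();
		System.out.println(log); // afterBulk afterBulk done
	}
}
```

The same barrier calls would be placed inside the BulkListener shown above, with awaitCallbacks() invoked right after bulkIngester.close().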
