From 9a6f784c10a9bcbf4c443290b94d504fb850c408 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Tue, 21 Jan 2025 16:10:01 +0100 Subject: [PATCH 01/11] WIP: added backoff policy logic to operations --- .../_helpers/bulk/BulkIngester.java | 109 ++++-- .../bulk/BulkOperationRepeatable.java | 42 +++ .../_helpers/bulk/IngesterOperation.java | 66 ++-- .../clients/transport/BackoffPolicy.java | 329 ++++++++++++++++++ .../_helpers/bulk/BulkIngesterTest.java | 5 +- 5 files changed, 484 insertions(+), 67 deletions(-) create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java create mode 100644 java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 91d194844..12a77d465 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -24,6 +24,7 @@ import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.util.ApiTypeHelper; import co.elastic.clients.util.ObjectBuilder; @@ -32,9 +33,11 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; public class BulkIngester implements AutoCloseable { @@ -63,10 +67,11 @@ public class BulkIngester implements AutoCloseable { private @Nullable ScheduledFuture flushTask; private @Nullable ScheduledExecutorService scheduler; private boolean isExternalScheduler = false; + private BackoffPolicy backoffPolicy; // Current state - private List operations = new ArrayList<>(); - private List contexts = null; // Created on demand + private List> operations = new ArrayList<>(); + //private List contexts = null; // Created on demand private long currentSize; private int requestsInFlightCount; private volatile boolean isClosed = false; @@ -81,6 +86,7 @@ public class BulkIngester implements AutoCloseable { private static class RequestExecution { public final long id; public final BulkRequest request; + // TODO context list grows in size while elements are null public final List contexts; public final CompletionStage futureResponse; @@ -101,6 +107,7 @@ private BulkIngester(Builder builder) { this.maxSize = builder.bulkSize < 0 ? Long.MAX_VALUE : builder.bulkSize; this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations; this.listener = builder.listener; + this.backoffPolicy = builder.backoffPolicy; this.flushIntervalMillis = builder.flushIntervalMillis; if (flushIntervalMillis != null || listener != null) { @@ -127,6 +134,10 @@ private BulkIngester(Builder builder) { TimeUnit.MILLISECONDS ); } + + if (backoffPolicy == null) { + backoffPolicy = BackoffPolicy.noBackoff(); + } } //----- Getters @@ -168,7 +179,7 @@ public Duration flushInterval() { * The number of operations that have been buffered, waiting to be sent. */ public int pendingOperations() { - List operations = this.operations; + List> operations = this.operations; return operations == null ? 0 : operations.size(); } @@ -273,29 +284,45 @@ private void failsafeFlush() { } public void flush() { + // Keeping sent operations for possible retries + List> requestsSent = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes return !operations.isEmpty(); }, () -> { + // Selecting operations that can be sent immediately + List> immediateOpsRep = operations.stream() + .filter(BulkOperationRepeatable::canRetry) + .collect(Collectors.toList()); + + // Dividing actual operations from contexts + List immediateOps = immediateOpsRep.stream() + .map(BulkOperationRepeatable::getOperation) + .collect(Collectors.toList()); + + List contexts = immediateOpsRep.stream() + .map(BulkOperationRepeatable::getContext) + .collect(Collectors.toList()); + // Build the request - BulkRequest request = newRequest().operations(operations).build(); - List requestContexts = contexts == null ? Collections.nCopies(operations.size(), + BulkRequest request = newRequest().operations(immediateOps).build(); + + List requestContexts = contexts.isEmpty() ? Collections.nCopies(immediateOpsRep.size(), null) : contexts; // Prepare for next round - operations = new ArrayList<>(); - contexts = null; - currentSize = 0; + requestsSent.addAll(immediateOpsRep); + operations.removeAll(immediateOpsRep); + currentSize = operations.size(); addCondition.signalIfReady(); long id = sendRequestCondition.invocations(); if (listener != null) { - BulkRequest finalRequest = request; // synchronous execution to make sure it actually runs before - listener.beforeBulk(id, finalRequest, requestContexts); + listener.beforeBulk(id, request, requestContexts); } CompletionStage result = client.bulk(request); @@ -319,9 +346,8 @@ public void flush() { scheduler.submit(() -> { try { listener.afterBulk(exec.id, exec.request, exec.contexts, resp); - } - finally { - if(listenerInProgressCount.decrementAndGet() == 0){ + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { closeCondition.signalIfReady(); } } @@ -334,9 +360,8 @@ public void flush() { scheduler.submit(() -> { try { listener.afterBulk(exec.id, exec.request, exec.contexts, thr); - } - finally { - if(listenerInProgressCount.decrementAndGet() == 0){ + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { closeCondition.signalIfReady(); } } @@ -358,30 +383,41 @@ public void add(BulkOperation operation, Context context) { throw new IllegalStateException("Ingester has been closed"); } - IngesterOperation ingestOp = IngesterOperation.of(operation, client._jsonpMapper()); + BulkOperationRepeatable repeatableOp = new BulkOperationRepeatable<>(operation, context, Optional.empty()); + + innerAdd(repeatableOp); + } + + private void addRetry(BulkOperationRepeatable repeatableOp) { + innerAdd(repeatableOp); + } + + private void innerAdd(BulkOperationRepeatable repeatableOp) { + IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); addCondition.whenReady(() -> { - if (context != null) { - // Lazily build the context list - if (contexts == null) { - int size = operations.size(); - if (size == 0) { - contexts = new ArrayList<>(); - } else { - contexts = new ArrayList<>(Collections.nCopies(size, null)); - } - } - contexts.add(context); - } +// Context context = repeatableOp.getContext(); +// +// if (context != null) { +// // Lazily build the context list +// if (contexts == null) { +// int size = operations.size(); +// if (size == 0) { +// contexts = new ArrayList<>(); +// } else { +// contexts = new ArrayList<>(Collections.nCopies(size, null)); +// } +// } +// contexts.add(context); +// } operations.add(ingestOp.operation()); currentSize += ingestOp.size(); if (!canAddOperation()) { flush(); - } - else { + } else { addCondition.signalIfReady(); } }); @@ -440,6 +476,7 @@ public static class Builder implements ObjectBuilder listener; private ScheduledExecutorService scheduler; + private BackoffPolicy backoffPolicy; public Builder client(ElasticsearchAsyncClient client) { this.client = client; @@ -455,7 +492,8 @@ public Builder client(ElasticsearchClient client) { } /** - * Sets when to flush a new bulk request based on the number of operations currently added. Defaults to + * Sets when to flush a new bulk request based on the number of operations currently added. + * Defaults to * {@code 1000}. Can be set to {@code -1} to disable it. * * @throws IllegalArgumentException if less than -1. @@ -519,6 +557,7 @@ public Builder flushInterval(long value, TimeUnit unit) { *

* Flushing is still subject to the maximum number of requests set with * {@link #maxConcurrentRequests}. + * * @deprecated use {@link #scheduler(ScheduledExecutorService)} */ @Deprecated @@ -541,6 +580,12 @@ public Builder listener(BulkListener listener) { return this; } + + public Builder backoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + /** * Sets global bulk request settings that will be applied to all requests sent by the ingester. */ diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java new file mode 100644 index 000000000..3a82831e0 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java @@ -0,0 +1,42 @@ +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Iterator; +import java.util.Optional; + +public class BulkOperationRepeatable { + private final BulkOperation operation; + private final Context context; + private final Optional> retries; + private Instant nextRetry; + + public BulkOperationRepeatable(BulkOperation request, Context context, Optional> retries) { + this.operation = request; + this.context = context; + this.retries = retries; + this.nextRetry = Instant.now().plus(retries.map(Iterator::next).orElse(0L), ChronoUnit.MILLIS); + } + + public BulkOperation getOperation() { + return operation; + } + + public Context getContext() { + return context; + } + + public Optional> getRetries() { + return retries; + } + + public Instant getNextRetry() { + return this.nextRetry; + } + + public boolean canRetry() { + return this.retries.map(Iterator::hasNext).orElse(true); + } +} diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index f2a2ac28c..2840c052f 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -35,111 +35,111 @@ * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). */ public class IngesterOperation { - private final BulkOperation operation; + private final BulkOperationRepeatable repeatableOp; private final long size; - public IngesterOperation(BulkOperation operation, long size) { - this.operation = operation; + public IngesterOperation(BulkOperationRepeatable repeatableOp, long size) { + this.repeatableOp = repeatableOp; this.size = size; } - public static IngesterOperation of(BulkOperation operation, JsonpMapper mapper) { - switch (operation._kind()) { + public static IngesterOperation of(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { + switch (repeatableOp.getOperation()._kind()) { case Create: - return createOperation(operation, mapper); + return createOperation(repeatableOp, mapper); case Index: - return indexOperation(operation, mapper); + return indexOperation(repeatableOp, mapper); case Update: - return updateOperation(operation, mapper); + return updateOperation(repeatableOp, mapper); case Delete: - return deleteOperation(operation); + return deleteOperation(repeatableOp); default: - throw new IllegalStateException("Unknown bulk operation type " + operation._kind()); + throw new IllegalStateException("Unknown bulk operation type " + repeatableOp.getOperation()._kind()); } } - public BulkOperation operation() { - return this.operation; + public BulkOperationRepeatable operation() { + return this.repeatableOp; } public long size() { return this.size; } - private static IngesterOperation createOperation(BulkOperation operation, JsonpMapper mapper) { - CreateOperation create = operation.create(); - BulkOperation newOperation; + private static IngesterOperation createOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { + CreateOperation create = repeatableOp.getOperation().create(); + BulkOperationRepeatable newOperation; long size = basePropertiesSize(create); if (create.document() instanceof BinaryData) { - newOperation = operation; + newOperation = repeatableOp; size += ((BinaryData) create.document()).size(); } else { BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); - newOperation = BulkOperation.of(bo -> bo.create(idx -> { + newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.create(idx -> { copyCreateProperties(create, idx); return idx.document(binaryDoc); - })); + })),repeatableOp.getContext(),repeatableOp.getRetries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation indexOperation(BulkOperation operation, JsonpMapper mapper) { - IndexOperation index = operation.index(); - BulkOperation newOperation; + private static IngesterOperation indexOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { + IndexOperation index = repeatableOp.getOperation().index(); + BulkOperationRepeatable newOperation; long size = basePropertiesSize(index); if (index.document() instanceof BinaryData) { - newOperation = operation; + newOperation = repeatableOp; size += ((BinaryData) index.document()).size(); } else { BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); - newOperation = BulkOperation.of(bo -> bo.index(idx -> { + newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.index(idx -> { copyIndexProperties(index, idx); return idx.document(binaryDoc); - })); + })),repeatableOp.getContext(),repeatableOp.getRetries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation updateOperation(BulkOperation operation, JsonpMapper mapper) { - UpdateOperation update = operation.update(); - BulkOperation newOperation; + private static IngesterOperation updateOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { + UpdateOperation update = repeatableOp.getOperation().update(); + BulkOperationRepeatable newOperation; long size = basePropertiesSize(update) + size("retry_on_conflict", update.retryOnConflict()) + size("require_alias", update.requireAlias()); if (update.binaryAction() != null) { - newOperation = operation; + newOperation = repeatableOp; size += update.binaryAction().size(); } else { BinaryData action = BinaryData.of(update.action(), mapper); size += action.size(); - newOperation = BulkOperation.of(bo -> bo.update(u -> { + newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.update(u -> { copyBaseProperties(update, u); return u .binaryAction(action) .requireAlias(update.requireAlias()) .retryOnConflict(update.retryOnConflict()); - })); + })),repeatableOp.getContext(),repeatableOp.getRetries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation deleteOperation(BulkOperation operation) { - DeleteOperation delete = operation.delete(); - return new IngesterOperation(operation, basePropertiesSize(delete)); + private static IngesterOperation deleteOperation(BulkOperationRepeatable repeatableOp) { + DeleteOperation delete = repeatableOp.getOperation().delete(); + return new IngesterOperation(repeatableOp, basePropertiesSize(delete)); } diff --git a/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java b/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java new file mode 100644 index 000000000..798af4c06 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java @@ -0,0 +1,329 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Provides a set of generic backoff policies. Backoff policies are used to calculate the number of times an action will be retried + * and the intervals between those retries. + * + * Notes for implementing custom subclasses: + * + * The underlying mathematical principle of BackoffPolicy are progressions which can be either finite or infinite although + * the latter should not be used for retrying. A progression can be mapped to a java.util.Iterator with the following + * semantics: + * + *

    + *
  • #hasNext() determines whether the progression has more elements. Return true for infinite progressions + *
  • + *
  • #next() determines the next element in the progression, i.e. the next wait time period
  • + *
+ * + * Note that backoff policies are exposed as Iterables in order to be consumed multiple times. + */ +public abstract class BackoffPolicy implements Iterable { + private static final BackoffPolicy NO_BACKOFF = new NoBackoff(); + + /** + * Creates a backoff policy that will not allow any backoff, i.e. an operation will fail after the first attempt. + * + * @return A backoff policy without any backoff period. The returned instance is thread safe. + */ + public static BackoffPolicy noBackoff() { + return NO_BACKOFF; + } + + /** + * Creates an new constant backoff policy with the provided configuration. + * + * @param delay The delay defines how long to wait between retry attempts. Must not be null. + * Must be <= Integer.MAX_VALUE ms. + * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number. + * @return A backoff policy with a constant wait time between retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy constantBackoff(Long delay, int maxNumberOfRetries) { + return new ConstantBackoff(checkDelay(delay), maxNumberOfRetries); + } + + /** + * Creates an new exponential backoff policy with a default configuration of 50 ms initial wait period and 8 retries taking + * roughly 5.1 seconds in total. + * + * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy exponentialBackoff() { + return exponentialBackoff(50L, 8); + } + + /** + * Creates an new exponential backoff policy with the provided configuration. + * + * @param initialDelay The initial delay defines how long to wait for the first retry attempt. Must not be null. + * Must be <= Integer.MAX_VALUE ms. + * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number. + * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy exponentialBackoff(Long initialDelay, int maxNumberOfRetries) { + return new ExponentialBackoff(checkDelay(initialDelay), maxNumberOfRetries); + } + + /** + * Creates a new linear backoff policy with the provided configuration + * + * @param delayIncrement The amount by which to increment the delay on each retry + * @param maxNumberOfRetries The maximum number of retries + * @param maximumDelay The maximum delay + * @return A backoff policy with linear increase in wait time for retries. + */ + public static BackoffPolicy linearBackoff(Long delayIncrement, int maxNumberOfRetries, Long maximumDelay) { + return new LinearBackoff(delayIncrement, maxNumberOfRetries, maximumDelay); + } + + /** + * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy. + */ + public static BackoffPolicy wrap(BackoffPolicy delegate, Runnable onBackoff) { + return new WrappedBackoffPolicy(delegate, onBackoff); + } + + private static Long checkDelay(Long delay) { + if (delay > Integer.MAX_VALUE) { + throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms"); + } + return delay; + } + + private static class NoBackoff extends BackoffPolicy { + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public String toString() { + return "NoBackoff"; + } + } + + private static class ExponentialBackoff extends BackoffPolicy { + private final Long start; + + private final int numberOfElements; + + private ExponentialBackoff(Long start, int numberOfElements) { + assert start >= 0; + assert numberOfElements >= 0; + this.start = start; + this.numberOfElements = numberOfElements; + } + + @Override + public Iterator iterator() { + return new ExponentialBackoffIterator(start, numberOfElements); + } + + @Override + public String toString() { + return "ExponentialBackoff{start=" + start + ", numberOfElements=" + numberOfElements + '}'; + } + } + + private static class ExponentialBackoffIterator implements Iterator { + private final int numberOfElements; + + private final Long start; + + private int currentlyConsumed; + + private ExponentialBackoffIterator(Long start, int numberOfElements) { + this.start = start; + this.numberOfElements = numberOfElements; + } + + @Override + public boolean hasNext() { + return currentlyConsumed < numberOfElements; + } + + @Override + public Long next() { + if (!hasNext()) { + throw new NoSuchElementException("Only up to " + numberOfElements + " elements"); + } + Long result = start + 10L * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1); + currentlyConsumed++; + return result; + } + } + + private static final class ConstantBackoff extends BackoffPolicy { + private final Long delay; + + private final int numberOfElements; + + ConstantBackoff(Long delay, int numberOfElements) { + assert numberOfElements >= 0; + this.delay = delay; + this.numberOfElements = numberOfElements; + } + + @Override + public Iterator iterator() { + return new ConstantBackoffIterator(delay, numberOfElements); + } + + @Override + public String toString() { + return "ConstantBackoff{delay=" + delay + ", numberOfElements=" + numberOfElements + '}'; + } + } + + private static final class ConstantBackoffIterator implements Iterator { + private final Long delay; + private final int numberOfElements; + private int curr; + + ConstantBackoffIterator(Long delay, int numberOfElements) { + this.delay = delay; + this.numberOfElements = numberOfElements; + } + + @Override + public boolean hasNext() { + return curr < numberOfElements; + } + + @Override + public Long next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + curr++; + return delay; + } + } + + private static final class WrappedBackoffPolicy extends BackoffPolicy { + private final BackoffPolicy delegate; + private final Runnable onBackoff; + + WrappedBackoffPolicy(BackoffPolicy delegate, Runnable onBackoff) { + this.delegate = delegate; + this.onBackoff = onBackoff; + } + + @Override + public Iterator iterator() { + return new WrappedBackoffIterator(delegate.iterator(), onBackoff); + } + + @Override + public String toString() { + return "WrappedBackoffPolicy{delegate=" + delegate + ", onBackoff=" + onBackoff + '}'; + } + } + + private static final class WrappedBackoffIterator implements Iterator { + private final Iterator delegate; + private final Runnable onBackoff; + + WrappedBackoffIterator(Iterator delegate, Runnable onBackoff) { + this.delegate = delegate; + this.onBackoff = onBackoff; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public Long next() { + if (false == delegate.hasNext()) { + throw new NoSuchElementException(); + } + onBackoff.run(); + return delegate.next(); + } + } + + private static final class LinearBackoff extends BackoffPolicy { + + private final Long delayIncrement; + private final int maxNumberOfRetries; + private final Long maximumDelay; + + private LinearBackoff(Long delayIncrement, int maxNumberOfRetries, @Nullable Long maximumDelay) { + this.delayIncrement = delayIncrement; + this.maxNumberOfRetries = maxNumberOfRetries; + this.maximumDelay = maximumDelay; + } + + @Override + public Iterator iterator() { + return new LinearBackoffIterator(delayIncrement, maxNumberOfRetries, maximumDelay); + } + + @Override + public String toString() { + return "LinearBackoff{" + + "delayIncrement=" + + delayIncrement + + ", maxNumberOfRetries=" + + maxNumberOfRetries + + ", maximumDelay=" + + maximumDelay + + '}'; + } + } + + private static final class LinearBackoffIterator implements Iterator { + + private final Long delayIncrement; + private final int maxNumberOfRetries; + private final Long maximumDelay; + private int curr; + + private LinearBackoffIterator(Long delayIncrement, int maxNumberOfRetries, @Nullable Long maximumDelay) { + this.delayIncrement = delayIncrement; + this.maxNumberOfRetries = maxNumberOfRetries; + this.maximumDelay = maximumDelay; + } + + @Override + public boolean hasNext() { + return curr < maxNumberOfRetries; + } + + @Override + public Long next() { + curr++; + Long Long = curr * delayIncrement; + return maximumDelay == null ? Long : Long.compareTo(maximumDelay) < 0 ? Long : maximumDelay; + } + } +} diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index a76f3f75f..38f64d21e 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -44,6 +44,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -223,7 +224,7 @@ public void multiThreadStressTest() throws InterruptedException, IOException { public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); - long operationSize = IngesterOperation.of(operation, transport.jsonpMapper()).size(); + long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation,null,Optional.empty()), transport.jsonpMapper()).size(); BulkIngester ingester = BulkIngester.of(b -> b .client(new ElasticsearchAsyncClient(transport)) @@ -447,7 +448,7 @@ public void pipelineTest() { String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(create, mapper).operation(); + BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create,null, Optional.empty()), mapper).operation().getOperation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str); From 38587112f7501fcac4b9119129225018c859fb8d Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 22 Jan 2025 16:33:46 +0100 Subject: [PATCH 02/11] all basic key points implemented --- .../_helpers/bulk/BulkIngester.java | 93 +++++++++++++++---- .../bulk/BulkOperationRepeatable.java | 25 +++-- .../_helpers/bulk/BulkIngesterTest.java | 66 +++++++------ 3 files changed, 127 insertions(+), 57 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 12a77d465..410cc9e4d 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -24,6 +24,7 @@ import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.util.ApiTypeHelper; @@ -33,9 +34,9 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionStage; @@ -66,6 +67,7 @@ public class BulkIngester implements AutoCloseable { private @Nullable ScheduledFuture flushTask; private @Nullable ScheduledExecutorService scheduler; + private @Nullable ScheduledExecutorService retryScheduler; private boolean isExternalScheduler = false; private BackoffPolicy backoffPolicy; @@ -81,7 +83,7 @@ public class BulkIngester implements AutoCloseable { private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation); private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); - private AtomicInteger listenerInProgressCount = new AtomicInteger(); + private final AtomicInteger listenerInProgressCount = new AtomicInteger(); private static class RequestExecution { public final long id; @@ -138,6 +140,21 @@ private BulkIngester(Builder builder) { if (backoffPolicy == null) { backoffPolicy = BackoffPolicy.noBackoff(); } + // preparing a scheduler that will trigger flushes when it finds enqueued requests ready to be retried + // TODO should we just keep a single scheduler? + else { + retryScheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("bulk-ingester-retry#" + ingesterId + "#" + t.getId()); + t.setDaemon(true); + return t; + }); + retryScheduler.scheduleWithFixedDelay( + this::retryFlush, + 1000,1000, // TODO should we hardcode this? + TimeUnit.MILLISECONDS + ); + } } //----- Getters @@ -283,9 +300,20 @@ private void failsafeFlush() { } } + // triggers a flush if it finds queued retries + private void retryFlush() { + try { + if (operations.stream().anyMatch(op -> op.getRetries() != null && op.isSendable())) { + flush(); + } + } catch (Throwable thr) { + // Log the error and continue + logger.error("Error in background flush", thr); + } + } + public void flush() { - // Keeping sent operations for possible retries - List> requestsSent = new ArrayList<>(); + List> sentRequests = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes @@ -294,7 +322,7 @@ public void flush() { () -> { // Selecting operations that can be sent immediately List> immediateOpsRep = operations.stream() - .filter(BulkOperationRepeatable::canRetry) + .filter(BulkOperationRepeatable::isSendable) .collect(Collectors.toList()); // Dividing actual operations from contexts @@ -309,11 +337,12 @@ public void flush() { // Build the request BulkRequest request = newRequest().operations(immediateOps).build(); - List requestContexts = contexts.isEmpty() ? Collections.nCopies(immediateOpsRep.size(), - null) : contexts; + List requestContexts = contexts.isEmpty() ? + Collections.nCopies(immediateOpsRep.size(), + null) : contexts; // Prepare for next round - requestsSent.addAll(immediateOpsRep); + sentRequests.addAll(immediateOpsRep); operations.removeAll(immediateOpsRep); currentSize = operations.size(); addCondition.signalIfReady(); @@ -340,18 +369,43 @@ public void flush() { // A request was actually sent exec.futureResponse.handle((resp, thr) -> { if (resp != null) { - // Success - if (listener != null) { - listenerInProgressCount.incrementAndGet(); - scheduler.submit(() -> { - try { - listener.afterBulk(exec.id, exec.request, exec.contexts, resp); - } finally { - if (listenerInProgressCount.decrementAndGet() == 0) { - closeCondition.signalIfReady(); + // Success? Checking if total or partial + List failedRequestsCanRetry = resp.items().stream() + .filter(i -> i.error() != null && i.status() == 429) + .collect(Collectors.toList()); + + if (failedRequestsCanRetry.isEmpty() || !backoffPolicy.equals(BackoffPolicy.noBackoff())) { + // Total success! ...or there's no retry policy implemented. Either way, can call + // listener after bulk + if (listener != null) { + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } } + }); + } + } else { + // Partial success, retrying failed requests if policy allows it + // Getting original requests + for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { + int index = resp.items().indexOf(bulkItemResponse); + BulkOperationRepeatable original = sentRequests.get(index); + if (original.getRetries().hasNext()) { + Iterator retries = + Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); + addRetry(new BulkOperationRepeatable<>(original.getOperation(), + original.getContext(), retries)); + // TODO remove after checking + assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString())); } - }); + // TODO should print some message? + + } } } else { // Failure @@ -383,7 +437,8 @@ public void add(BulkOperation operation, Context context) { throw new IllegalStateException("Ingester has been closed"); } - BulkOperationRepeatable repeatableOp = new BulkOperationRepeatable<>(operation, context, Optional.empty()); + BulkOperationRepeatable repeatableOp = new BulkOperationRepeatable<>(operation, context, + null); innerAdd(repeatableOp); } diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java index 3a82831e0..2db2156a9 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java @@ -10,14 +10,15 @@ public class BulkOperationRepeatable { private final BulkOperation operation; private final Context context; - private final Optional> retries; - private Instant nextRetry; + private final Iterator retries; + private Instant retryTime; - public BulkOperationRepeatable(BulkOperation request, Context context, Optional> retries) { + public BulkOperationRepeatable(BulkOperation request, Context context, Iterator retries) { this.operation = request; this.context = context; this.retries = retries; - this.nextRetry = Instant.now().plus(retries.map(Iterator::next).orElse(0L), ChronoUnit.MILLIS); + // if the retries iterator is null it means that it's not a retry, otherwise calculating retry time + this.retryTime = Optional.ofNullable(retries).map(r -> Instant.now().plus(r.next(), ChronoUnit.MILLIS)).orElse(Instant.now()); } public BulkOperation getOperation() { @@ -28,15 +29,19 @@ public Context getContext() { return context; } - public Optional> getRetries() { + public Iterator getRetries() { return retries; } - public Instant getNextRetry() { - return this.nextRetry; - } +// public Instant getCurrentRetryTime() { +// return this.retryTime; +// } +// +// public Instant getNextRetryTime() { +// return Instant.now().plus(retries.next(), ChronoUnit.MILLIS); +// } - public boolean canRetry() { - return this.retries.map(Iterator::hasNext).orElse(true); + public boolean isSendable() { + return retryTime.isBefore(Instant.now()); } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 38f64d21e..4c707b851 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -44,7 +44,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -142,7 +141,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, for (int i = 0; i < numThreads; i++) { new Thread(() -> { try { - Thread.sleep((long)(Math.random() * 100)); + Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -169,7 +168,8 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, assertEquals(expectedOperations, listener.operations.get()); assertEquals(expectedOperations, transport.operations.get()); - int expectedRequests = expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0) ? 0 : 1) ; + int expectedRequests = + expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0) ? 0 : 1); assertEquals(expectedRequests, ingester.requestCount()); assertEquals(expectedRequests, listener.requests.get()); @@ -184,11 +184,12 @@ public void multiThreadStressTest() throws InterruptedException, IOException { // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme // situation where the number of adding threads greatly exceeds the number of concurrent requests - // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests + // accordingly. BulkIngester ingester = BulkIngester.of(b -> b - .client(client) - .globalSettings(s -> s.index(index)) - .flushInterval(5, TimeUnit.SECONDS) + .client(client) + .globalSettings(s -> s.index(index)) + .flushInterval(5, TimeUnit.SECONDS) ); RequestTest.AppData appData = new RequestTest.AppData(); @@ -210,21 +211,22 @@ public void multiThreadStressTest() throws InterruptedException, IOException { executor.submit(thread); } - executor.awaitTermination(10,TimeUnit.SECONDS); + executor.awaitTermination(10, TimeUnit.SECONDS); ingester.close(); client.indices().refresh(); IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); - assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); + assertTrue(indexStats.indices().get(index).primaries().docs().count() == 100000); } @Test public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); - long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation,null,Optional.empty()), transport.jsonpMapper()).size(); + long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation, null, null), + transport.jsonpMapper()).size(); BulkIngester ingester = BulkIngester.of(b -> b .client(new ElasticsearchAsyncClient(transport)) @@ -254,7 +256,7 @@ public void periodicFlushTest() throws Exception { // Disable other flushing limits .maxSize(-1) .maxOperations(-1) - .maxConcurrentRequests(Integer.MAX_VALUE-1) + .maxConcurrentRequests(Integer.MAX_VALUE - 1) ); // Add an operation every 100 ms to give time @@ -294,7 +296,8 @@ public void beforeBulk(long executionId, BulkRequest request, List context } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + BulkResponse response) { if (executionId == 2) { // Fail after the request is sent failureCount.incrementAndGet(); @@ -303,7 +306,8 @@ public void afterBulk(long executionId, BulkRequest request, List contexts } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + Throwable failure) { } }; @@ -356,11 +360,13 @@ public void beforeBulk(long executionId, BulkRequest request, List cont } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + BulkResponse response) { } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + Throwable failure) { } }; @@ -374,7 +380,7 @@ public void afterBulk(long executionId, BulkRequest request, List conte for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { // Set a context only after 5, so that we test filling with nulls. - Integer context = j < 5 ? null : i*10 + j; + Integer context = j < 5 ? null : i * 10 + j; ingester.add(operation, context); } } @@ -392,7 +398,7 @@ public void afterBulk(long executionId, BulkRequest request, List conte if (j < 5) { assertNull(contexts.get(j)); } else { - assertEquals(contexts.get(j), i*10 + j); + assertEquals(contexts.get(j), i * 10 + j); } } } @@ -434,21 +440,23 @@ public void beforeBulk(long executionId, BulkRequest request, List context @Test public void pipelineTest() { - String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\",\"require_alias\":true}}"; + String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\"," + + "\"require_alias\":true}}"; JsonpMapper mapper = new SimpleJsonpMapper(); BulkOperation create = BulkOperation.of(o -> o.create(c -> c - .pipeline("pipe") - .requireAlias(true) - .index("some_idx") - .id("some_id") - .document("Some doc") + .pipeline("pipe") + .requireAlias(true) + .index("some_idx") + .id("some_id") + .document("Some doc") )); String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create,null, Optional.empty()), mapper).operation().getOperation(); + BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), + mapper).operation().getOperation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str); @@ -495,8 +503,8 @@ public void endToEndTest() throws Exception { assertEquals( 42, client.get(b -> b - .index(index) - .id(id), + .index(index) + .id(id), RequestTest.AppData.class ).source().getIntValue() ); @@ -537,13 +545,15 @@ public void testConfigValidation() { private static class CountingListener implements BulkListener { public final AtomicInteger operations = new AtomicInteger(); public final AtomicInteger requests = new AtomicInteger(); + @Override public void beforeBulk(long executionId, BulkRequest request, List contexts) { } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + BulkResponse response) { operations.addAndGet(request.operations().size()); requests.incrementAndGet(); } @@ -596,7 +606,7 @@ public CompletableFuture performRequest }); @SuppressWarnings("unchecked") - CompletableFuture result = (CompletableFuture)response; + CompletableFuture result = (CompletableFuture) response; return result; } From 76dad4e3692ef0b57a1e5fe18ed1c4e569abefe9 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 22 Jan 2025 17:33:45 +0100 Subject: [PATCH 03/11] bugfixes --- .../_helpers/bulk/BulkIngester.java | 16 +++++++++++----- .../_helpers/bulk/BulkOperationRepeatable.java | 14 +++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 410cc9e4d..c6c9c2403 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -151,7 +151,7 @@ private BulkIngester(Builder builder) { }); retryScheduler.scheduleWithFixedDelay( this::retryFlush, - 1000,1000, // TODO should we hardcode this? + 1000, 1000, // TODO should we hardcode this? TimeUnit.MILLISECONDS ); } @@ -374,7 +374,7 @@ public void flush() { .filter(i -> i.error() != null && i.status() == 429) .collect(Collectors.toList()); - if (failedRequestsCanRetry.isEmpty() || !backoffPolicy.equals(BackoffPolicy.noBackoff())) { + if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) { // Total success! ...or there's no retry policy implemented. Either way, can call // listener after bulk if (listener != null) { @@ -395,7 +395,7 @@ public void flush() { for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { int index = resp.items().indexOf(bulkItemResponse); BulkOperationRepeatable original = sentRequests.get(index); - if (original.getRetries().hasNext()) { + if (original.canRetry()) { Iterator retries = Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); addRetry(new BulkOperationRepeatable<>(original.getOperation(), @@ -403,8 +403,10 @@ public void flush() { // TODO remove after checking assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString())); } - // TODO should print some message? - + else{ + System.out.println("Retries finished"); + // TODO should print some message? + } } } } else { @@ -514,6 +516,10 @@ public void close() { if (scheduler != null && !isExternalScheduler) { scheduler.shutdownNow(); } + + if (retryScheduler != null) { + retryScheduler.shutdownNow(); + } } //---------------------------------------------------------------------------------------------------- diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java index 2db2156a9..4c050fb3d 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java @@ -33,13 +33,13 @@ public Iterator getRetries() { return retries; } -// public Instant getCurrentRetryTime() { -// return this.retryTime; -// } -// -// public Instant getNextRetryTime() { -// return Instant.now().plus(retries.next(), ChronoUnit.MILLIS); -// } + public Instant getCurrentRetryTime() { + return this.retryTime; + } + + public boolean canRetry() { + return Optional.ofNullable(retries).map(Iterator::hasNext).orElse(true); + } public boolean isSendable() { return retryTime.isBefore(Instant.now()); From 15c2a92ba1d73053b868af796df562976fee610f Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 27 Jan 2025 16:09:58 +0100 Subject: [PATCH 04/11] retry - main implementation done --- .../_helpers/bulk/BulkIngester.java | 152 ++++++++++++------ .../bulk/BulkOperationRepeatable.java | 19 ++- .../_helpers/bulk/IngesterOperation.java | 2 +- .../_helpers/bulk/BulkIngesterTest.java | 1 + 4 files changed, 118 insertions(+), 56 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index c6c9c2403..d7057dcda 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; @@ -84,11 +85,11 @@ public class BulkIngester implements AutoCloseable { private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); private final AtomicInteger listenerInProgressCount = new AtomicInteger(); + private final AtomicInteger retriesInProgressCount = new AtomicInteger(); private static class RequestExecution { public final long id; public final BulkRequest request; - // TODO context list grows in size while elements are null public final List contexts; public final CompletionStage futureResponse; @@ -140,8 +141,7 @@ private BulkIngester(Builder builder) { if (backoffPolicy == null) { backoffPolicy = BackoffPolicy.noBackoff(); } - // preparing a scheduler that will trigger flushes when it finds enqueued requests ready to be retried - // TODO should we just keep a single scheduler? + // preparing a scheduler that will trigger flushes to retry failed requests else { retryScheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { Thread t = Executors.defaultThreadFactory().newThread(r); @@ -149,11 +149,6 @@ private BulkIngester(Builder builder) { t.setDaemon(true); return t; }); - retryScheduler.scheduleWithFixedDelay( - this::retryFlush, - 1000, 1000, // TODO should we hardcode this? - TimeUnit.MILLISECONDS - ); } } @@ -264,7 +259,8 @@ private boolean canAddOperation() { } private boolean closedAndFlushed() { - return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0; + return isClosed && operations.isEmpty() && requestsInFlightCount == 0 + && listenerInProgressCount.get() == 0 && retriesInProgressCount.get() == 0; } //----- Ingester logic @@ -300,18 +296,6 @@ private void failsafeFlush() { } } - // triggers a flush if it finds queued retries - private void retryFlush() { - try { - if (operations.stream().anyMatch(op -> op.getRetries() != null && op.isSendable())) { - flush(); - } - } catch (Throwable thr) { - // Log the error and continue - logger.error("Error in background flush", thr); - } - } - public void flush() { List> sentRequests = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( @@ -334,13 +318,14 @@ public void flush() { .map(BulkOperationRepeatable::getContext) .collect(Collectors.toList()); + // If all contexts are null, no need for the list + if (contexts.stream().allMatch(Objects::isNull)) { + contexts = null; + } + // Build the request BulkRequest request = newRequest().operations(immediateOps).build(); - List requestContexts = contexts.isEmpty() ? - Collections.nCopies(immediateOpsRep.size(), - null) : contexts; - // Prepare for next round sentRequests.addAll(immediateOpsRep); operations.removeAll(immediateOpsRep); @@ -351,7 +336,7 @@ public void flush() { if (listener != null) { // synchronous execution to make sure it actually runs before - listener.beforeBulk(id, request, requestContexts); + listener.beforeBulk(id, request, contexts); } CompletionStage result = client.bulk(request); @@ -362,13 +347,14 @@ public void flush() { request = null; } - return new RequestExecution<>(id, request, requestContexts, result); + return new RequestExecution<>(id, request, contexts, result); }); if (exec != null) { // A request was actually sent exec.futureResponse.handle((resp, thr) -> { if (resp != null) { + // Success? Checking if total or partial List failedRequestsCanRetry = resp.items().stream() .filter(i -> i.error() != null && i.status() == 429) @@ -391,23 +377,73 @@ public void flush() { } } else { // Partial success, retrying failed requests if policy allows it - // Getting original requests + // Keeping list of retryables, to exclude them for calling listener later + List> retryableReq = new ArrayList<>(); + List retryableResp = new ArrayList<>(); for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { int index = resp.items().indexOf(bulkItemResponse); + // Getting original failed, requests and keeping successful ones to send to the + // listener BulkOperationRepeatable original = sentRequests.get(index); if (original.canRetry()) { + retryableResp.add(bulkItemResponse); Iterator retries = Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); - addRetry(new BulkOperationRepeatable<>(original.getOperation(), - original.getContext(), retries)); + BulkOperationRepeatable refire = + new BulkOperationRepeatable<>(original.getOperation(), + original.getContext(), retries); + retryableReq.add(refire); + addRetry(refire); + logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms"); // TODO remove after checking assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString())); + } else { + logger.warn("Retries finished for request: " + original.getOperation()._kind().toString()); } - else{ - System.out.println("Retries finished"); - // TODO should print some message? + } + // Scheduling flushes for just sent out retryable requests + if (!retryableReq.isEmpty()) { + // if size <= 3, all times + // if size > 3, schedule just first, last and median + scheduleRetries(retryableReq); + } + // Retrieving list of remaining successful or not retryable requests + sentRequests.removeAll(retryableReq); + if (!sentRequests.isEmpty()) { + if (listener != null) { + // Creating partial BulkRequest + BulkRequest partialRequest = newRequest() + .operations(sentRequests.stream() + .map(BulkOperationRepeatable::getOperation) + .collect(Collectors.toList())) + .build(); + // Getting contexts + List partialCtx = sentRequests.stream() + .map(BulkOperationRepeatable::getContext) + .collect(Collectors.toList()); + // Filtering response + List partialItems = resp.items(); + partialItems.removeAll(retryableResp); + + BulkResponse partialResp = BulkResponse.of(br -> br + .items(partialItems) + .errors(resp.errors()) // TODO sure? + .took(resp.took()) + .ingestTook(resp.ingestTook())); + + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, partialRequest, partialCtx, partialResp); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); } } + } } else { // Failure @@ -434,6 +470,27 @@ public void flush() { } } + private void scheduleRetries(List> retryableReq) { + List sortedDelays = retryableReq.stream() + .map(BulkOperationRepeatable::getCurrentRetryTimeDelay) + .distinct() + .sorted() + .collect(Collectors.toList()); + + // scheduling earlier delay, first in list + retryScheduler.schedule(this::flush, sortedDelays.get(0), TimeUnit.MILLISECONDS); + if (sortedDelays.size() == 2) { + // special case, scheduling both delays + retryScheduler.schedule(this::flush, sortedDelays.get(1), TimeUnit.MILLISECONDS); + } else if (sortedDelays.size() > 2) { + // general case, scheduling median and latest delays + retryScheduler.schedule(this::flush, sortedDelays.get(sortedDelays.size() / 2), + TimeUnit.MILLISECONDS); + retryScheduler.schedule(this::flush, sortedDelays.get(sortedDelays.size() - 1), + TimeUnit.MILLISECONDS); + } + } + public void add(BulkOperation operation, Context context) { if (isClosed) { throw new IllegalStateException("Ingester has been closed"); @@ -445,30 +502,25 @@ public void add(BulkOperation operation, Context context) { innerAdd(repeatableOp); } + // Same as "add", but skips the closed check to allow retries to be added even after ingester closure private void addRetry(BulkOperationRepeatable repeatableOp) { - innerAdd(repeatableOp); + // Sending the operation back in the queue using the retry scheduler + retriesInProgressCount.incrementAndGet(); + retryScheduler.submit(() -> { + try { + innerAdd(repeatableOp); + } finally { + if (retriesInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); } private void innerAdd(BulkOperationRepeatable repeatableOp) { IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); addCondition.whenReady(() -> { - -// Context context = repeatableOp.getContext(); -// -// if (context != null) { -// // Lazily build the context list -// if (contexts == null) { -// int size = operations.size(); -// if (size == 0) { -// contexts = new ArrayList<>(); -// } else { -// contexts = new ArrayList<>(Collections.nCopies(size, null)); -// } -// } -// contexts.add(context); -// } - operations.add(ingestOp.operation()); currentSize += ingestOp.size(); diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java index 4c050fb3d..62bf08a5f 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java @@ -7,18 +7,19 @@ import java.util.Iterator; import java.util.Optional; -public class BulkOperationRepeatable { +class BulkOperationRepeatable { private final BulkOperation operation; private final Context context; private final Iterator retries; - private Instant retryTime; + private Long retryTime; public BulkOperationRepeatable(BulkOperation request, Context context, Iterator retries) { this.operation = request; this.context = context; this.retries = retries; // if the retries iterator is null it means that it's not a retry, otherwise calculating retry time - this.retryTime = Optional.ofNullable(retries).map(r -> Instant.now().plus(r.next(), ChronoUnit.MILLIS)).orElse(Instant.now()); + long currentMillis = getCurrentMillis(); + this.retryTime = Optional.ofNullable(retries).map(r -> currentMillis + r.next()).orElse(currentMillis); } public BulkOperation getOperation() { @@ -33,15 +34,23 @@ public Iterator getRetries() { return retries; } - public Instant getCurrentRetryTime() { + public Long getCurrentRetryTime() { return this.retryTime; } + public long getCurrentRetryTimeDelay() { + return this.retryTime - getCurrentMillis(); + } + public boolean canRetry() { return Optional.ofNullable(retries).map(Iterator::hasNext).orElse(true); } public boolean isSendable() { - return retryTime.isBefore(Instant.now()); + return (this.retryTime - getCurrentMillis()) <= 0; + } + + private Long getCurrentMillis(){ + return System.nanoTime()/1_000_000L; } } diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index 2840c052f..e0e8dac49 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -34,7 +34,7 @@ /** * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). */ -public class IngesterOperation { +class IngesterOperation { private final BulkOperationRepeatable repeatableOp; private final long size; diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 4c707b851..5254d7f0e 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -32,6 +32,7 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; From 1169ca8f24ade6e45b8eca54d8ef01aa7454cce8 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 29 Jan 2025 11:07:23 +0100 Subject: [PATCH 05/11] working unit test, bugfixes --- .../_helpers/bulk/BulkIngester.java | 21 +- .../bulk/BulkIngesterRetryPolicyTest.java | 216 ++++++++++++++++++ 2 files changed, 228 insertions(+), 9 deletions(-) create mode 100644 java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index d7057dcda..442bcdeb9 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -38,7 +38,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; @@ -301,7 +300,8 @@ public void flush() { RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes - return !operations.isEmpty(); + return !operations.isEmpty() && operations.stream() + .anyMatch(BulkOperationRepeatable::isSendable); }, () -> { // Selecting operations that can be sent immediately @@ -319,9 +319,10 @@ public void flush() { .collect(Collectors.toList()); // If all contexts are null, no need for the list - if (contexts.stream().allMatch(Objects::isNull)) { - contexts = null; - } + // TODO want to keep? +// if (contexts.stream().allMatch(Objects::isNull)) { +// contexts = new ArrayList<>(); +// } // Build the request BulkRequest request = newRequest().operations(immediateOps).build(); @@ -379,6 +380,7 @@ public void flush() { // Partial success, retrying failed requests if policy allows it // Keeping list of retryables, to exclude them for calling listener later List> retryableReq = new ArrayList<>(); + List> refires = new ArrayList<>(); List retryableResp = new ArrayList<>(); for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { int index = resp.items().indexOf(bulkItemResponse); @@ -392,7 +394,8 @@ public void flush() { BulkOperationRepeatable refire = new BulkOperationRepeatable<>(original.getOperation(), original.getContext(), retries); - retryableReq.add(refire); + retryableReq.add(original); + refires.add(refire); addRetry(refire); logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms"); // TODO remove after checking @@ -402,10 +405,10 @@ public void flush() { } } // Scheduling flushes for just sent out retryable requests - if (!retryableReq.isEmpty()) { + if (!refires.isEmpty()) { // if size <= 3, all times // if size > 3, schedule just first, last and median - scheduleRetries(retryableReq); + scheduleRetries(refires); } // Retrieving list of remaining successful or not retryable requests sentRequests.removeAll(retryableReq); @@ -422,7 +425,7 @@ public void flush() { .map(BulkOperationRepeatable::getContext) .collect(Collectors.toList()); // Filtering response - List partialItems = resp.items(); + List partialItems = new ArrayList<>(resp.items()); partialItems.removeAll(retryableResp); BulkResponse partialResp = BulkResponse.of(br -> br diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java new file mode 100644 index 000000000..fd70dfbd4 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -0,0 +1,216 @@ +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.ErrorCause; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import co.elastic.clients.elasticsearch.core.bulk.OperationType; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportOptions; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BulkIngesterRetryPolicyTest { + + protected static ElasticsearchClient client; + + private BulkOperation create = BulkOperation.of(b -> b.create(c -> c.index("foo").id("1").document("1"))); + private BulkOperation index = BulkOperation.of(b -> b.index(c -> c.index("fooo").id("2").document("2"))); + private BulkOperation delete = BulkOperation.of(b -> b.delete(c -> c.index("foooo").id("3"))); + + @BeforeAll + public static void beforeAll() { + TestTransport transport = new TestTransport(); + ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport); + } + + @Test + public void retryTestNoScheduledFlushNoContext() throws Exception { + TestTransport transport = new TestTransport(); + ElasticsearchClient client = new ElasticsearchClient(transport); + CountingListener listener = new CountingListener(); + + + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .maxOperations(3) + .maxConcurrentRequests(3) + .listener(listener) + .backoffPolicy(BackoffPolicy.constantBackoff(50L,8)) + ); + + // First test, partial success + { + ingester.add(create); + ingester.add(index); + ingester.add(index); + + ingester.close(); + + // at most it should be 1 instant success + 2 retries, at minimum just 3 instant successes + assertTrue(listener.requests.get() > 0 && listener.requests.get() < 4); + // eventually all 3 have to succeed + assertTrue(listener.successOperations.get() == 3); + } + transport.close(); + } + + + private static class TestTransport implements ElasticsearchTransport { + public final AtomicInteger requestsStarted = new AtomicInteger(); + public final AtomicInteger requestsCompleted = new AtomicInteger(); + public final AtomicInteger operations = new AtomicInteger(); + + public final AtomicInteger retryFailures = new AtomicInteger(); + + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + @Override + public ResponseT performRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, Endpoint endpoint, @Nullable TransportOptions options) { + + BulkRequest bulk = (BulkRequest) request; + requestsStarted.incrementAndGet(); + operations.addAndGet(bulk.operations().size()); + + if (bulk.operations().isEmpty()) { + System.out.println("No operations!"); + } + + // For testing purposes, different result depending on the operation type. + // Create will always succeed + // Index will always 429 for 3 times, then 200 + // Delete will always return 404 + + List items = new ArrayList<>(); + for (BulkOperation op : bulk.operations()) { + OperationType operationType = OperationType.Create; + ErrorCause error = null; + int status = 200; + String index = null; + switch (op._kind()) { + case Index: + index = ((IndexOperation) op._get()).index(); + operationType = OperationType.Index; + boolean isStillRetrying = retryFailures.incrementAndGet() > 2; + error = isStillRetrying ? null : ErrorCause.of(e -> e.reason("some error")); + status = isStillRetrying ? 200 : 429; + break; + case Delete: + index = ((DeleteOperation) op._get()).index(); + operationType = OperationType.Delete; + error = ErrorCause.of(e -> e.reason("some error")); + status = 404; + break; + default: + index = ((CreateOperation) op._get()).index(); + break; + } + ErrorCause finalError = error; + int finalStatus = status; + OperationType finalOperationType = operationType; + String finalIndex = index; + items.add(BulkResponseItem.of(b -> b + .index(finalIndex) + .operationType(finalOperationType) + .status(finalStatus) + .error(finalError))); + } + + CompletableFuture response = new CompletableFuture<>(); + executor.submit(() -> { + requestsCompleted.incrementAndGet(); + response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3))); + }); + + @SuppressWarnings("unchecked") + CompletableFuture result = (CompletableFuture) response; + return result; + } + + @Override + public JsonpMapper jsonpMapper() { + return SimpleJsonpMapper.INSTANCE; + } + + @Override + public TransportOptions options() { + return null; + } + + @Override + public void close() throws IOException { + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class CountingListener implements BulkListener { + public final AtomicInteger successOperations = new AtomicInteger(); + public final AtomicInteger errorOperations = new AtomicInteger(); + public final AtomicInteger requests = new AtomicInteger(); + + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, + BulkResponse response) { + for (BulkResponseItem item : response.items()) { + if(item.error() != null) { + errorOperations.incrementAndGet(); + } + else{ + successOperations.incrementAndGet(); + } + } + requests.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + failure.printStackTrace(); + errorOperations.addAndGet(request.operations().size()); + requests.incrementAndGet(); + } + } +} From 50ced5141b148d81c4920d0b1c861b6c487fb270 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 29 Jan 2025 12:59:47 +0100 Subject: [PATCH 06/11] more tests --- .../bulk/BulkIngesterRetryPolicyTest.java | 278 ++++++++++++++---- 1 file changed, 216 insertions(+), 62 deletions(-) diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java index fd70dfbd4..051707f55 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -16,7 +16,10 @@ import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportException; import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.RepeatableBodyResponse; +import co.elastic.clients.transport.http.TransportHttpClient; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -36,45 +39,159 @@ public class BulkIngesterRetryPolicyTest { protected static ElasticsearchClient client; + protected static TestTransport transport; + protected static CountingListener listener; private BulkOperation create = BulkOperation.of(b -> b.create(c -> c.index("foo").id("1").document("1"))); private BulkOperation index = BulkOperation.of(b -> b.index(c -> c.index("fooo").id("2").document("2"))); + private BulkOperation indexFail = BulkOperation.of(b -> b.index(c -> c.index("fail").id("2").document("2"))); private BulkOperation delete = BulkOperation.of(b -> b.delete(c -> c.index("foooo").id("3"))); + private BulkOperation deleteFail = BulkOperation.of(b -> b.delete(c -> c.index("fail").id("3"))); @BeforeAll - public static void beforeAll() { - TestTransport transport = new TestTransport(); - ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport); + public static void setup(){ + transport = new TestTransport(); + client = new ElasticsearchClient(transport); + listener = new CountingListener(); } @Test public void retryTestNoScheduledFlushNoContext() throws Exception { - TestTransport transport = new TestTransport(); - ElasticsearchClient client = new ElasticsearchClient(transport); - CountingListener listener = new CountingListener(); - - - BulkIngester ingester = BulkIngester.of(b -> b - .client(client) - .maxOperations(3) - .maxConcurrentRequests(3) - .listener(listener) - .backoffPolicy(BackoffPolicy.constantBackoff(50L,8)) - ); - // First test, partial success + // First test, partial success, other will succeed after retrying { + BulkIngester ingester = newBasicBulkIngester(listener); + ingester.add(create); ingester.add(index); ingester.add(index); ingester.close(); - // at most it should be 1 instant success + 2 retries, at minimum just 3 instant successes - assertTrue(listener.requests.get() > 0 && listener.requests.get() < 4); + // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending if the retries + // get scheduled at the same exact time + assertTrue(listener.requests.get() == 2 || listener.requests.get() == 3); // eventually all 3 have to succeed assertTrue(listener.successOperations.get() == 3); } + + // Second test, all requests will succeed after retrying + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(index); + ingester.add(index); + ingester.add(index); + + ingester.close(); + + // between 1 and 4, depending on scheduler + assertTrue(listener.requests.get() >= 1 && listener.requests.get() <= 4); + // eventually all 4 have to succeed + assertTrue(listener.successOperations.get() == 4); + } + + // Third test, only one retryable (will succeed), other permanent failures + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(delete); + ingester.add(delete); + + ingester.close(); + + // 2 failed will be handled together, then 1 retry + assertTrue(listener.requests.get() == 2); + + assertTrue(listener.successOperations.get() == 1); + assertTrue(listener.errorOperations.get() == 2); + } + + // Fourth test, all requests will be retried until policy allows, then fail + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(indexFail); + ingester.add(indexFail); + ingester.add(indexFail); + + ingester.close(); + + // from 1 to 3 depending on scheduling + assertTrue(listener.requests.get() >= 1 && listener.requests.get() <= 3); + + assertTrue(listener.successOperations.get() == 0); + assertTrue(listener.errorOperations.get() == 3); + } + + // Fifth test, one exception that will make everything else fail, no retries triggered + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(create); + ingester.add(deleteFail); + + ingester.close(); + + // just the one + assertTrue(listener.requests.get() == 1); + + assertTrue(listener.successOperations.get() == 0); + assertTrue(listener.errorOperations.get() == 3); + } + + // Sixth test, a mix of everything + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(create); + ingester.add(index); + ingester.add(indexFail); + ingester.add(delete); + ingester.add(create); + ingester.add(index); + ingester.add(indexFail); + ingester.add(delete); + + ingester.close(); + + // from 2 to 4 depending on scheduling + assertTrue(listener.requests.get() >= 1 && listener.successOperations.get() <= 4); + + assertTrue(listener.successOperations.get() == 4); + assertTrue(listener.errorOperations.get() == 4); + } + + transport.close(); + } + + @Test + public void retryTestFlushAndContextExponentialBackoff() throws Exception { + + TestTransport transport = new TestTransport(); + ElasticsearchClient client = new ElasticsearchClient(transport); + CountingListener listener = new CountingListener(); + + // TODO + transport.close(); } @@ -112,53 +229,67 @@ public CompletableFuture performRequest // For testing purposes, different result depending on the operation type. // Create will always succeed - // Index will always 429 for 3 times, then 200 - // Delete will always return 404 - - List items = new ArrayList<>(); - for (BulkOperation op : bulk.operations()) { - OperationType operationType = OperationType.Create; - ErrorCause error = null; - int status = 200; - String index = null; - switch (op._kind()) { - case Index: - index = ((IndexOperation) op._get()).index(); - operationType = OperationType.Index; - boolean isStillRetrying = retryFailures.incrementAndGet() > 2; - error = isStillRetrying ? null : ErrorCause.of(e -> e.reason("some error")); - status = isStillRetrying ? 200 : 429; - break; - case Delete: - index = ((DeleteOperation) op._get()).index(); - operationType = OperationType.Delete; - error = ErrorCause.of(e -> e.reason("some error")); - status = 404; - break; - default: - index = ((CreateOperation) op._get()).index(); - break; + // Index will always 429 for 3 times, then 200. Index with index name "fail" will only 429. + // Delete will always return 404. Delete with index name "fail" will throw transport exception. + + try { + + List items = new ArrayList<>(); + for (BulkOperation op : bulk.operations()) { + OperationType operationType = OperationType.Create; + ErrorCause error = null; + int status = 200; + String index = null; + switch (op._kind()) { + case Index: + index = ((IndexOperation) op._get()).index(); + operationType = OperationType.Index; + boolean isStillRetrying = retryFailures.incrementAndGet() > 2; + error = isStillRetrying && !index.equals("fail") ? null : ErrorCause.of(e -> e.reason("some error")); + status = isStillRetrying && !index.equals("fail") ? 200 : 429; + break; + case Delete: + index = ((DeleteOperation) op._get()).index(); + if (index.equals("fail")) { + throw new RuntimeException("error"); + } + operationType = OperationType.Delete; + error = ErrorCause.of(e -> e.reason("some error")); + status = 404; + break; + default: + index = ((CreateOperation) op._get()).index(); + break; + } + ErrorCause finalError = error; + int finalStatus = status; + OperationType finalOperationType = operationType; + String finalIndex = index; + items.add(BulkResponseItem.of(b -> b + .index(finalIndex) + .operationType(finalOperationType) + .status(finalStatus) + .error(finalError))); } - ErrorCause finalError = error; - int finalStatus = status; - OperationType finalOperationType = operationType; - String finalIndex = index; - items.add(BulkResponseItem.of(b -> b - .index(finalIndex) - .operationType(finalOperationType) - .status(finalStatus) - .error(finalError))); - } - CompletableFuture response = new CompletableFuture<>(); - executor.submit(() -> { - requestsCompleted.incrementAndGet(); - response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3))); - }); + CompletableFuture response = new CompletableFuture<>(); + executor.submit(() -> { + requestsCompleted.incrementAndGet(); + response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3))); + }); - @SuppressWarnings("unchecked") - CompletableFuture result = (CompletableFuture) response; - return result; + @SuppressWarnings("unchecked") + CompletableFuture result = (CompletableFuture) response; + return result; + } + catch (RuntimeException e){ + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + executor.submit(() -> { + future.completeExceptionally(e); + }); + return future; + } } @Override @@ -180,6 +311,13 @@ public void close() throws IOException { throw new RuntimeException(e); } } + + public void reset() { + requestsStarted.set(0); + requestsCompleted.set(0); + operations.set(0); + retryFailures.set(0); + } } private static class CountingListener implements BulkListener { @@ -212,5 +350,21 @@ public void afterBulk(long executionId, BulkRequest request, List contexts errorOperations.addAndGet(request.operations().size()); requests.incrementAndGet(); } + + public void reset() { + successOperations.set(0); + errorOperations.set(0); + requests.set(0); + } + } + + private BulkIngester newBasicBulkIngester(BulkListener listener){ + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .backoffPolicy(BackoffPolicy.constantBackoff(50L,8)) + ); } } From 365c04ec78c86e61ef113536290ed0ab0d48a593 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Thu, 30 Jan 2025 17:25:52 +0100 Subject: [PATCH 07/11] unit tests done --- .../bulk/BulkIngesterRetryPolicyTest.java | 186 ++++++++++++++---- 1 file changed, 149 insertions(+), 37 deletions(-) diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java index 051707f55..55d5c5e0f 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -1,6 +1,5 @@ package co.elastic.clients.elasticsearch._helpers.bulk; -import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch.core.BulkRequest; @@ -16,21 +15,22 @@ import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; -import co.elastic.clients.transport.TransportException; import co.elastic.clients.transport.TransportOptions; -import co.elastic.clients.transport.http.RepeatableBodyResponse; -import co.elastic.clients.transport.http.TransportHttpClient; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -44,12 +44,13 @@ public class BulkIngesterRetryPolicyTest { private BulkOperation create = BulkOperation.of(b -> b.create(c -> c.index("foo").id("1").document("1"))); private BulkOperation index = BulkOperation.of(b -> b.index(c -> c.index("fooo").id("2").document("2"))); - private BulkOperation indexFail = BulkOperation.of(b -> b.index(c -> c.index("fail").id("2").document("2"))); + private BulkOperation indexFail = BulkOperation.of(b -> b.index(c -> c.index("fail").id("2").document( + "2"))); private BulkOperation delete = BulkOperation.of(b -> b.delete(c -> c.index("foooo").id("3"))); private BulkOperation deleteFail = BulkOperation.of(b -> b.delete(c -> c.index("fail").id("3"))); - @BeforeAll - public static void setup(){ + @BeforeEach + public void setup() { transport = new TestTransport(); client = new ElasticsearchClient(transport); listener = new CountingListener(); @@ -60,7 +61,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { // First test, partial success, other will succeed after retrying { - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(create); ingester.add(index); @@ -68,11 +69,15 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { ingester.close(); - // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending if the retries - // get scheduled at the same exact time + // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending on the retries, + // if they get scheduled at the same exact time assertTrue(listener.requests.get() == 2 || listener.requests.get() == 3); // eventually all 3 have to succeed assertTrue(listener.successOperations.get() == 3); + + // 1 for the create and first try for the indexes, 2 + 2 for both index retries, + // which could be scheduled at the same time, so from 3 to 5 + assertTrue(listener.sentRequestsTotal.get() >= 3 && listener.sentRequestsTotal.get() <= 5); } // Second test, all requests will succeed after retrying @@ -80,7 +85,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { transport.reset(); listener.reset(); - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(index); ingester.add(index); @@ -93,6 +98,8 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { assertTrue(listener.requests.get() >= 1 && listener.requests.get() <= 4); // eventually all 4 have to succeed assertTrue(listener.successOperations.get() == 4); + // between 3 and 9, depending on scheduler + assertTrue(listener.sentRequestsTotal.get() >= 3 && listener.sentRequestsTotal.get() <= 9); } // Third test, only one retryable (will succeed), other permanent failures @@ -100,7 +107,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { transport.reset(); listener.reset(); - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(index); ingester.add(delete); @@ -113,6 +120,8 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { assertTrue(listener.successOperations.get() == 1); assertTrue(listener.errorOperations.get() == 2); + // 1 initial + 2 retries + assertTrue(listener.sentRequestsTotal.get() == 3); } // Fourth test, all requests will be retried until policy allows, then fail @@ -120,7 +129,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { transport.reset(); listener.reset(); - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(indexFail); ingester.add(indexFail); @@ -133,6 +142,8 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { assertTrue(listener.successOperations.get() == 0); assertTrue(listener.errorOperations.get() == 3); + // between 8 and 24, depending on scheduler + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 24); } // Fifth test, one exception that will make everything else fail, no retries triggered @@ -140,7 +151,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { transport.reset(); listener.reset(); - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(index); ingester.add(create); @@ -153,6 +164,9 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { assertTrue(listener.successOperations.get() == 0); assertTrue(listener.errorOperations.get() == 3); + + // just the one + assertTrue(listener.sentRequestsTotal.get() == 1); } // Sixth test, a mix of everything @@ -160,7 +174,7 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { transport.reset(); listener.reset(); - BulkIngester ingester = newBasicBulkIngester(listener); + BulkIngester ingester = newBasicBulkIngester(listener); ingester.add(create); ingester.add(index); @@ -178,6 +192,9 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { assertTrue(listener.successOperations.get() == 4); assertTrue(listener.errorOperations.get() == 4); + + // between 8 and 18, depending on scheduler + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 18); } transport.close(); @@ -186,11 +203,84 @@ public void retryTestNoScheduledFlushNoContext() throws Exception { @Test public void retryTestFlushAndContextExponentialBackoff() throws Exception { - TestTransport transport = new TestTransport(); - ElasticsearchClient client = new ElasticsearchClient(transport); - CountingListener listener = new CountingListener(); + // One success, other will succeed after retrying, other will fail eventually + { + BulkIngester ingester = newBulkIngesterWithFlushAndContext(listener); - // TODO + ingester.add(create, 1); + ingester.add(indexFail, 2); + ingester.add(index, 3); + + ingester.close(); + + // should be 3 separate requests sent, one instant, one after a few retries, the last one error. + assertTrue(listener.requests.get() == 3); + // 2 will succeed, one will fail + assertTrue(listener.successOperations.get() == 2); + assertTrue(listener.errorOperations.get() == 1); + // between 8 and 10, depending on scheduler (first one + 2 retries for index + 8 retries for + // indexfail) + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 11); + // checking order of contexts after send confirmed + Iterator iter = listener.sentContexts.iterator(); + // first one being completed is create + assertTrue(iter.next().equals(1)); + // second one is index, which will take only 2 retries + assertTrue(iter.next().equals(3)); + // last one is indexFail, taking 8 retries to fail + assertTrue(iter.next().equals(2)); + } + + transport.close(); + } + + @Test + public void multiThreadStressTest() throws InterruptedException, IOException { + + // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme + // situation where the number of adding threads greatly exceeds the number of concurrent requests + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests + // accordingly. + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .listener(listener) + .flushInterval(5, TimeUnit.SECONDS) + .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8))); + + ExecutorService executor = Executors.newFixedThreadPool(50); + + // sends create operations, but once every 1000, one index operation will be sent, + // and once every 5000 an indexFail + for (int i = 0; i < 100000; i++) { + int ii = i; + Runnable thread = () -> { + int finalI = ii; + if (ii % 1000 == 0) { + ingester.add(index, ii); + } else if (ii % 5000 == 0) { + ingester.add(indexFail, ii); + } else { + ingester.add(create, ii); + } + }; + executor.submit(thread); + } + + executor.awaitTermination(10, TimeUnit.SECONDS); + ingester.close(); + + // all operations will succeed eventually, so the total has to be 100000 + assertTrue(listener.successOperations.get() == 100000); + assertTrue(listener.sentContexts.size() == 100000); + assertTrue(listener.errorOperations.get() == 0); + // it's difficult to predict how many requests will be sent, but considering they will be sent + // in batches of 1000, without retries it should be exactly 100, considering that 100 out of + // 100000 will be retried 3 times and 20 will be retried 8 times, if they don't get batched together + // with the others it could result in a total of 560, which is highly unlikely. + // more reasonably there will be between 100 and 300 requests sent. + assertTrue(listener.sentRequestsTotal.get() >= 100 && listener.sentRequestsTotal.get() <= 300); + // same reasoning + assertTrue(listener.requests.get() >= 100 && listener.requests.get() <= 300); transport.close(); } @@ -201,7 +291,7 @@ private static class TestTransport implements ElasticsearchTransport { public final AtomicInteger requestsCompleted = new AtomicInteger(); public final AtomicInteger operations = new AtomicInteger(); - public final AtomicInteger retryFailures = new AtomicInteger(); + public ConcurrentHashMap retryFailures = new ConcurrentHashMap<>(); private final ExecutorService executor = Executors.newCachedThreadPool(); @@ -229,7 +319,8 @@ public CompletableFuture performRequest // For testing purposes, different result depending on the operation type. // Create will always succeed - // Index will always 429 for 3 times, then 200. Index with index name "fail" will only 429. + // Index will always return 429 for 2 times, then 200. Index with index name "fail" will only + // return 429. // Delete will always return 404. Delete with index name "fail" will throw transport exception. try { @@ -244,8 +335,10 @@ public CompletableFuture performRequest case Index: index = ((IndexOperation) op._get()).index(); operationType = OperationType.Index; - boolean isStillRetrying = retryFailures.incrementAndGet() > 2; - error = isStillRetrying && !index.equals("fail") ? null : ErrorCause.of(e -> e.reason("some error")); + retryFailures.putIfAbsent(op, 0); + boolean isStillRetrying = retryFailures.computeIfPresent(op, (k, v) -> v + 1) > 2; + error = isStillRetrying && !index.equals("fail") ? null : + ErrorCause.of(e -> e.reason("some error")); status = isStillRetrying && !index.equals("fail") ? 200 : 429; break; case Delete: @@ -281,8 +374,7 @@ public CompletableFuture performRequest @SuppressWarnings("unchecked") CompletableFuture result = (CompletableFuture) response; return result; - } - catch (RuntimeException e){ + } catch (RuntimeException e) { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); executor.submit(() -> { @@ -316,38 +408,46 @@ public void reset() { requestsStarted.set(0); requestsCompleted.set(0); operations.set(0); - retryFailures.set(0); + retryFailures = new ConcurrentHashMap<>(); } } - private static class CountingListener implements BulkListener { + private static class CountingListener implements BulkListener { + public final AtomicInteger sentRequestsTotal = new AtomicInteger(); public final AtomicInteger successOperations = new AtomicInteger(); public final AtomicInteger errorOperations = new AtomicInteger(); public final AtomicInteger requests = new AtomicInteger(); + public final ConcurrentLinkedQueue sentContexts = new ConcurrentLinkedQueue<>(); @Override - public void beforeBulk(long executionId, BulkRequest request, List contexts) { - + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + sentRequestsTotal.incrementAndGet(); } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { for (BulkResponseItem item : response.items()) { - if(item.error() != null) { + if (item.error() != null) { errorOperations.incrementAndGet(); - } - else{ + } else { successOperations.incrementAndGet(); } } + if (contexts.stream().anyMatch(Objects::nonNull)) { + sentContexts.addAll(contexts); + } requests.incrementAndGet(); } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, List contexts, + Throwable failure) { failure.printStackTrace(); errorOperations.addAndGet(request.operations().size()); + if (contexts.stream().anyMatch(Objects::nonNull)) { + sentContexts.addAll(contexts); + } requests.incrementAndGet(); } @@ -355,16 +455,28 @@ public void reset() { successOperations.set(0); errorOperations.set(0); requests.set(0); + sentRequestsTotal.set(0); } } - private BulkIngester newBasicBulkIngester(BulkListener listener){ + private BulkIngester newBasicBulkIngester(BulkListener listener) { + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8)) + ); + } + + private BulkIngester newBulkIngesterWithFlushAndContext(BulkListener listener) { return BulkIngester.of(b -> b .client(client) .maxOperations(10) .maxConcurrentRequests(10) .listener(listener) - .backoffPolicy(BackoffPolicy.constantBackoff(50L,8)) + .flushInterval(1000, TimeUnit.MILLISECONDS) + .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8)) ); } } From 69842eef31575e0bd9f05faf257d0c95a8ffeb45 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Thu, 30 Jan 2025 17:53:24 +0100 Subject: [PATCH 08/11] refactoring --- .../_helpers/bulk/BulkIngester.java | 111 ++++++++++-------- 1 file changed, 63 insertions(+), 48 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 442bcdeb9..d550a6ac8 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -363,51 +363,22 @@ public void flush() { if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) { // Total success! ...or there's no retry policy implemented. Either way, can call - // listener after bulk - if (listener != null) { - listenerInProgressCount.incrementAndGet(); - scheduler.submit(() -> { - try { - listener.afterBulk(exec.id, exec.request, exec.contexts, resp); - } finally { - if (listenerInProgressCount.decrementAndGet() == 0) { - closeCondition.signalIfReady(); - } - } - }); - } + listenerAfterBulkSuccess(resp, exec); } else { // Partial success, retrying failed requests if policy allows it - // Keeping list of retryables, to exclude them for calling listener later + // Keeping list of retryable requests/responses, to exclude them for calling + // listener later List> retryableReq = new ArrayList<>(); List> refires = new ArrayList<>(); List retryableResp = new ArrayList<>(); + for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { int index = resp.items().indexOf(bulkItemResponse); - // Getting original failed, requests and keeping successful ones to send to the - // listener - BulkOperationRepeatable original = sentRequests.get(index); - if (original.canRetry()) { - retryableResp.add(bulkItemResponse); - Iterator retries = - Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); - BulkOperationRepeatable refire = - new BulkOperationRepeatable<>(original.getOperation(), - original.getContext(), retries); - retryableReq.add(original); - refires.add(refire); - addRetry(refire); - logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms"); - // TODO remove after checking - assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString())); - } else { - logger.warn("Retries finished for request: " + original.getOperation()._kind().toString()); - } + selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, + retryableReq, refires); } // Scheduling flushes for just sent out retryable requests if (!refires.isEmpty()) { - // if size <= 3, all times - // if size > 3, schedule just first, last and median scheduleRetries(refires); } // Retrieving list of remaining successful or not retryable requests @@ -450,18 +421,7 @@ public void flush() { } } else { // Failure - if (listener != null) { - listenerInProgressCount.incrementAndGet(); - scheduler.submit(() -> { - try { - listener.afterBulk(exec.id, exec.request, exec.contexts, thr); - } finally { - if (listenerInProgressCount.decrementAndGet() == 0) { - closeCondition.signalIfReady(); - } - } - }); - } + listenerAfterBulkException(thr, exec); } sendRequestCondition.signalIfReadyAfter(() -> { @@ -473,6 +433,58 @@ public void flush() { } } + private void selectingRetries(int index, BulkResponseItem bulkItemResponse, + List> sentRequests, + List retryableResp, + List> retryableReq, + List> refires) { + + // Getting original failed, requests and keeping successful ones to send to the listener + BulkOperationRepeatable original = sentRequests.get(index); + if (original.canRetry()) { + retryableResp.add(bulkItemResponse); + Iterator retries = + Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); + BulkOperationRepeatable refire = new BulkOperationRepeatable<>(original.getOperation(), original.getContext(), retries); + retryableReq.add(original); + refires.add(refire); + addRetry(refire); + logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms"); + } else { + logger.warn("Retries finished for request: " + original.getOperation()._kind().toString()); + } + } + + private void listenerAfterBulkException(Throwable thr, RequestExecution exec) { + if (listener != null) { + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); + } + } + + private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution exec) { + if (listener != null) { + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); + } + } + private void scheduleRetries(List> retryableReq) { List sortedDelays = retryableReq.stream() .map(BulkOperationRepeatable::getCurrentRetryTimeDelay) @@ -696,7 +708,10 @@ public Builder listener(BulkListener listener) { return this; } - + /** + * Sets the backoff policy that will handle retries for error 429: too many requests. + * All the times are defined in milliseconds. + */ public Builder backoffPolicy(BackoffPolicy backoffPolicy) { this.backoffPolicy = backoffPolicy; return this; From 4b81a79cdace413e6dfde467e867ce898bcdce18 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Thu, 30 Jan 2025 17:58:27 +0100 Subject: [PATCH 09/11] more refactoring --- .../_helpers/bulk/BulkIngester.java | 2 +- .../_helpers/bulk/IngesterOperation.java | 2 +- .../_helpers/bulk/BulkIngesterTest.java | 46 +++++++------------ 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index d550a6ac8..2bfd3c9c4 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -536,7 +536,7 @@ private void innerAdd(BulkOperationRepeatable repeatableOp) { IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); addCondition.whenReady(() -> { - operations.add(ingestOp.operation()); + operations.add(ingestOp.repeatableOperation()); currentSize += ingestOp.size(); if (!canAddOperation()) { diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index e0e8dac49..d9af9c9cf 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -58,7 +58,7 @@ public static IngesterOperation of(BulkOperationRepeatable repeatableOp, JsonpMa } } - public BulkOperationRepeatable operation() { + public BulkOperationRepeatable repeatableOperation() { return this.repeatableOp; } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 5254d7f0e..d6a086ede 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -32,7 +32,6 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; -import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; @@ -142,7 +141,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, for (int i = 0; i < numThreads; i++) { new Thread(() -> { try { - Thread.sleep((long) (Math.random() * 100)); + Thread.sleep((long)(Math.random() * 100)); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -169,8 +168,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, assertEquals(expectedOperations, listener.operations.get()); assertEquals(expectedOperations, transport.operations.get()); - int expectedRequests = - expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0) ? 0 : 1); + int expectedRequests = expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0) ? 0 : 1) ; assertEquals(expectedRequests, ingester.requestCount()); assertEquals(expectedRequests, listener.requests.get()); @@ -185,8 +183,7 @@ public void multiThreadStressTest() throws InterruptedException, IOException { // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme // situation where the number of adding threads greatly exceeds the number of concurrent requests - // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests - // accordingly. + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. BulkIngester ingester = BulkIngester.of(b -> b .client(client) .globalSettings(s -> s.index(index)) @@ -212,22 +209,21 @@ public void multiThreadStressTest() throws InterruptedException, IOException { executor.submit(thread); } - executor.awaitTermination(10, TimeUnit.SECONDS); + executor.awaitTermination(10,TimeUnit.SECONDS); ingester.close(); client.indices().refresh(); IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); - assertTrue(indexStats.indices().get(index).primaries().docs().count() == 100000); + assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); } @Test public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); - long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation, null, null), - transport.jsonpMapper()).size(); + long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation, null, null), transport.jsonpMapper()).size(); BulkIngester ingester = BulkIngester.of(b -> b .client(new ElasticsearchAsyncClient(transport)) @@ -257,7 +253,7 @@ public void periodicFlushTest() throws Exception { // Disable other flushing limits .maxSize(-1) .maxOperations(-1) - .maxConcurrentRequests(Integer.MAX_VALUE - 1) + .maxConcurrentRequests(Integer.MAX_VALUE-1) ); // Add an operation every 100 ms to give time @@ -297,8 +293,7 @@ public void beforeBulk(long executionId, BulkRequest request, List context } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, - BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { if (executionId == 2) { // Fail after the request is sent failureCount.incrementAndGet(); @@ -307,8 +302,7 @@ public void afterBulk(long executionId, BulkRequest request, List contexts } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, - Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { } }; @@ -361,13 +355,11 @@ public void beforeBulk(long executionId, BulkRequest request, List cont } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, - BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, - Throwable failure) { + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { } }; @@ -381,7 +373,7 @@ public void afterBulk(long executionId, BulkRequest request, List conte for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { // Set a context only after 5, so that we test filling with nulls. - Integer context = j < 5 ? null : i * 10 + j; + Integer context = j < 5 ? null : i*10 + j; ingester.add(operation, context); } } @@ -399,7 +391,7 @@ public void afterBulk(long executionId, BulkRequest request, List conte if (j < 5) { assertNull(contexts.get(j)); } else { - assertEquals(contexts.get(j), i * 10 + j); + assertEquals(contexts.get(j), i*10 + j); } } } @@ -441,8 +433,7 @@ public void beforeBulk(long executionId, BulkRequest request, List context @Test public void pipelineTest() { - String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\"," + - "\"require_alias\":true}}"; + String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\",\"require_alias\":true}}"; JsonpMapper mapper = new SimpleJsonpMapper(); BulkOperation create = BulkOperation.of(o -> o.create(c -> c @@ -456,8 +447,7 @@ public void pipelineTest() { String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), - mapper).operation().getOperation(); + BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), mapper).repeatableOperation().getOperation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str); @@ -546,15 +536,13 @@ public void testConfigValidation() { private static class CountingListener implements BulkListener { public final AtomicInteger operations = new AtomicInteger(); public final AtomicInteger requests = new AtomicInteger(); - @Override public void beforeBulk(long executionId, BulkRequest request, List contexts) { } @Override - public void afterBulk(long executionId, BulkRequest request, List contexts, - BulkResponse response) { + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { operations.addAndGet(request.operations().size()); requests.incrementAndGet(); } @@ -607,7 +595,7 @@ public CompletableFuture performRequest }); @SuppressWarnings("unchecked") - CompletableFuture result = (CompletableFuture) response; + CompletableFuture result = (CompletableFuture)response; return result; } From 5c4c7059b0934ee33acbc5aaf3bf10a53e8f11ce Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Thu, 30 Jan 2025 18:12:47 +0100 Subject: [PATCH 10/11] checkstyle --- .../bulk/BulkOperationRepeatable.java | 21 ++++++++++++++++++- .../_helpers/bulk/IngesterOperation.java | 2 +- .../bulk/BulkIngesterRetryPolicyTest.java | 20 +++++++++++++++++- .../_helpers/bulk/BulkIngesterTest.java | 3 ++- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java index 62bf08a5f..c2ae4d3ee 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package co.elastic.clients.elasticsearch._helpers.bulk; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; @@ -13,7 +32,7 @@ class BulkOperationRepeatable { private final Iterator retries; private Long retryTime; - public BulkOperationRepeatable(BulkOperation request, Context context, Iterator retries) { + BulkOperationRepeatable(BulkOperation request, Context context, Iterator retries) { this.operation = request; this.context = context; this.retries = retries; diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index d9af9c9cf..ac93f08a0 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -38,7 +38,7 @@ class IngesterOperation { private final BulkOperationRepeatable repeatableOp; private final long size; - public IngesterOperation(BulkOperationRepeatable repeatableOp, long size) { + IngesterOperation(BulkOperationRepeatable repeatableOp, long size) { this.repeatableOp = repeatableOp; this.size = size; } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java index 55d5c5e0f..0c14cb20c 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package co.elastic.clients.elasticsearch._helpers.bulk; import co.elastic.clients.elasticsearch.ElasticsearchClient; @@ -17,7 +36,6 @@ import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; import org.jetbrains.annotations.Nullable; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index d6a086ede..b8afb6325 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -447,7 +447,8 @@ public void pipelineTest() { String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), mapper).repeatableOperation().getOperation(); + BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), mapper) + .repeatableOperation().getOperation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str); From 048386ced074555780608f2e68f212e6d244ddf5 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Fri, 21 Feb 2025 14:12:23 +0100 Subject: [PATCH 11/11] refactoring and optimizations --- .../_helpers/bulk/BulkIngester.java | 126 ++++++++---------- .../_helpers/bulk/IngesterOperation.java | 46 +++---- ...table.java => RetryableBulkOperation.java} | 28 ++-- .../bulk/BulkIngesterRetryPolicyTest.java | 52 +++++++- .../_helpers/bulk/BulkIngesterTest.java | 6 +- 5 files changed, 140 insertions(+), 118 deletions(-) rename java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/{BulkOperationRepeatable.java => RetryableBulkOperation.java} (72%) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 2bfd3c9c4..68f4232da 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.LongSummaryStatistics; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; @@ -72,8 +73,7 @@ public class BulkIngester implements AutoCloseable { private BackoffPolicy backoffPolicy; // Current state - private List> operations = new ArrayList<>(); - //private List contexts = null; // Created on demand + private List> operations = new ArrayList<>(); private long currentSize; private int requestsInFlightCount; private volatile boolean isClosed = false; @@ -190,7 +190,7 @@ public Duration flushInterval() { * The number of operations that have been buffered, waiting to be sent. */ public int pendingOperations() { - List> operations = this.operations; + List> operations = this.operations; return operations == null ? 0 : operations.size(); } @@ -296,40 +296,34 @@ private void failsafeFlush() { } public void flush() { - List> sentRequests = new ArrayList<>(); + List> sentRequests = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes return !operations.isEmpty() && operations.stream() - .anyMatch(BulkOperationRepeatable::isSendable); + .anyMatch(RetryableBulkOperation::isSendable); }, () -> { - // Selecting operations that can be sent immediately - List> immediateOpsRep = operations.stream() - .filter(BulkOperationRepeatable::isSendable) - .collect(Collectors.toList()); - + // Selecting operations that can be sent immediately, // Dividing actual operations from contexts - List immediateOps = immediateOpsRep.stream() - .map(BulkOperationRepeatable::getOperation) - .collect(Collectors.toList()); + List immediateOps = new ArrayList<>(); + List contexts = new ArrayList<>(); - List contexts = immediateOpsRep.stream() - .map(BulkOperationRepeatable::getContext) - .collect(Collectors.toList()); + for(Iterator> it = operations.iterator(); it.hasNext();){ + RetryableBulkOperation op = it.next(); + if (op.isSendable()) { + immediateOps.add(op.operation()); + contexts.add(op.context()); - // If all contexts are null, no need for the list - // TODO want to keep? -// if (contexts.stream().allMatch(Objects::isNull)) { -// contexts = new ArrayList<>(); -// } + sentRequests.add(op); + it.remove(); + } + } // Build the request BulkRequest request = newRequest().operations(immediateOps).build(); // Prepare for next round - sentRequests.addAll(immediateOpsRep); - operations.removeAll(immediateOpsRep); currentSize = operations.size(); addCondition.signalIfReady(); @@ -368,40 +362,40 @@ public void flush() { // Partial success, retrying failed requests if policy allows it // Keeping list of retryable requests/responses, to exclude them for calling // listener later - List> retryableReq = new ArrayList<>(); - List> refires = new ArrayList<>(); + List> retryableReq = new ArrayList<>(); + List> refires = new ArrayList<>(); List retryableResp = new ArrayList<>(); for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { int index = resp.items().indexOf(bulkItemResponse); - selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, - retryableReq, refires); + selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, retryableReq, refires); } // Scheduling flushes for just sent out retryable requests if (!refires.isEmpty()) { scheduleRetries(refires); } // Retrieving list of remaining successful or not retryable requests - sentRequests.removeAll(retryableReq); + retryableReq.forEach(sentRequests::remove); if (!sentRequests.isEmpty()) { if (listener != null) { // Creating partial BulkRequest - BulkRequest partialRequest = newRequest() - .operations(sentRequests.stream() - .map(BulkOperationRepeatable::getOperation) - .collect(Collectors.toList())) - .build(); - // Getting contexts - List partialCtx = sentRequests.stream() - .map(BulkOperationRepeatable::getContext) - .collect(Collectors.toList()); + List partialOps = new ArrayList<>(); + List partialCtx = new ArrayList<>(); + for (RetryableBulkOperation op : sentRequests) { + partialOps.add(op.operation()); + partialCtx.add(op.context()); + } + BulkRequest partialRequest = newRequest().operations(partialOps).build(); + // Filtering response - List partialItems = new ArrayList<>(resp.items()); - partialItems.removeAll(retryableResp); + List partialItems = resp.items() + .stream() + .filter(i -> !retryableResp.contains(i)) + .collect(Collectors.toList()); BulkResponse partialResp = BulkResponse.of(br -> br .items(partialItems) - .errors(resp.errors()) // TODO sure? + .errors(resp.errors()) .took(resp.took()) .ingestTook(resp.ingestTook())); @@ -434,24 +428,23 @@ public void flush() { } private void selectingRetries(int index, BulkResponseItem bulkItemResponse, - List> sentRequests, + List> sentRequests, List retryableResp, - List> retryableReq, - List> refires) { + List> retryableReq, + List> refires) { // Getting original failed, requests and keeping successful ones to send to the listener - BulkOperationRepeatable original = sentRequests.get(index); + RetryableBulkOperation original = sentRequests.get(index); if (original.canRetry()) { retryableResp.add(bulkItemResponse); - Iterator retries = - Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator()); - BulkOperationRepeatable refire = new BulkOperationRepeatable<>(original.getOperation(), original.getContext(), retries); + Iterator retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator()); + RetryableBulkOperation refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes); retryableReq.add(original); refires.add(refire); addRetry(refire); - logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms"); + logger.warn("Added failed request back in queue, retrying in : " + refire.currentRetryTimeDelay() + " ms"); } else { - logger.warn("Retries finished for request: " + original.getOperation()._kind().toString()); + logger.warn("Retries finished for request: " + original.operation()._kind().toString()); } } @@ -485,25 +478,16 @@ private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution> retryableReq) { - List sortedDelays = retryableReq.stream() - .map(BulkOperationRepeatable::getCurrentRetryTimeDelay) - .distinct() - .sorted() - .collect(Collectors.toList()); - - // scheduling earlier delay, first in list - retryScheduler.schedule(this::flush, sortedDelays.get(0), TimeUnit.MILLISECONDS); - if (sortedDelays.size() == 2) { - // special case, scheduling both delays - retryScheduler.schedule(this::flush, sortedDelays.get(1), TimeUnit.MILLISECONDS); - } else if (sortedDelays.size() > 2) { - // general case, scheduling median and latest delays - retryScheduler.schedule(this::flush, sortedDelays.get(sortedDelays.size() / 2), - TimeUnit.MILLISECONDS); - retryScheduler.schedule(this::flush, sortedDelays.get(sortedDelays.size() - 1), - TimeUnit.MILLISECONDS); - } + private void scheduleRetries(List> retryableReq) { + LongSummaryStatistics statsDelays = retryableReq.stream() + .map(RetryableBulkOperation::currentRetryTimeDelay) + .mapToLong(Long::longValue) + .summaryStatistics(); + + // scheduling earlier and latest delay + retryScheduler.schedule(this::flush, statsDelays.getMin(), TimeUnit.MILLISECONDS); + retryScheduler.schedule(this::flush, statsDelays.getMax(), TimeUnit.MILLISECONDS); + } public void add(BulkOperation operation, Context context) { @@ -511,14 +495,14 @@ public void add(BulkOperation operation, Context context) { throw new IllegalStateException("Ingester has been closed"); } - BulkOperationRepeatable repeatableOp = new BulkOperationRepeatable<>(operation, context, + RetryableBulkOperation repeatableOp = new RetryableBulkOperation<>(operation, context, null); innerAdd(repeatableOp); } // Same as "add", but skips the closed check to allow retries to be added even after ingester closure - private void addRetry(BulkOperationRepeatable repeatableOp) { + private void addRetry(RetryableBulkOperation repeatableOp) { // Sending the operation back in the queue using the retry scheduler retriesInProgressCount.incrementAndGet(); retryScheduler.submit(() -> { @@ -532,7 +516,7 @@ private void addRetry(BulkOperationRepeatable repeatableOp) { }); } - private void innerAdd(BulkOperationRepeatable repeatableOp) { + private void innerAdd(RetryableBulkOperation repeatableOp) { IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); addCondition.whenReady(() -> { diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index ac93f08a0..b8cd7ac2c 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -35,16 +35,16 @@ * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). */ class IngesterOperation { - private final BulkOperationRepeatable repeatableOp; + private final RetryableBulkOperation repeatableOp; private final long size; - IngesterOperation(BulkOperationRepeatable repeatableOp, long size) { + IngesterOperation(RetryableBulkOperation repeatableOp, long size) { this.repeatableOp = repeatableOp; this.size = size; } - public static IngesterOperation of(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { - switch (repeatableOp.getOperation()._kind()) { + public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + switch (repeatableOp.operation()._kind()) { case Create: return createOperation(repeatableOp, mapper); case Index: @@ -54,11 +54,11 @@ public static IngesterOperation of(BulkOperationRepeatable repeatableOp, JsonpMa case Delete: return deleteOperation(repeatableOp); default: - throw new IllegalStateException("Unknown bulk operation type " + repeatableOp.getOperation()._kind()); + throw new IllegalStateException("Unknown bulk operation type " + repeatableOp.operation()._kind()); } } - public BulkOperationRepeatable repeatableOperation() { + public RetryableBulkOperation repeatableOperation() { return this.repeatableOp; } @@ -66,9 +66,9 @@ public long size() { return this.size; } - private static IngesterOperation createOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { - CreateOperation create = repeatableOp.getOperation().create(); - BulkOperationRepeatable newOperation; + private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + CreateOperation create = repeatableOp.operation().create(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(create); @@ -79,18 +79,18 @@ private static IngesterOperation createOperation(BulkOperationRepeatable repeata } else { BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); - newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.create(idx -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> { copyCreateProperties(create, idx); return idx.document(binaryDoc); - })),repeatableOp.getContext(),repeatableOp.getRetries()); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation indexOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { - IndexOperation index = repeatableOp.getOperation().index(); - BulkOperationRepeatable newOperation; + private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + IndexOperation index = repeatableOp.operation().index(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(index); @@ -101,18 +101,18 @@ private static IngesterOperation indexOperation(BulkOperationRepeatable repeatab } else { BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); - newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.index(idx -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> { copyIndexProperties(index, idx); return idx.document(binaryDoc); - })),repeatableOp.getContext(),repeatableOp.getRetries()); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation updateOperation(BulkOperationRepeatable repeatableOp, JsonpMapper mapper) { - UpdateOperation update = repeatableOp.getOperation().update(); - BulkOperationRepeatable newOperation; + private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + UpdateOperation update = repeatableOp.operation().update(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(update) + size("retry_on_conflict", update.retryOnConflict()) + @@ -125,20 +125,20 @@ private static IngesterOperation updateOperation(BulkOperationRepeatable repeata } else { BinaryData action = BinaryData.of(update.action(), mapper); size += action.size(); - newOperation = new BulkOperationRepeatable(BulkOperation.of(bo -> bo.update(u -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> { copyBaseProperties(update, u); return u .binaryAction(action) .requireAlias(update.requireAlias()) .retryOnConflict(update.retryOnConflict()); - })),repeatableOp.getContext(),repeatableOp.getRetries()); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation deleteOperation(BulkOperationRepeatable repeatableOp) { - DeleteOperation delete = repeatableOp.getOperation().delete(); + private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) { + DeleteOperation delete = repeatableOp.operation().delete(); return new IngesterOperation(repeatableOp, basePropertiesSize(delete)); } diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java similarity index 72% rename from java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java rename to java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java index c2ae4d3ee..19f203fad 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java @@ -21,44 +21,38 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.Iterator; import java.util.Optional; -class BulkOperationRepeatable { +class RetryableBulkOperation { private final BulkOperation operation; private final Context context; private final Iterator retries; - private Long retryTime; + private final Long retryTime; - BulkOperationRepeatable(BulkOperation request, Context context, Iterator retries) { + RetryableBulkOperation(BulkOperation request, Context context, Iterator retries) { this.operation = request; this.context = context; this.retries = retries; // if the retries iterator is null it means that it's not a retry, otherwise calculating retry time - long currentMillis = getCurrentMillis(); + long currentMillis = currentMillis(); this.retryTime = Optional.ofNullable(retries).map(r -> currentMillis + r.next()).orElse(currentMillis); } - public BulkOperation getOperation() { + public BulkOperation operation() { return operation; } - public Context getContext() { + public Context context() { return context; } - public Iterator getRetries() { + public Iterator retries() { return retries; } - public Long getCurrentRetryTime() { - return this.retryTime; - } - - public long getCurrentRetryTimeDelay() { - return this.retryTime - getCurrentMillis(); + public long currentRetryTimeDelay() { + return this.retryTime - currentMillis(); } public boolean canRetry() { @@ -66,10 +60,10 @@ public boolean canRetry() { } public boolean isSendable() { - return (this.retryTime - getCurrentMillis()) <= 0; + return (this.retryTime - currentMillis()) <= 0; } - private Long getCurrentMillis(){ + private Long currentMillis(){ return System.nanoTime()/1_000_000L; } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java index 0c14cb20c..dede24b9c 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -223,7 +223,7 @@ public void retryTestFlushAndContextExponentialBackoff() throws Exception { // One success, other will succeed after retrying, other will fail eventually { - BulkIngester ingester = newBulkIngesterWithFlushAndContext(listener); + BulkIngester ingester = newBulkIngesterWithFlushAndContextAndLongExponential(listener); ingester.add(create, 1); ingester.add(indexFail, 2); @@ -253,7 +253,41 @@ public void retryTestFlushAndContextExponentialBackoff() throws Exception { } @Test - public void multiThreadStressTest() throws InterruptedException, IOException { + public void retryTestNoFlushAndContextExponentialBackoff() throws Exception { + + // One success, other will succeed after retrying, other will fail eventually + { + BulkIngester ingester = newBulkIngesterNoFlushAndContextAndLongExponential(listener); + + ingester.add(create, 1); + ingester.add(indexFail, 2); + ingester.add(index, 3); + + ingester.close(); + + // should be 3 separate requests sent, one instant, one after a few retries, the last one error. + assertTrue(listener.requests.get() == 3); + // 2 will succeed, one will fail + assertTrue(listener.successOperations.get() == 2); + assertTrue(listener.errorOperations.get() == 1); + // between 8 and 10, depending on scheduler (first one + 2 retries for index + 8 retries for + // indexfail) + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 11); + // checking order of contexts after send confirmed + Iterator iter = listener.sentContexts.iterator(); + // first one being completed is create + assertTrue(iter.next().equals(1)); + // second one is index, which will take only 2 retries + assertTrue(iter.next().equals(3)); + // last one is indexFail, taking 8 retries to fail + assertTrue(iter.next().equals(2)); + } + + transport.close(); + } + + @Test + public void retryMultiThreadStressTest() throws InterruptedException, IOException { // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme // situation where the number of adding threads greatly exceeds the number of concurrent requests @@ -487,14 +521,24 @@ private BulkIngester newBasicBulkIngester(BulkListener listene ); } - private BulkIngester newBulkIngesterWithFlushAndContext(BulkListener listener) { + private BulkIngester newBulkIngesterWithFlushAndContextAndLongExponential(BulkListener listener) { return BulkIngester.of(b -> b .client(client) .maxOperations(10) .maxConcurrentRequests(10) .listener(listener) .flushInterval(1000, TimeUnit.MILLISECONDS) - .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8)) + .backoffPolicy(BackoffPolicy.exponentialBackoff(100L, 8)) + ); + } + + private BulkIngester newBulkIngesterNoFlushAndContextAndLongExponential(BulkListener listener) { + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .backoffPolicy(BackoffPolicy.exponentialBackoff(100L, 8)) ); } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index b8afb6325..76a48b9fa 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -223,7 +223,7 @@ public void multiThreadStressTest() throws InterruptedException, IOException { public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); - long operationSize = IngesterOperation.of(new BulkOperationRepeatable<>(operation, null, null), transport.jsonpMapper()).size(); + long operationSize = IngesterOperation.of(new RetryableBulkOperation<>(operation, null, null), transport.jsonpMapper()).size(); BulkIngester ingester = BulkIngester.of(b -> b .client(new ElasticsearchAsyncClient(transport)) @@ -447,8 +447,8 @@ public void pipelineTest() { String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(new BulkOperationRepeatable<>(create, null, null), mapper) - .repeatableOperation().getOperation(); + BulkOperation create1 = IngesterOperation.of(new RetryableBulkOperation<>(create, null, null), mapper) + .repeatableOperation().operation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str);