From ca95dddc0e0d38ed1a1ff46225827f71ecb03cbf Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Wed, 4 Jan 2023 14:20:50 +0100 Subject: [PATCH] Add bulk ingester helper (#474) --- java-client/build.gradle.kts | 3 + .../_helpers/bulk/BulkIngester.java | 468 ++++++++++++++++ .../_helpers/bulk/BulkListener.java | 65 +++ .../_helpers/bulk/FnCondition.java | 137 +++++ .../_helpers/bulk/IngesterOperation.java | 214 ++++++++ .../elasticsearch/_helpers/package-info.java | 23 + .../core/bulk/UpdateOperation.java | 55 +- .../elastic/clients/transport/Endpoint.java | 12 + .../rest_client/MultiBufferEntity.java | 120 +++++ .../rest_client/RestClientTransport.java | 48 +- .../co/elastic/clients/util/BinaryData.java | 108 ++++ .../util/NoCopyByteArrayOutputStream.java | 51 ++ .../_helpers/bulk/BulkIngesterTest.java | 501 ++++++++++++++++++ .../endpoints/BinaryEndpointTest.java | 29 + .../rest_client/MultiBufferEntityTest.java | 99 ++++ 15 files changed, 1917 insertions(+), 16 deletions(-) create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkListener.java create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/FnCondition.java create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java create mode 100644 java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/package-info.java create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest_client/MultiBufferEntity.java create mode 100644 java-client/src/main/java/co/elastic/clients/util/BinaryData.java create mode 100644 java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java create mode 100644 java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java create mode 100644 java-client/src/test/java/co/elastic/clients/transport/rest_client/MultiBufferEntityTest.java diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 4172adc45..8a2f9ae91 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -220,6 +220,9 @@ dependencies { exclude(group = "org.glassfish", module = "jakarta.json") } + // Apache-2.0 + testImplementation("commons-io:commons-io:2.11.0") + // EPL-2.0 // https://junit.org/junit5/ testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.2") diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java new file mode 100644 index 000000000..08790f144 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -0,0 +1,468 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.util.ApiTypeHelper; +import co.elastic.clients.util.ObjectBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +public class BulkIngester implements AutoCloseable { + + private static final Log logger = LogFactory.getLog(BulkIngester.class); + + // Instance counter, to name the flush thread if we create one + private static final AtomicInteger idCounter = new AtomicInteger(); + + // Configuration + private final ElasticsearchAsyncClient client; + private final @Nullable BulkRequest globalSettings; + private final int maxRequests; + private final long maxSize; + private final int maxOperations; + private final @Nullable BulkListener listener; + private final Long flushIntervalMillis; + + private @Nullable ScheduledFuture flushTask; + private @Nullable ScheduledExecutorService scheduler; + + // Current state + private List operations = new ArrayList<>(); + private List contexts = null; // Created on demand + private long currentSize; + private int requestsInFlightCount; + private volatile boolean isClosed = false; + + // Synchronization objects + private final ReentrantLock lock = new ReentrantLock(); + private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation); + private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); + private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); + + private static class RequestExecution { + public final long id; + public final BulkRequest request; + public final List contexts; + public final CompletionStage futureResponse; + + RequestExecution(long id, BulkRequest request, List contexts, CompletionStage futureResponse) { + this.id = id; + this.request = request; + this.contexts = contexts; + this.futureResponse = futureResponse; + } + } + + private BulkIngester(Builder builder) { + int ingesterId = idCounter.incrementAndGet(); + this.client = ApiTypeHelper.requireNonNull(builder.client, this, "client"); + this.globalSettings = builder.globalSettings; + this.maxRequests = builder.maxConcurrentRequests; + this.maxSize = builder.bulkSize < 0 ? Long.MAX_VALUE : builder.bulkSize; + this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations; + this.listener = builder.listener; + this.flushIntervalMillis = builder.flushIntervalMillis; + + if (flushIntervalMillis != null) { + long flushInterval = flushIntervalMillis; + + // Create a scheduler if needed + ScheduledExecutorService scheduler; + if (builder.scheduler == null) { + scheduler = Executors.newSingleThreadScheduledExecutor((r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("bulk-ingester-flusher#" + ingesterId); + t.setDaemon(true); + return t; + }); + + // Keep it, we'll have to close it. + this.scheduler = scheduler; + } else { + // It's not ours, we will not close it. + scheduler = builder.scheduler; + } + + this.flushTask = scheduler.scheduleWithFixedDelay( + this::failsafeFlush, + flushInterval, flushInterval, + TimeUnit.MILLISECONDS + ); + } + } + + //----- Getters + + public int maxOperations() { + return this.maxOperations; + } + + public long maxSize() { + return this.maxSize; + } + + public int maxConcurrentRequests() { + return this.maxRequests; + } + + public Duration flushInterval() { + if (this.flushIntervalMillis != null) { + return Duration.ofMillis(flushIntervalMillis); + } else { + return null; + } + } + + //----- Statistics + + /** + * Statistics: the number of operations that were added to this ingester since it was created. + */ + public long operationsCount() { + return this.addCondition.invocations(); + } + + /** + * Statistics: the number of operations that had to wait before being added because the operation buffer + * was full and the number of http requests in flight exceeded the configured maximum number. + * + * @see Builder#maxConcurrentRequests + * @see Builder#maxOperations + * @see Builder#maxSize + */ + public long operationContentionsCount() { + return this.addCondition.contentions(); + } + + /** + * Statistics: the number of bulk requests that were produced by this ingester since it was created. + */ + public long requestCount() { + return this.sendRequestCondition.invocations(); + } + + /** + * Statistics: the number of bulk requests that could not be sent immediately because the number of + * http requests in flight exceeded the configured maximum number. + * + * @see Builder#maxConcurrentRequests + */ + public long requestContentionsCount() { + return this.sendRequestCondition.contentions(); + } + + //----- Predicates for the condition variables + + private boolean canSendRequest() { + return requestsInFlightCount < maxRequests; + } + + private boolean canAddOperation() { + return currentSize < maxSize && operations.size() < maxOperations; + } + + private boolean closedAndFlushed() { + return isClosed && operations.isEmpty() && requestsInFlightCount == 0; + } + + //----- Ingester logic + + private BulkRequest.Builder newRequest() { + BulkRequest.Builder result = new BulkRequest.Builder(); + + if (this.globalSettings != null) { + BulkRequest settings = this.globalSettings; + result + .index(settings.index()) + .pipeline(settings.pipeline()) + .refresh(settings.refresh()) + .requireAlias(settings.requireAlias()) + .routing(settings.routing()) + .sourceExcludes(settings.sourceExcludes()) + .sourceIncludes(settings.sourceIncludes()) + .source(settings.source()) + .timeout(settings.timeout()) + .waitForActiveShards(settings.waitForActiveShards()) + ; + } + + return result; + } + + private void failsafeFlush() { + try { + flush(); + } catch(Throwable thr) { + // Log the error and continue + logger.error("Error in background flush", thr); + } + } + + public void flush() { + RequestExecution exec = sendRequestCondition.whenReadyIf( + () -> { + // May happen on manual and periodic flushes + return !operations.isEmpty(); + }, + () -> { + // Build the request + BulkRequest request = newRequest().operations(operations).build(); + List requestContexts = contexts == null ? Collections.nCopies(operations.size(), null) : contexts; + + // Prepare for next round + operations = new ArrayList<>(); + contexts = null; + currentSize = 0; + addCondition.signalIfReady(); + + long id = sendRequestCondition.invocations(); + + if (listener != null) { + listener.beforeBulk(id, request, requestContexts); + } + + CompletionStage result = client.bulk(request); + requestsInFlightCount++; + + if (listener == null) { + // No need to keep the request around, it can be GC'ed + request = null; + } + + return new RequestExecution<>(id, request, requestContexts, result); + }); + + if (exec != null) { + // A request was actually sent + exec.futureResponse.handle((resp, thr) -> { + + sendRequestCondition.signalIfReadyAfter(() -> { + requestsInFlightCount--; + closeCondition.signalAllIfReady(); + }); + + if (resp != null) { + // Success + if (listener != null) { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } + } else { + // Failure + if (listener != null) { + listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + } + } + return null; + }); + } + } + + public void add(BulkOperation operation, Context context) { + if (isClosed) { + throw new IllegalStateException("Ingester has been closed"); + } + + IngesterOperation ingestOp = IngesterOperation.of(operation, client._jsonpMapper()); + + addCondition.whenReady(() -> { + + if (context != null) { + // Lazily build the context list + if (contexts == null) { + int size = operations.size(); + if (size == 0) { + contexts = new ArrayList<>(); + } else { + contexts = new ArrayList<>(Collections.nCopies(size, null)); + } + } + contexts.add(context); + } + + operations.add(ingestOp.operation()); + currentSize += ingestOp.size(); + + if (!canAddOperation()) { + flush(); + } + }); + } + + public void add(BulkOperation operation) { + add(operation, null); + } + + public void add(Function> f) { + add(f.apply(new BulkOperation.Builder()).build(), null); + } + + public void add(Function> f, Context context) { + add(f.apply(new BulkOperation.Builder()).build(), context); + } + + @Override + public void close() { + if (isClosed) { + return; + } + + isClosed = true; + // Flush buffered operations + flush(); + // and wait for all requests to be completed + closeCondition.whenReady(() -> {}); + + if (flushTask != null) { + flushTask.cancel(false); + } + + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + + //---------------------------------------------------------------------------------------------------- + + public static BulkIngester of(Function, Builder> f) { + return f.apply(new Builder<>()).build(); + } + + public static class Builder implements ObjectBuilder> { + private ElasticsearchAsyncClient client; + private BulkRequest globalSettings; + private int bulkOperations = 1000; + private long bulkSize = 5*1024*1024; + private int maxConcurrentRequests = 1; + private Long flushIntervalMillis; + private BulkListener listener; + private ScheduledExecutorService scheduler; + + public Builder client(ElasticsearchAsyncClient client) { + this.client = client; + return this; + } + + public Builder client(ElasticsearchClient client) { + TransportOptions options = client._transportOptions(); + if (options == client._transport().options()) { + options = null; + } + return client(new ElasticsearchAsyncClient(client._transport(), options)); + } + + /** + * Sets when to flush a new bulk request based on the number of operations currently added. Defaults to + * {@code 1000}. Can be set to {@code -1} to disable it. + */ + public Builder maxOperations(int count) { + this.bulkOperations = count; + return this; + } + + /** + * Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent + * once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it. + */ + public Builder maxSize(long bytes) { + this.bulkSize = bytes; + return this; + } + + /** + * Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed + * while accumulating new bulk requests. Defaults to {@code 1}. + */ + public Builder maxConcurrentRequests(int max) { + this.maxConcurrentRequests = max; + return this; + } + + /** + * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set. + *

+ * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}. + */ + public Builder flushInterval(long value, TimeUnit unit) { + this.flushIntervalMillis = unit.toMillis(value); + return this; + } + + /** + * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set. + *

+ * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}. + */ + public Builder flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + return flushInterval(value, unit); + } + + public Builder listener(BulkListener listener) { + this.listener = listener; + return this; + } + + /** + * Sets global bulk request settings that will be applied to all requests sent by the ingester. + */ + public Builder globalSettings(BulkRequest.Builder settings) { + if (settings != null) { + // Set required field + this.globalSettings = settings.operations(Collections.emptyList()).build(); + } else { + this.globalSettings = null; + } + return this; + } + + /** + * Sets global bulk request settings that will be applied to all bulk requests. + */ + public Builder globalSettings(Function fn) { + return globalSettings(fn.apply(new BulkRequest.Builder())); + } + + @Override + public BulkIngester build() { + return new BulkIngester<>(this); + } + } +} + diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkListener.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkListener.java new file mode 100644 index 000000000..820105ac0 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; + +import java.util.List; + +/** + * A listener that is called by a {@link BulkIngester} to allow monitoring requests sent and their result. + * + * @param application-defined contextual data that can be associated to a bulk operation. + */ +public interface BulkListener { + + /** + * Called before a bulk request is sent. Note: documents in {@code request} operations have been + * converted to {@link co.elastic.clients.util.BinaryData}. + * + * @param executionId the id of this request, unique for the {@link BulkIngester} that created it. + * @param request the bulk request that will be sent, with documents in binary form. + * @param contexts application-defined data that was passed in {@link BulkIngester#add(BulkOperation, Object)}. + */ + void beforeBulk(long executionId, BulkRequest request, List contexts); + + /** + * Called after a bulk request has been processed. Elasticsearch accepted the request, but {@code response} the response may + * contain both successful and failure response items. + * + * @param executionId the id of this request, unique for the {@link BulkIngester} that created it. + * @param request the bulk request that will be sent, with documents in binary form. + * @param contexts application-defined data that was passed in {@link BulkIngester#add(BulkOperation, Object)}. + * @param response the response received from Elasticsearch. + */ + void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response); + + /** + * Called when a bulk request could not be sent to Elasticsearch. + * + * @param executionId the id of this request, unique for the {@link BulkIngester} that created it. + * @param request the bulk request that will be sent, with documents in binary form. + * @param contexts application-defined data that was passed in {@link BulkIngester#add(BulkOperation, Object)}. + * @param failure the failure that occurred when sending the request to Elasticsearch. + */ + void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure); +} diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/FnCondition.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/FnCondition.java new file mode 100644 index 000000000..f37052677 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/FnCondition.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; + +/** + * A helper to make {@link Condition} easier and less error-prone to use. + *

+ * It takes a {@code Lock} and a readiness predicate. + */ +class FnCondition { + private final Lock lock; + public final Condition condition; + private final BooleanSupplier ready; + private long invocations; + private long contentions; + + FnCondition(Lock lock, BooleanSupplier ready) { + this.lock = lock; + this.condition = lock.newCondition(); + this.ready = ready; + } + + public void whenReady(Runnable fn) { + whenReadyIf(null, () -> { + fn.run(); + return null; + }); + } + + /** + * Runs a function when the condition variable is ready. + */ + public T whenReady(Supplier fn) { + return whenReadyIf(null, fn); + } + + /** + * Runs a function when the condition variable is ready, after verifying in that it can actually run. + *

+ * {@code canRun} and {@code fn} are executed withing the lock. + * + * @param canRun a predicate indicating if {@code fn} is ready to run. If not, returns {@code null} immediately. + * @param fn the function to run once the condition variable allows it. + * @return the result of {@code fn}. + */ + public T whenReadyIf(BooleanSupplier canRun, Supplier fn) { + lock.lock(); + try { + if (canRun != null && !canRun.getAsBoolean()) { + return null; + } + + invocations++; + boolean firstLoop = true; + while (!ready.getAsBoolean()) { + if (firstLoop) { + contentions++; + firstLoop = false; + } + condition.awaitUninterruptibly(); + } + return fn.get(); + } finally { + lock.unlock(); + } + } + + public void signalIfReady() { + lock.lock(); + try { + if (ready.getAsBoolean()) { + this.condition.signal(); + } + } finally { + lock.unlock(); + } + } + + public void signalAllIfReady() { + lock.lock(); + try { + if (ready.getAsBoolean()) { + this.condition.signalAll(); + } + } finally { + lock.unlock(); + } + } + + public void signalIfReadyAfter(Runnable r) { + lock.lock(); + try { + r.run(); + if (ready.getAsBoolean()) { + this.condition.signal(); + } + } finally { + lock.unlock(); + } + } + + /** + * Number of invocations of {@code whenReady}. + */ + public long invocations() { + return this.invocations; + } + + /** + * Number of invocations of {@code whenReady} that contended and required to wait on the condition variable. + */ + public long contentions() { + return this.contentions; + } +} diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java new file mode 100644 index 000000000..37f611774 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationBase; +import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation; +import co.elastic.clients.json.JsonEnum; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.util.BinaryData; + +import javax.annotation.Nullable; + +/** + * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). + */ +public class IngesterOperation { + private final BulkOperation operation; + private final long size; + + public IngesterOperation(BulkOperation operation, long size) { + this.operation = operation; + this.size = size; + } + + public static IngesterOperation of(BulkOperation operation, JsonpMapper mapper) { + switch (operation._kind()) { + case Create: + return createOperation(operation, mapper); + case Index: + return indexOperation(operation, mapper); + case Update: + return updateOperation(operation, mapper); + case Delete: + return deleteOperation(operation); + default: + throw new IllegalStateException("Unknown bulk operation type " + operation._kind()); + } + } + + public BulkOperation operation() { + return this.operation; + } + + public long size() { + return this.size; + } + + private static IngesterOperation createOperation(BulkOperation operation, JsonpMapper mapper) { + CreateOperation create = operation.create(); + BulkOperation newOperation; + + long size = basePropertiesSize(create); + + if (create.document() instanceof BinaryData) { + newOperation = operation; + size += ((BinaryData) create.document()).size(); + + } else { + BinaryData binaryDoc = BinaryData.of(create.document(), mapper); + size += binaryDoc.size(); + newOperation = BulkOperation.of(bo -> bo.create(idx -> { + copyBaseProperties(create, idx); + return idx.document(binaryDoc); + })); + } + + return new IngesterOperation(newOperation, size); + } + + private static IngesterOperation indexOperation(BulkOperation operation, JsonpMapper mapper) { + IndexOperation index = operation.index(); + BulkOperation newOperation; + + long size = basePropertiesSize(index); + + if (index.document() instanceof BinaryData) { + newOperation = operation; + size += ((BinaryData) index.document()).size(); + + } else { + BinaryData binaryDoc = BinaryData.of(index.document(), mapper); + size += binaryDoc.size(); + newOperation = BulkOperation.of(bo -> bo.index(idx -> { + copyBaseProperties(index, idx); + return idx.document(binaryDoc); + })); + } + + return new IngesterOperation(newOperation, size); + } + + private static IngesterOperation updateOperation(BulkOperation operation, JsonpMapper mapper) { + UpdateOperation update = operation.update(); + BulkOperation newOperation; + + long size = basePropertiesSize(update) + + size("retry_on_conflict", update.retryOnConflict()) + + size("require_alias", update.requireAlias()); + + if (update.binaryAction() != null) { + newOperation = operation; + size += update.binaryAction().size(); + + } else { + BinaryData action = BinaryData.of(update.action(), mapper); + size += action.size(); + newOperation = BulkOperation.of(bo -> bo.update(u -> { + copyBaseProperties(update, u); + return u + .binaryAction(action) + .requireAlias(update.requireAlias()) + .retryOnConflict(update.retryOnConflict()); + })); + } + + return new IngesterOperation(newOperation, size); + } + + private static IngesterOperation deleteOperation(BulkOperation operation) { + DeleteOperation delete = operation.delete(); + return new IngesterOperation(operation, basePropertiesSize(delete)); + } + + + private static void copyBaseProperties(BulkOperationBase op, BulkOperationBase.AbstractBuilder builder) { + builder + .id(op.id()) + .index(op.index()) + .ifPrimaryTerm(op.ifPrimaryTerm()) + .ifSeqNo(op.ifSeqNo()) + .routing(op.routing()) + .version(op.version()) + .versionType(op.versionType()); + } + + private static int size(String name, @Nullable Boolean value) { + if (value != null) { + return name.length() + 12; // 12 added chars for "name":"false", + } else { + return 0; + } + } + + private static int size(String name, @Nullable String value) { + if (value != null) { + return name.length() + value.length() + 6; // 6 added chars for "name":"value", + } else { + return 0; + } + } + + private static int size(String name, @Nullable Long value) { + if (value != null) { + // Borrowed from Long.toUnsignedString0, shift = 3 (base 10 is closer to 3 than 4) + int mag = Integer.SIZE - Long.numberOfLeadingZeros(value); + int chars = Math.max(((mag + (3 - 1)) / 3), 1); + return name.length() + chars + 4; // 4 added chars for "name":, + } else { + return 0; + } + } + + private static int size(String name, @Nullable Integer value) { + if (value != null) { + // Borrowed from Integer.toUnsignedString0, shift = 3 (base 10 is closer to 3 than 4) + int mag = Integer.SIZE - Integer.numberOfLeadingZeros(value); + int chars = Math.max(((mag + (3 - 1)) / 3), 1); + return name.length() + chars + 4; + } else { + return 0; + } + } + + private static int size(String name, @Nullable JsonEnum value) { + if (value != null) { + return name.length() + value.jsonValue().length() + 6; + } else { + return 0; + } + } + + private static int basePropertiesSize(BulkOperationBase op) { + return + size("id", op.id()) + + size("index", op.index()) + + size("if_primary_term", op.ifPrimaryTerm()) + + size("if_seq_no", op.ifSeqNo()) + + size("routing", op.routing()) + + size("version", op.version()) + + size("version_type", op.versionType()) + + 4; // Open/closing brace, 2 newlines + } +} diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/package-info.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/package-info.java new file mode 100644 index 000000000..c394d972e --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Base package for helpers providing additional helper/utilities on top of the Java API client. + */ +package co.elastic.clients.elasticsearch._helpers; diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/core/bulk/UpdateOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/core/bulk/UpdateOperation.java index 22c8ffa83..425f9757d 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/core/bulk/UpdateOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/core/bulk/UpdateOperation.java @@ -30,7 +30,7 @@ import co.elastic.clients.json.NdJsonpSerializable; import co.elastic.clients.json.ObjectBuilderDeserializer; import co.elastic.clients.json.ObjectDeserializer; -import co.elastic.clients.util.ApiTypeHelper; +import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.ObjectBuilder; import jakarta.json.stream.JsonGenerator; import java.lang.Boolean; @@ -53,8 +53,12 @@ public class UpdateOperation extends BulkOperationB implements NdJsonpSerializable, BulkOperationVariant { + @Nullable private final UpdateAction action; + @Nullable + private final BinaryData binaryAction; + @Nullable private final Boolean requireAlias; @@ -71,7 +75,8 @@ public class UpdateOperation extends BulkOperationB private UpdateOperation(Builder builder) { super(builder); - this.action = ApiTypeHelper.requireNonNull(builder.action, this, "action"); + this.action = builder.action; + this.binaryAction = builder.binaryAction; this.requireAlias = builder.requireAlias; this.retryOnConflict = builder.retryOnConflict; @@ -94,15 +99,30 @@ public BulkOperation.Kind _bulkOperationKind() { } /** - * Required - API name: {@code action} + * Update action + *

+ * API name: {@code action} */ + @Nullable public final UpdateAction action() { return this.action; } + /** + * Serialized representation of the update action. You should use + * action instead. This binary representation is used by the + * BulkIngester helper to compute the binary size of bulk requests. + *

+ * API name: {@code binary_action} + */ + @Nullable + public final BinaryData binaryAction() { + return this.binaryAction; + } + @Override public Iterator _serializables() { - return Arrays.asList(this, this.action).iterator(); + return Arrays.asList(this, this.action, this.binaryAction).iterator(); } /** @@ -148,24 +168,45 @@ public static class Builder BulkOperationBase.AbstractBuilder> implements ObjectBuilder> { + @Nullable private UpdateAction action; + @Nullable + private BinaryData binaryAction; + /** - * Required - API name: {@code action} + * Update action + *

+ * API name: {@code action} */ - public final Builder action(UpdateAction value) { + public final Builder action( + @Nullable UpdateAction value) { this.action = value; return this; } /** - * Required - API name: {@code action} + * Update action + *

+ * API name: {@code action} */ public final Builder action( Function, ObjectBuilder>> fn) { return this.action(fn.apply(new UpdateAction.Builder()).build()); } + /** + * Serialized representation of the update action. You should use + * action instead. This binary representation is used by the + * BulkIngester helper to compute the binary size of bulk requests. + *

+ * API name: {@code binary_action} + */ + public final Builder binaryAction(@Nullable BinaryData value) { + this.binaryAction = value; + return this; + } + @Nullable private Boolean requireAlias; diff --git a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java index 3446f6a55..275695d67 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java @@ -20,6 +20,7 @@ package co.elastic.clients.transport; import co.elastic.clients.json.JsonpDeserializer; +import co.elastic.clients.transport.endpoints.BinaryEndpoint; import javax.annotation.Nullable; import java.util.Collections; @@ -82,4 +83,15 @@ default Map headers(RequestT request) { @Nullable JsonpDeserializer errorDeserializer(int statusCode); + default BinaryEndpoint withBinaryResponse() { + return new BinaryEndpoint<>( + this.id(), + this::method, + this::requestUrl, + this::queryParameters, + this::headers, + this.hasRequestBody(), + null + ); + } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/MultiBufferEntity.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/MultiBufferEntity.java new file mode 100644 index 000000000..f1a8e81db --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/MultiBufferEntity.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.util.NoCopyByteArrayOutputStream; +import org.apache.http.entity.AbstractHttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.entity.HttpAsyncContentProducer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; + +/** + * An HTTP entity based on a sequence of byte buffers. + */ +class MultiBufferEntity extends AbstractHttpEntity implements HttpAsyncContentProducer { + + private final Iterable buffers; + + private Iterator iterator; + private volatile ByteBuffer currentBuffer; + + MultiBufferEntity(Iterable buffers, ContentType contentType) { + this.buffers = buffers; + setChunked(true); + if (contentType != null) { + setContentType(contentType.toString()); + } + init(); + } + + @Override + public void close() throws IOException { + // Reset state, the request may be retried + init(); + } + + private void init() { + this.iterator = buffers.iterator(); + if (this.iterator.hasNext()) { + this.currentBuffer = this.iterator.next().duplicate(); + } else { + this.currentBuffer = null; + } + } + + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public long getContentLength() { + // Use chunked encoding + return -1; + } + + @Override + public boolean isStreaming() { + return false; + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + NoCopyByteArrayOutputStream baos = new NoCopyByteArrayOutputStream(); + writeTo(baos); + return baos.asInputStream(); + } + + @Override + public void writeTo(OutputStream out) throws IOException { + WritableByteChannel channel = Channels.newChannel(out); + for (ByteBuffer buffer: buffers) { + channel.write(buffer.duplicate()); + } + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException { + if (currentBuffer == null) { + encoder.complete(); + return; + } + + encoder.write(currentBuffer); + + if (!currentBuffer.hasRemaining()) { + if (iterator.hasNext()) { + currentBuffer = iterator.next().duplicate(); + } else { + currentBuffer = null; + encoder.complete(); + } + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java index 7b198d854..d884d190f 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java @@ -34,6 +34,7 @@ import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.util.ApiTypeHelper; +import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.MissingRequiredPropertyException; import jakarta.json.stream.JsonGenerator; import jakarta.json.stream.JsonParser; @@ -54,9 +55,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -153,9 +158,15 @@ public CompletableFuture performRequest Endpoint endpoint, @Nullable TransportOptions options ) { - org.elasticsearch.client.Request clientReq = prepareLowLevelRequest(request, endpoint, options); - RequestFuture future = new RequestFuture<>(); + org.elasticsearch.client.Request clientReq; + try { + clientReq = prepareLowLevelRequest(request, endpoint, options); + } catch (Exception e) { + // Terminate early + future.completeExceptionally(e); + return future; + } // Propagate required property checks to the thread that will decode the response boolean disableRequiredChecks = ApiTypeHelper.requiredPropertiesCheckDisabled(); @@ -187,7 +198,7 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( RequestT request, Endpoint endpoint, @Nullable TransportOptions options - ) { + ) throws IOException { String method = endpoint.method(request); String path = endpoint.requestUrl(request); Map params = endpoint.queryParameters(request); @@ -206,28 +217,48 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( if (endpoint.hasRequestBody()) { // Request has a body and must implement JsonpSerializable or NdJsonpSerializable - ByteArrayOutputStream baos = new ByteArrayOutputStream(); if (request instanceof NdJsonpSerializable) { - writeNdJson((NdJsonpSerializable) request, baos); + List lines = new ArrayList<>(); + collectNdJsonLines(lines, (NdJsonpSerializable)request); + clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType)); } else { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); mapper.serialize(request, generator); generator.close(); + clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); } - - clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); } // Request parameter intercepted by LLRC clientReq.addParameter("ignore", "400,401,403,404,405"); return clientReq; } + private static final ByteBuffer NdJsonSeparator = ByteBuffer.wrap("\n".getBytes(StandardCharsets.UTF_8)); + + private void collectNdJsonLines(List lines, NdJsonpSerializable value) { + Iterator values = value._serializables(); + while(values.hasNext()) { + Object item = values.next(); + if (item == null) { + // Skip + } else if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself + collectNdJsonLines(lines, (NdJsonpSerializable)item); + } else { + // TODO: items that aren't already BinaryData could be serialized to ByteBuffers lazily + // to reduce the number of buffers to keep in memory + lines.add(BinaryData.of(item, this.mapper).asByteBuffer()); + lines.add(NdJsonSeparator); + } + } + } + /** * Write an nd-json value by serializing each of its items on a separate line, recursing if its items themselves implement * {@link NdJsonpSerializable} to flattening nested structures. */ - private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) { + private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) throws IOException { Iterator values = value._serializables(); while(values.hasNext()) { Object item = values.next(); @@ -324,7 +355,6 @@ private ResponseT decodeResponse( try (JsonParser parser = mapper.jsonProvider().createParser(content)) { response = responseParser.deserialize(parser, mapper); } - ; } return response; diff --git a/java-client/src/main/java/co/elastic/clients/util/BinaryData.java b/java-client/src/main/java/co/elastic/clients/util/BinaryData.java new file mode 100644 index 000000000..500f16284 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/util/BinaryData.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.util; + +import co.elastic.clients.json.JsonpMapper; +import jakarta.json.stream.JsonGenerator; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Binary data representing a serialized value. + */ +public interface BinaryData { + + /** + * Write this data to an output stream. + */ + void writeTo(OutputStream out) throws IOException; + + /** + * Return this data as a {@code ByteBuffer} + */ + ByteBuffer asByteBuffer(); + + /** + * Get the estimated size in bytes of the data. + * + * @return the estimated size, or -1 if the value cannot be estimated or if the data has already been + * consumed. + */ + long size(); + + /** + * Create a {@code BinaryData} from a value and a JSON mapper. The binary content is the result of serializing + * {@code value} with {@code mapper}. Returns {@code null} if {@code value} is null. + */ + static BinaryData of(Object value, JsonpMapper mapper) { + if (value == null) { + return null; + } + + if (value instanceof BinaryData) { + return (BinaryData)value; + } + + NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(); + JsonGenerator generator = mapper.jsonProvider().createGenerator(out); + mapper.serialize(value, generator); + generator.close(); + + return new ByteArrayBinaryData(out.array(), 0, out.size()); + } + + static BinaryData of(byte[] bytes) { + return new ByteArrayBinaryData(bytes, 0, bytes.length); + } + + static BinaryData of(byte[] value, int offset, int length) { + return new ByteArrayBinaryData(value, offset, length); + } + + class ByteArrayBinaryData implements BinaryData { + + private final byte[] bytes; + private final int offset; + private final int length; + + ByteArrayBinaryData(byte[] bytes, int offset, int length) { + this.bytes = bytes; + this.offset = offset; + this.length = length; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(bytes, offset, length); + } + + @Override + public long size() { + return length; + } + + @Override + public ByteBuffer asByteBuffer() { + return ByteBuffer.wrap(bytes, offset, length); + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java b/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java new file mode 100644 index 000000000..5d91204a6 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +/** + * A {@code ByteArrayOutputStream} that reduces copy operations of its underlying buffer. + */ +public class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { + + public NoCopyByteArrayOutputStream() { + } + + public NoCopyByteArrayOutputStream(int size) { + super(size); + } + + /** + * Get the underlying buffer. Data was added to this buffer up to {@code size()}. Note that calling this method + * again may return a different result if additional data was inserted and the buffer had to grow. + */ + public byte[] array() { + return this.buf; + } + + /** + * Get an {@code InputStream} view on this object, based on the current buffer and size. + */ + public ByteArrayInputStream asInputStream() { + return new ByteArrayInputStream(this.buf, 0, this.count); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java new file mode 100644 index 000000000..80d9daec2 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -0,0 +1,501 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.ElasticsearchTestServer; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.core.bulk.OperationType; +import co.elastic.clients.elasticsearch.end_to_end.RequestTest; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportOptions; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +class BulkIngesterTest extends Assertions { + + private static final BulkResponseItem successItem = BulkResponseItem.of(i -> i + .index("foo") + .status(200) + .operationType(OperationType.Delete) + ); + + private static final BulkOperation operation = BulkOperation.of(op -> op + .delete(d -> d.index("foo").id("bar")) + ); + + private void printStats(BulkIngester ingester) { + System.out.printf("Ingester - operations: %d (%d), requests: %d (%d)%n", + ingester.operationsCount(), ingester.operationContentionsCount(), + ingester.requestCount(), ingester.requestContentionsCount() + ); + } + + private void printStats(CountingListener listener) { + System.out.printf("Listener - operations: %d, requests: %d%n", + listener.operations.get(), + listener.requests.get() + ); + } + + private void printStats(TestTransport transport) { + System.out.printf("Transport - operations: %d, requests: %d (%d completed)%n", + transport.operations.get(), + transport.requestsStarted.get(), + transport.requestsCompleted.get() + ); + } + + @Test + public void basicTestFlush() throws Exception { + // Prime numbers, so that we have leftovers to flush before shutting down + multiThreadTest(7, 3, 5, 101); + } + + @Test + public void basicTestNoFlush() throws Exception { + // Will have nothing to flush on close. + multiThreadTest(10, 3, 5, 100); + } + + private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception { + + CountingListener listener = new CountingListener(); + TestTransport transport = new TestTransport(); + ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport); + + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .maxOperations(maxOperations) + .maxConcurrentRequests(maxRequests) + .listener(listener) + ); + + CountDownLatch latch = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; i++) { + new Thread(() -> { + try { + Thread.sleep((long)(Math.random() * 100)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numOperations; j++) { + ingester.add(operation); + } + + latch.countDown(); + }).start(); + } + + latch.await(); + + ingester.close(); + transport.close(); + + printStats(ingester); + printStats(listener); + printStats(transport); + + int expectedOperations = numThreads * numOperations; + assertEquals(expectedOperations, ingester.operationsCount()); + assertEquals(expectedOperations, listener.operations.get()); + assertEquals(expectedOperations, transport.operations.get()); + + int expectedRequests = expectedOperations / maxOperations + ((expectedOperations % maxOperations == 0) ? 0 : 1) ; + + assertEquals(expectedRequests, ingester.requestCount()); + assertEquals(expectedRequests, listener.requests.get()); + assertEquals(expectedRequests, transport.requestsStarted.get()); + } + + @Test + public void sizeLimitTest() throws Exception { + TestTransport transport = new TestTransport(); + + long operationSize = IngesterOperation.of(operation, transport.jsonpMapper()).size(); + + BulkIngester ingester = BulkIngester.of(b -> b + .client(new ElasticsearchAsyncClient(transport)) + // Set size limit just above operation's size, leading to 2 operations per request + .maxSize(operationSize + 1) + ); + + for (int i = 0; i < 10; i++) { + ingester.add(operation); + } + + ingester.close(); + transport.close(); + + assertEquals(10, ingester.operationsCount()); + assertEquals(5, ingester.requestCount()); + } + + @Test + public void periodicFlushTest() throws Exception { + TestTransport transport = new TestTransport(); + + BulkIngester ingester = BulkIngester.of(b -> b + .client(new ElasticsearchAsyncClient(transport)) + // Flush every 50 ms + .flushInterval(50, TimeUnit.MILLISECONDS) + // Disable other flushing limits + .maxSize(-1) + .maxOperations(-1) + .maxConcurrentRequests(Integer.MAX_VALUE) + ); + + // Add an operation every 100 ms to give time + // to the flushing timer to kick in. + for (int i = 0; i < 10; i++) { + ingester.add(operation); + Thread.sleep(100); + } + + ingester.close(); + transport.close(); + + // We should have one operation per request + assertEquals(10, ingester.operationsCount()); + assertEquals(10, ingester.requestCount()); + } + + @Test + public void failingListener() throws Exception { + TestTransport transport = new TestTransport(); + AtomicInteger failureCount = new AtomicInteger(); + AtomicReference> lastContexts = new AtomicReference<>(); + AtomicReference lastRequest = new AtomicReference<>(); + + BulkListener listener = new BulkListener() { + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + // So that we can test that it's non-empty + lastContexts.set(contexts); + lastRequest.set(request); + + if (executionId == 1) { + // Fail before the request is sent + failureCount.incrementAndGet(); + throw new RuntimeException("Before bulk failure"); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + if (executionId == 2) { + // Fail after the request is sent + failureCount.incrementAndGet(); + throw new RuntimeException("After bulk failure"); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + + } + }; + + BulkIngester ingester = BulkIngester.of(b -> b + .client(new ElasticsearchAsyncClient(transport)) + // Flush every 50 ms + .flushInterval(50, TimeUnit.MILLISECONDS) + // Disable other flushing limits + .maxSize(-1) + .maxOperations(-1) + .maxConcurrentRequests(Integer.MAX_VALUE) + .listener(listener) + ); + + // Add an operation every 100 ms to give time + // to the flushing timer to kick in. + for (int i = 0; i < 10; i++) { + ingester.add(operation); + Thread.sleep(100); + } + + ingester.close(); + transport.close(); + + // We should have one operation per request + assertEquals(10, ingester.operationsCount()); + assertEquals(10, ingester.requestCount()); + // Transport hasn't seen the request where beforeBulk failed + assertEquals(9, transport.requestsStarted.get()); + + assertEquals(2, failureCount.get()); + + // Also test context list when no values were provided + assertTrue(lastRequest.get().operations().size() > 0); + assertEquals(lastRequest.get().operations().size(), lastContexts.get().size()); + } + + @Test + public void withContextValues() throws Exception { + TestTransport transport = new TestTransport(); + List allRequests = Collections.synchronizedList(new ArrayList<>()); + List> allContexts = Collections.synchronizedList(new ArrayList<>()); + + BulkListener listener = new BulkListener() { + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + allRequests.add(request); + allContexts.add(contexts); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + } + }; + + BulkIngester ingester = BulkIngester.of(b -> b + .client(new ElasticsearchAsyncClient(transport)) + // Split every 10 operations + .maxOperations(10) + .listener(listener) + ); + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + // Set a context only after 5, so that we test filling with nulls. + Integer context = j < 5 ? null : i*10 + j; + ingester.add(operation, context); + } + } + + ingester.close(); + transport.close(); + + // We should have 10 operations per request + assertEquals(100, ingester.operationsCount()); + assertEquals(10, ingester.requestCount()); + + for (int i = 0; i < 10; i++) { + List contexts = allContexts.get(i); + for (int j = 0; j < 10; j++) { + if (j < 5) { + assertNull(contexts.get(j)); + } else { + assertEquals(contexts.get(j), i*10 + j); + } + } + } + } + + @Test + public void testGlobalSettings() throws Exception { + AtomicReference storedRequest = new AtomicReference<>(); + + TestTransport transport = new TestTransport(); + CountingListener listener = new CountingListener() { + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + super.beforeBulk(executionId, request, contexts); + storedRequest.set(request); + } + }; + + BulkIngester ingester = BulkIngester.of(b -> b + .client(new ElasticsearchAsyncClient(transport)) + .listener(listener) + .globalSettings(s -> s + .index("foo") + .routing("bar") + ) + ); + + ingester.add(operation); + + ingester.close(); + transport.close(); + + assertEquals(1, ingester.operationsCount()); + assertEquals(1, ingester.requestCount()); + + assertEquals("foo", storedRequest.get().index()); + assertEquals("bar", storedRequest.get().routing()); + } + + @Test + public void endToEndTest() throws Exception { + String index = "bulk-ingester-test"; + ElasticsearchClient client = ElasticsearchTestServer.global().client(); + + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .globalSettings(s -> s.index(index)) + ); + + RequestTest.AppData appData = new RequestTest.AppData(); + appData.setIntValue(42); + appData.setMsg("Some message"); + + ingester.add(_1 -> _1 + .create(_2 -> _2 + .id("abc") + .document(appData) + )); + + ingester.add(_1 -> _1 + .create(_2 -> _2 + .id("def") + .document(appData) + )); + + ingester.add(_1 -> _1 + .update(_2 -> _2 + .id("gh") + .action(_3 -> _3 + .docAsUpsert(true) + .doc(appData)) + )); + + // Closing waits until all pending requests are completed + ingester.close(); + + for (String id : Arrays.asList("abc", "def", "gh")) { + assertEquals( + 42, + client.get(b -> b + .index(index) + .id(id), + RequestTest.AppData.class + ).source().getIntValue() + ); + } + + client.indices().delete(d -> d.index(index)); + + } + + //----------------------------------------------------------------------------------------------------------------- + + private static class CountingListener implements BulkListener { + public final AtomicInteger operations = new AtomicInteger(); + public final AtomicInteger requests = new AtomicInteger(); + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, BulkResponse response) { + operations.addAndGet(request.operations().size()); + requests.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, Throwable failure) { + failure.printStackTrace(); + operations.addAndGet(request.operations().size()); + requests.incrementAndGet(); + } + } + + private static class TestTransport implements ElasticsearchTransport { + public final AtomicInteger requestsStarted = new AtomicInteger(); + public final AtomicInteger requestsCompleted = new AtomicInteger(); + public final AtomicInteger operations = new AtomicInteger(); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + @Override + public ResponseT performRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, Endpoint endpoint, @Nullable TransportOptions options) { + + BulkRequest bulk = (BulkRequest) request; + requestsStarted.incrementAndGet(); + operations.addAndGet(bulk.operations().size()); + + if (bulk.operations().size() == 0) { + System.out.println("No operations!"); + } + + List items = new ArrayList<>(); + for (int i = 0; i < bulk.operations().size(); i++) { + items.add(successItem); + } + + CompletableFuture response = new CompletableFuture<>(); + executor.submit(() -> { + requestsCompleted.incrementAndGet(); + response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3))); + }); + + @SuppressWarnings("unchecked") + CompletableFuture result = (CompletableFuture)response; + return result; + } + + @Override + public JsonpMapper jsonpMapper() { + return SimpleJsonpMapper.INSTANCE; + } + + @Override + public TransportOptions options() { + return null; + } + + @Override + public void close() throws IOException { + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/java-client/src/test/java/co/elastic/clients/transport/endpoints/BinaryEndpointTest.java b/java-client/src/test/java/co/elastic/clients/transport/endpoints/BinaryEndpointTest.java index ab98de3de..de4175b3c 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/endpoints/BinaryEndpointTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/endpoints/BinaryEndpointTest.java @@ -20,9 +20,13 @@ package co.elastic.clients.transport.endpoints; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.ElasticsearchTestServer; +import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.sun.net.httpserver.HttpServer; +import org.apache.commons.io.IOUtils; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.AfterAll; @@ -96,4 +100,29 @@ public void testMvtSearch() throws IOException { assertEquals("application/vnd.hello-world", resp.contentType()); assertEquals("Hello world", baos.toString(StandardCharsets.UTF_8.name())); } + + @Test + public void convertJsonToBinaryEndpoint() throws IOException { + + ElasticsearchClient esClient = ElasticsearchTestServer.global().client(); + + // Create the search request + SearchRequest request = SearchRequest.of(b -> b); + + // Create a binary endpoint from the regular search endpoint. It will not deserialize + // the response and instead will just return the raw response input stream. + BinaryEndpoint binarySearchEndpoint = SearchRequest._ENDPOINT.withBinaryResponse(); + + // Force typed_keys to false, so that aggregations names do not hold type information + TransportOptions options = esClient._transportOptions().toBuilder() + .setParameter("typed_keys", "false") + .build(); + + // Call Elasticsearch by providing the transport the request and endpoint + BinaryResponse binaryResponse = esClient._transport().performRequest(request, binarySearchEndpoint, options); + + // Do something with the response + String response = IOUtils.toString(binaryResponse.content(), StandardCharsets.UTF_8); + assertTrue(response.matches("\\{\"took\":\\d+,\"timed_out\":false.*")); + } } diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/MultiBufferEntityTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/MultiBufferEntityTest.java new file mode 100644 index 000000000..b241c1905 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/MultiBufferEntityTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.endpoints.BinaryResponse; +import com.sun.net.httpserver.HttpServer; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class MultiBufferEntityTest { + + @Test + public void testBulkRequest() throws IOException { + + HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + + httpServer.createContext("/_bulk", exchange -> { + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + byte[] bytes = IOUtils.toByteArray(exchange.getRequestBody()); + exchange.sendResponseHeaders(200, 0); + exchange.getResponseBody().write(bytes); + exchange.close(); + }); + + httpServer.start(); + + RestClient restClient = + RestClient.builder(new HttpHost(httpServer.getAddress().getAddress(), httpServer.getAddress().getPort())).build(); + + BulkRequest req = BulkRequest.of(_0 -> _0 + .operations(_1 -> _1 + .create(_2 -> _2 + .index("foo") + .id("abc") + .document("abc-doc") + )) + .operations(_1 -> _1 + .create(_2 -> _2 + .index("foo") + .id("def") + .document("def-doc") + )) + .operations(_1 -> _1 + .update(_2 -> _2 + .index("foo") + .id("gh") + .action(_3 -> _3 + .docAsUpsert(true) + .doc("gh-doc")) + ) + ) + ); + RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + + BinaryResponse binaryResponse = transport.performRequest(req, BulkRequest._ENDPOINT.withBinaryResponse(), null); + + String str = IOUtils.toString(binaryResponse.content(), StandardCharsets.UTF_8); + + httpServer.stop(0); + transport.close(); + + Assertions.assertEquals( + "{\"create\":{\"_id\":\"abc\",\"_index\":\"foo\"}}\n" + + "\"abc-doc\"\n" + + "{\"create\":{\"_id\":\"def\",\"_index\":\"foo\"}}\n" + + "\"def-doc\"\n" + + "{\"update\":{\"_id\":\"gh\",\"_index\":\"foo\"}}\n" + + "{\"doc\":\"gh-doc\",\"doc_as_upsert\":true}\n", + str + ); + } +}