diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index e1e131a97..f8e9468d5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -24,6 +24,8 @@ import io.netty.handler.timeout.WriteTimeoutHandler; import org.elasticsearch.client.indices.GetFieldMappingsRequest; import org.elasticsearch.client.indices.GetFieldMappingsResponse; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -521,6 +523,13 @@ public Mono deleteBy(HttpHeaders headers, DeleteByQueryReq .next(); } + @Override + public Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) { + return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) // + .next() // + .map(UpdateByQueryResponse::of); + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 85054a49e..a87d23f33 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -17,6 +17,8 @@ import org.elasticsearch.client.indices.GetFieldMappingsRequest; import org.elasticsearch.client.indices.GetFieldMappingsResponse; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -596,6 +598,44 @@ default Mono deleteBy(DeleteByQueryRequest deleteRequest) */ Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest); + /** + * Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API. + * + * @param consumer never {@literal null}. + * @see Update By + * * Query API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + default Mono updateBy(Consumer consumer){ + + final UpdateByQueryRequest request = new UpdateByQueryRequest(); + consumer.accept(request); + return updateBy(request); + } + + /** + * Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API. + * + * @param updateRequest must not be {@literal null}. + * @see Update By + * * Query API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + default Mono updateBy(UpdateByQueryRequest updateRequest){ + return updateBy(HttpHeaders.EMPTY, updateRequest); + } + + /** + * Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param updateRequest must not be {@literal null}. + * @see Update By + * * Query API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest); + /** * Execute a {@link BulkRequest} against the {@literal bulk} API. * diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 9f3f79397..dc774bb39 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.indices.IndexTemplatesExistRequest; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.client.util.RequestConverters; @@ -91,6 +92,10 @@ default Function deleteByQuery() { return RequestConverters::deleteByQuery; } + default Function updateByQuery() { + return RequestConverters::updateByQuery; + } + default Function bulk() { return request -> { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 905665394..56b30c1cb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -540,12 +540,15 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) { String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query"); Request request = new Request(HttpMethod.POST.name(), endpoint); - Params params = new Params(request).withRouting(updateByQueryRequest.getRouting()) - .withPipeline(updateByQueryRequest.getPipeline()).withRefresh(updateByQueryRequest.isRefresh()) - .withTimeout(updateByQueryRequest.getTimeout()) - .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) - .withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) - .withIndicesOptions(updateByQueryRequest.indicesOptions()); + Params params = new Params(request) + .withRouting(updateByQueryRequest.getRouting()) // + .withPipeline(updateByQueryRequest.getPipeline()) // + .withRefresh(updateByQueryRequest.isRefresh()) // + .withTimeout(updateByQueryRequest.getTimeout()) // + .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) // + .withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) // + .withIndicesOptions(updateByQueryRequest.indicesOptions()); // + if (!updateByQueryRequest.isAbortOnVersionConflict()) { params.putParam("conflicts", "proceed"); } @@ -555,8 +558,8 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) { if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { params.putParam("scroll", updateByQueryRequest.getScrollTime()); } - if (updateByQueryRequest.getSize() > 0) { - params.putParam("size", Integer.toString(updateByQueryRequest.getSize())); + if (updateByQueryRequest.getMaxDocs() > 0) { + params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs())); } request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); return request; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index 7211e95fc..bcb45df7f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,6 +18,7 @@ import java.util.List; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.GetQuery; @@ -33,6 +34,7 @@ * Elasticsearch Document APIs. * * @author Peter-Josef Meisch + * @author Farid Faoudi * @since 4.0 */ public interface DocumentOperations { @@ -299,6 +301,16 @@ default void bulkUpdate(List queries, IndexCoordinates index) { */ UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index); + /** + * Update document(s) by query + * + * @param updateQuery query defining the update + * @param index the index where to update the records + * @return the update response + * @since 4.2 + */ + UpdateByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + // region deprecated /** * Delete all records matching the query. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 187ae5251..7ef335ce3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -37,7 +37,9 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.suggest.SuggestBuilder; import org.slf4j.Logger; @@ -46,6 +48,7 @@ import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; @@ -87,6 +90,7 @@ * @author Mathias Teier * @author Gyula Attila Csorogi * @author Massimiliano Poggi + * @author Farid Faoudi */ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @@ -226,6 +230,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { return new UpdateResponse(result); } + @Override + public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { + final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index); + final BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)); + return UpdateByQueryResponse.of(bulkByScrollResponse); + } + public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 836b3b008..f6653f2e2 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -36,6 +36,8 @@ import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; import org.elasticsearch.search.suggest.SuggestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; @@ -81,6 +84,7 @@ * @author Farid Azaza * @author Gyula Attila Csorogi * @author Roman Puchkovskiy + * @author Farid Faoudi * @deprecated as of 4.0 */ @Deprecated @@ -251,6 +255,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { return new UpdateResponse(result); } + @Override + public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { + final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilder(client, query, index); + final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet(); + return UpdateByQueryResponse.of(bulkByScrollResponse); + } + public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 251d18c29..a79ecd217 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,6 +37,7 @@ * @author Peter-Josef Meisch * @author Aleksei Arsenev * @author Roman Puchkovskiy + * @author Farid Faoudi * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -336,4 +338,13 @@ default Mono delete(String id, Class entityType, IndexCoordinates ind * @since 4.1 */ Mono update(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Update document(s) by query. + * @param updateQuery query defining the update + * @param index the index where to update the records + * @return a {@link Mono} emitting the update response + * @since 4.2 + */ + Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index ba993ac13..ac52f0156 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -97,6 +99,7 @@ * @author Roman Puchkovskiy * @author Russell Parry * @author Thomas Geese + * @author Farid Faoudi * @since 3.2 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware { @@ -558,6 +561,18 @@ public Mono update(UpdateQuery updateQuery, IndexCoordinates ind }); } + @Override + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + return Mono.defer(() -> { + final UpdateByQueryRequest request = requestFactory.updateByQueryRequest(updateQuery, index); + return Mono.from(execute(client -> client.updateBy(request))); + }); + } + @Override public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 50ebb5478..00df712d3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -18,16 +18,7 @@ import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.util.CollectionUtils.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -82,8 +73,10 @@ import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder; +import org.elasticsearch.index.reindex.UpdateByQueryAction; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; @@ -123,6 +116,7 @@ * @author Sascha Woo * @author Roman Puchkovskiy * @author Subhobrata Dey + * @author Farid Faoudi * @since 4.0 */ class RequestFactory { @@ -1404,7 +1398,7 @@ public UpdateRequest updateRequest(UpdateQuery query, IndexCoordinates index) { if (params == null) { params = new HashMap<>(); } - Script script = new Script(ScriptType.INLINE, query.getLang(), query.getScript(), params); + Script script = new Script(getScriptType(query.getScriptType()), query.getLang(), query.getScript(), params); updateRequest.script(script); } @@ -1478,7 +1472,7 @@ public UpdateRequestBuilder updateRequestBuilderFor(Client client, UpdateQuery q if (params == null) { params = new HashMap<>(); } - Script script = new Script(ScriptType.INLINE, query.getLang(), query.getScript(), params); + Script script = new Script(getScriptType(query.getScriptType()), query.getLang(), query.getScript(), params); updateRequestBuilder.setScript(script); } @@ -1540,6 +1534,147 @@ public UpdateRequestBuilder updateRequestBuilderFor(Client client, UpdateQuery q return updateRequestBuilder; } + + public UpdateByQueryRequest updateByQueryRequest(UpdateQuery query, IndexCoordinates index) { + String indexName = index.getIndexName(); + final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); + + updateByQueryRequest.setScript(getScript(query)); + + if (query.getAbortOnVersionConflict() != null) { + updateByQueryRequest.setAbortOnVersionConflict(query.getAbortOnVersionConflict()); + } + + if (query.getBatchSize() != null) { + updateByQueryRequest.setBatchSize(query.getBatchSize()); + } + + if (query.getQuery() != null) { + final Query queryQuery = query.getQuery(); + + updateByQueryRequest.setQuery(getQuery(queryQuery)); + + if (queryQuery.getIndicesOptions() != null) { + updateByQueryRequest.setIndicesOptions(queryQuery.getIndicesOptions()); + } + + if(queryQuery.getScrollTime() != null) { + updateByQueryRequest.setScroll(TimeValue.timeValueMillis(queryQuery.getScrollTime().toMillis())); + } + } + + if (query.getMaxDocs() != null) { + updateByQueryRequest.setMaxDocs(query.getMaxDocs()); + } + + if (query.getMaxRetries() != null) { + updateByQueryRequest.setMaxRetries(query.getMaxRetries()); + } + + if (query.getPipeline() != null) { + updateByQueryRequest.setPipeline(query.getPipeline()); + } + + if (query.getRefresh() != null) { + updateByQueryRequest.setRefresh(Boolean.getBoolean(query.getRefresh().name().toLowerCase())); + } + + if (query.getRequestsPerSecond() != null) { + updateByQueryRequest.setRequestsPerSecond(query.getRequestsPerSecond()); + } + + if (query.getRouting() != null) { + updateByQueryRequest.setRouting(query.getRouting()); + } + + if (query.getShouldStoreResult() != null) { + updateByQueryRequest.setShouldStoreResult(query.getShouldStoreResult()); + } + + if (query.getSlices() != null) { + updateByQueryRequest.setSlices(query.getSlices()); + } + + if (query.getTimeout() != null) { + updateByQueryRequest.setTimeout(query.getTimeout()); + } + + if (query.getWaitForActiveShards() != null) { + updateByQueryRequest.setWaitForActiveShards(ActiveShardCount.parseString(query.getWaitForActiveShards())); + } + + return updateByQueryRequest; + } + + public UpdateByQueryRequestBuilder updateByQueryRequestBuilder(Client client, UpdateQuery query, IndexCoordinates index) { + String indexName = index.getIndexName(); + + final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = new UpdateByQueryRequestBuilder(client, UpdateByQueryAction.INSTANCE); + + updateByQueryRequestBuilder.source(indexName); + + updateByQueryRequestBuilder.script(getScript(query)); + + if (query.getAbortOnVersionConflict() != null) { + updateByQueryRequestBuilder.abortOnVersionConflict(query.getAbortOnVersionConflict()); + } + + if (query.getBatchSize() != null) { + updateByQueryRequestBuilder.source().setSize(query.getBatchSize()); + } + + if (query.getQuery() != null) { + final Query queryQuery = query.getQuery(); + + updateByQueryRequestBuilder.filter(getQuery(queryQuery)); + + if (queryQuery.getIndicesOptions() != null) { + updateByQueryRequestBuilder.source().setIndicesOptions(queryQuery.getIndicesOptions()); + } + + if(queryQuery.getScrollTime() != null) { + updateByQueryRequestBuilder.source().setScroll(TimeValue.timeValueMillis(queryQuery.getScrollTime().toMillis())); + } + } + + if (query.getMaxDocs() != null) { + updateByQueryRequestBuilder.maxDocs(query.getMaxDocs()); + } + + if (query.getMaxRetries() != null) { + updateByQueryRequestBuilder.setMaxRetries(query.getMaxRetries()); + } + + if (query.getPipeline() != null) { + updateByQueryRequestBuilder.setPipeline(query.getPipeline()); + } + + if (query.getRefresh() != null) { + updateByQueryRequestBuilder.refresh(Boolean.getBoolean(query.getRefresh().name().toLowerCase())); + } + + if (query.getRequestsPerSecond() != null) { + updateByQueryRequestBuilder.setRequestsPerSecond(query.getRequestsPerSecond()); + } + + if (query.getRouting() != null) { + updateByQueryRequestBuilder.source().setRouting(query.getRouting()); + } + + if (query.getShouldStoreResult() != null) { + updateByQueryRequestBuilder.setShouldStoreResult(query.getShouldStoreResult()); + } + + if (query.getSlices() != null) { + updateByQueryRequestBuilder.setSlices(query.getSlices()); + } + + if (query.getTimeout() != null) { + updateByQueryRequestBuilder.source().setTimeout(TimeValue.parseTimeValue(query.getTimeout(), getClass().getSimpleName() + ".timeout")); + } + + return updateByQueryRequestBuilder; + } // endregion // region helper functions @@ -1662,6 +1797,31 @@ private boolean hasSeqNoPrimaryTermProperty(@Nullable Class entityClass) { return entity.hasSeqNoPrimaryTermProperty(); } + private org.elasticsearch.script.ScriptType getScriptType(ScriptType scriptType) { + if (scriptType == null || ScriptType.INLINE.equals(scriptType)) { + return org.elasticsearch.script.ScriptType.INLINE; + } else { + return org.elasticsearch.script.ScriptType.STORED; + } + } + + @Nullable + private Script getScript(UpdateQuery query) { + if (ScriptType.STORED.equals(query.getScriptType()) && query.getScriptName() != null) { + final Map params = Optional.ofNullable(query.getParams()) + .orElse(new HashMap<>()); + return new Script(getScriptType(ScriptType.STORED), null, query.getScriptName(), params); + } + + if (ScriptType.INLINE.equals(query.getScriptType()) && query.getScript() != null){ + final Map params = Optional.ofNullable(query.getParams()) + .orElse(new HashMap<>()); + return new Script(getScriptType(ScriptType.INLINE), query.getLang(), query.getScript(), params); + } + + return null; + } + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java b/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java new file mode 100644 index 000000000..b0bf11d56 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +/** + * Enum mirroring org.elasticsearch.script.ScriptType to keep Elasticsearch classes out of our API. + * + * @author Farid Faoudi + * @since 4.2 + */ + +public enum ScriptType { + INLINE, STORED +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java new file mode 100644 index 000000000..b2e629329 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java @@ -0,0 +1,404 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query; + +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.springframework.lang.Nullable; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Class mirroring org.elasticsearch.index.reindex.BulkByScrollResponse to keep Elasticsearch classes out of our API. + * + * @author Farid Faoudi + * @since 4.2 + */ +public class UpdateByQueryResponse { + + private final long took; + private final boolean timedOut; + private final long total; + private final long updated; + private final long deleted; + private final int batches; + private final long versionConflicts; + private final long noops; + private final long bulkRetries; + private final long searchRetries; + @Nullable private final String reasonCancelled; + private final List failures; + + private UpdateByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + long versionConflicts, long noops, long bulkRetries, long searchRetries, + @Nullable String reasonCancelled, List failures) { + this.took = took; + this.timedOut = timedOut; + this.total = total; + this.updated = updated; + this.deleted = deleted; + this.batches = batches; + this.versionConflicts = versionConflicts; + this.noops = noops; + this.bulkRetries = bulkRetries; + this.searchRetries = searchRetries; + this.reasonCancelled = reasonCancelled; + this.failures = failures; + } + + /** + * The number of milliseconds from start to end of the whole operation. + */ + public long getTook() { + return took; + } + + /** + * Did any of the sub-requests that were part of this request timeout? + */ + public boolean getTimedOut() { + return timedOut; + } + + /** + * The number of documents that were successfully processed. + */ + public long getTotal() { + return total; + } + + /** + * The number of documents that were successfully updated. + */ + public long getUpdated() { + return updated; + } + + /** + * The number of documents that were successfully deleted. + */ + public long getDeleted() { + return deleted; + } + + /** + * The number of scroll responses pulled back by the update by query. + */ + public int getBatches() { + return batches; + } + + /** + * The number of version conflicts that the update by query hit. + */ + public long getVersionConflicts() { + return versionConflicts; + } + + /** + * The number of documents that were ignored because the script used for the update by query returned a noop value for ctx.op. + */ + public long getNoops() { + return noops; + } + + /** + * The number of times that the request had retry bulk actions. + */ + public long getBulkRetries() { + return bulkRetries; + } + + /** + * The number of times that the request had retry search actions. + */ + public long getSearchRetries() { + return searchRetries; + } + + /** + * The reason that the request was canceled or null if it hasn't been. + */ + @Nullable + public String getReasonCancelled() { + return reasonCancelled; + } + + /** + * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the default). + */ + public List getFailures() { + return failures; + } + + /** + * Create a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse} + * + * @return a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse} + */ + public static UpdateByQueryResponseBuilder builder() { + return new UpdateByQueryResponseBuilder(); + } + + public static UpdateByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) { + final List failures = bulkByScrollResponse.getBulkFailures() + .stream() + .map(Failure::of) + .collect(Collectors.toList()); + + return UpdateByQueryResponse.builder() + .withTook(bulkByScrollResponse.getTook().getMillis()) + .withTimedOut(bulkByScrollResponse.isTimedOut()) + .withTotal(bulkByScrollResponse.getTotal()) + .withUpdated(bulkByScrollResponse.getUpdated()) + .withDeleted(bulkByScrollResponse.getDeleted()) + .withBatches(bulkByScrollResponse.getBatches()) + .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) + .withNoops(bulkByScrollResponse.getNoops()) + .withBulkRetries(bulkByScrollResponse.getBulkRetries()) + .withSearchRetries(bulkByScrollResponse.getSearchRetries()) + .withReasonCancelled(bulkByScrollResponse.getReasonCancelled()) + .withFailures(failures) + .build(); + } + + public static class Failure { + + private final String index; + private final String type; + private final String id; + private final Exception cause; + private final Integer status; + private final Long seqNo; + private final Long term; + private final Boolean aborted; + + private Failure(String index, String type, String id, Exception cause, Integer status, Long seqNo, Long term, + Boolean aborted) { + this.index = index; + this.type = type; + this.id = id; + this.cause = cause; + this.status = status; + this.seqNo = seqNo; + this.term = term; + this.aborted = aborted; + } + + public String getIndex() { + return index; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } + + public Exception getCause() { + return cause; + } + + public Integer getStatus() { + return status; + } + + public Long getSeqNo() { + return seqNo; + } + + public Long getTerm() { + return term; + } + + public Boolean getAborted() { + return aborted; + } + + /** + * Create a new {@link FailureBuilder} to build {@link Failure} + * + * @return a new {@link FailureBuilder} to build {@link Failure} + */ + public static FailureBuilder builder() { + return new FailureBuilder(); + } + + /** + * Create a new {@link Failure} from {@link BulkItemResponse.Failure} + * @param failure {@link BulkItemResponse.Failure} to translate + * @return a new {@link Failure} + */ + public static Failure of(BulkItemResponse.Failure failure) { + return builder() + .withIndex(failure.getIndex()) + .withType(failure.getType()) + .withId(failure.getId()) + .withStatus(failure.getStatus().getStatus()) + .withAborted(failure.isAborted()) + .withCause(failure.getCause()) + .withSeqNo(failure.getSeqNo()) + .withTerm(failure.getTerm()) + .build(); + } + + /** + * Builder for {@link Failure} + */ + public static final class FailureBuilder { + private String index; + private String type; + private String id; + private Exception cause; + private Integer status; + private Long seqNo; + private Long term; + private Boolean aborted; + + private FailureBuilder() { + } + + public FailureBuilder withIndex(String index) { + this.index = index; + return this; + } + + public FailureBuilder withType(String type) { + this.type = type; + return this; + } + + public FailureBuilder withId(String id) { + this.id = id; + return this; + } + + public FailureBuilder withCause(Exception cause) { + this.cause = cause; + return this; + } + + public FailureBuilder withStatus(Integer status) { + this.status = status; + return this; + } + + public FailureBuilder withSeqNo(Long seqNo) { + this.seqNo = seqNo; + return this; + } + + public FailureBuilder withTerm(Long term) { + this.term = term; + return this; + } + + public FailureBuilder withAborted(Boolean aborted) { + this.aborted = aborted; + return this; + } + + public Failure build() { + return new Failure(index, type, id, cause, status, seqNo, term, aborted); + } + } + } + + public static final class UpdateByQueryResponseBuilder { + private long took; + private boolean timedOut; + private long total; + private long updated; + private long deleted; + private int batches; + private long versionConflicts; + private long noops; + private long bulkRetries; + private long searchRetries; + @Nullable private String reasonCancelled; + private List failures; + + private UpdateByQueryResponseBuilder() { + } + + public UpdateByQueryResponseBuilder withTook(long took) { + this.took = took; + return this; + } + + public UpdateByQueryResponseBuilder withTimedOut(boolean timedOut) { + this.timedOut = timedOut; + return this; + } + + public UpdateByQueryResponseBuilder withTotal(long total) { + this.total = total; + return this; + } + + public UpdateByQueryResponseBuilder withUpdated(long updated) { + this.updated = updated; + return this; + } + + public UpdateByQueryResponseBuilder withDeleted(long deleted) { + this.deleted = deleted; + return this; + } + + public UpdateByQueryResponseBuilder withBatches(int batches) { + this.batches = batches; + return this; + } + + public UpdateByQueryResponseBuilder withVersionConflicts(long versionConflicts) { + this.versionConflicts = versionConflicts; + return this; + } + + public UpdateByQueryResponseBuilder withNoops(long noops) { + this.noops = noops; + return this; + } + + public UpdateByQueryResponseBuilder withBulkRetries(long bulkRetries) { + this.bulkRetries = bulkRetries; + return this; + } + + public UpdateByQueryResponseBuilder withSearchRetries(long searchRetries) { + this.searchRetries = searchRetries; + return this; + } + + public UpdateByQueryResponseBuilder withReasonCancelled(String reasonCancelled) { + this.reasonCancelled = reasonCancelled; + return this; + } + + public UpdateByQueryResponseBuilder withFailures(List failures) { + this.failures = failures; + return this; + } + + public UpdateByQueryResponse build() { + return new UpdateByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, searchRetries, reasonCancelled, failures); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java index 72c9d6d98..be859bce6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import org.springframework.data.elasticsearch.core.ScriptType; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.lang.Nullable; @@ -27,6 +28,7 @@ * @author Rizwan Idrees * @author Mohsin Husen * @author Peter-Josef Meisch + * @author Farid Faoudi * @see DOC_SOURCE; - @Autowired ReactiveElasticsearchClient client; - @Autowired ReactiveElasticsearchOperations operations; + @Autowired + ReactiveElasticsearchClient client; + @Autowired + ReactiveElasticsearchOperations operations; static { @@ -451,6 +458,76 @@ public void deleteByEmitResultWhenNothingRemoved() { .verifyComplete(); } + @Test // #1446 + void updateByEmitResultWhenNothingUpdated() { + addSourceDocument().to(INDEX_I); + addSourceDocument().to(INDEX_I); + + Map source = new LinkedHashMap<>(); + source.put("firstname", "crow"); + source.put("lastname", "cat"); + + final Map documentToNotUpdate = Collections.unmodifiableMap(source); + add(documentToNotUpdate).to(INDEX_I); + + final String script = "ctx._source['firstname'] = params['newFirstname']"; + final Map params = Collections.singletonMap("newFirstname", "arrow"); + + final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "fallstar"))) + .setAbortOnVersionConflict(true) + .setRefresh(true) + .setScript(new Script(ScriptType.INLINE, "painless", script, params)); + + client.updateBy(request) + .map(UpdateByQueryResponse::getUpdated) + .as(StepVerifier::create) + .expectNext(2L) + .verifyComplete(); + + final SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) // + .source(new SearchSourceBuilder() + .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow")))); + + client.search(searchUpdatedRequest) + .collectList() + .map(List::size) + .as(StepVerifier::create) + .expectNext(2) + .verifyComplete(); + } + + @Test // #1446 + void updateByShouldUpdateExistingDocument() { + addSourceDocument().to(INDEX_I); + + final String script = "ctx._source['firstname'] = params['newFirstname']"; + final Map params = Collections.singletonMap("newFirstname", "arrow"); + + final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "non_existing_lastname"))) + .setAbortOnVersionConflict(true) + .setRefresh(true) + .setScript(new Script(ScriptType.INLINE, "painless", script, params)); + + client.updateBy(request) + .map(UpdateByQueryResponse::getUpdated) + .as(StepVerifier::create) + .expectNext(0L) + .verifyComplete(); + + SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) // + .source(new SearchSourceBuilder() + .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow")))); + + client.search(searchUpdatedRequest) + .collectList() + .map(List::size) + .as(StepVerifier::create) + .expectNext(0) + .verifyComplete(); + } + @Test // DATAES-510 public void scrollShouldReadWhileEndNotReached() { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java index 8e8929e94..285f824c7 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java @@ -16,6 +16,8 @@ package org.springframework.data.elasticsearch.core; import static org.assertj.core.api.Assertions.*; +import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.skyscreamer.jsonassert.JSONAssert.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; import static org.springframework.data.elasticsearch.utils.IdGenerator.*; @@ -24,21 +26,28 @@ import lombok.val; import java.lang.Object; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.json.JSONException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.skyscreamer.jsonassert.JSONAssert; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; @@ -57,6 +66,7 @@ * @author Sascha Woo * @author Don Wellington * @author Peter-Josef Meisch + * @author Farid Faoudi */ @ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class }) @DisplayName("ElasticsearchRestTemplate") @@ -119,4 +129,55 @@ void shouldUseAllOptionsFromUpdateQuery() { assertThat(fetchSourceContext.includes()).containsExactlyInAnyOrder("incl"); assertThat(fetchSourceContext.excludes()).containsExactlyInAnyOrder("excl"); } + + @Test // #1446 + void shouldUseAllOptionsFromUpdateByQuery() throws JSONException { + // given + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) + .withIndicesOptions(IndicesOptions.lenientExpandOpen()) + .build(); + searchQuery.setScrollTime(Duration.ofMillis(1000)); + + final UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) + .withAbortOnVersionConflict(true) + .withBatchSize(10) + .withMaxDocs(12) + .withMaxRetries(3) + .withPipeline("pipeline") + .withRequestsPerSecond(5F) + .withShouldStoreResult(false) + .withSlices(4) + .withScriptType(ScriptType.INLINE) + .withScript("script") + .withLang("painless") + .build(); + + final String expectedSearchRequest = '{' + // + " \"size\": 10," + // + " \"query\": {" + // + " \"match_all\": {" + // + " \"boost\": 1.0" + // + " }" + + " }" + + '}'; + + // when + final UpdateByQueryRequest request = getRequestFactory().updateByQueryRequest(updateQuery, IndexCoordinates.of("index")); + + // then + assertThat(request).isNotNull(); + assertThat(request.getSearchRequest().indicesOptions()).usingRecursiveComparison().isEqualTo(IndicesOptions.lenientExpandOpen()); + assertThat(request.getScrollTime().getMillis()).isEqualTo(1000); + assertEquals(request.getSearchRequest().source().toString(), expectedSearchRequest, false); + assertThat(request.isAbortOnVersionConflict()).isTrue(); + assertThat(request.getBatchSize()).isEqualTo(10); + assertThat(request.getMaxDocs()).isEqualTo(12); + assertThat(request.getPipeline()).isEqualTo("pipeline"); + assertThat(request.getRequestsPerSecond()).isEqualTo(5F); + assertThat(request.getShouldStoreResult()).isFalse(); + assertThat(request.getSlices()).isEqualTo(4); + assertThat(request.getScript().getIdOrCode()).isEqualTo("script"); + assertThat(request.getScript().getType()).isEqualTo(org.elasticsearch.script.ScriptType.INLINE); + assertThat(request.getScript().getLang()).isEqualTo("painless"); + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index b2c4955d7..cda5f5a8a 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -33,15 +33,7 @@ import java.lang.Integer; import java.lang.Long; import java.lang.Object; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -119,6 +111,7 @@ * @author Gyula Attila Csorogi * @author Roman Puchkovskiy * @author Subhobrata Dey + * @author Farid Faoudi */ @SpringIntegrationTest public abstract class ElasticsearchTemplateTests { @@ -1535,6 +1528,39 @@ public void shouldDoPartialUpdateForExistingDocument() { assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate); } + @Test + void shouldDoUpdateByQueryForExistingDocument() { + // given + final String documentId = nextIdAsString(); + final String messageBeforeUpdate = "some test message"; + final String messageAfterUpdate = "test message"; + + final SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message(messageBeforeUpdate) + .version(System.currentTimeMillis()).build(); + + final IndexQuery indexQuery = getIndexQuery(sampleEntity); + + operations.index(indexQuery, index); + indexOperations.refresh(); + + final NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + + final UpdateQuery updateQuery = UpdateQuery.builder(query) + .withScriptType(org.springframework.data.elasticsearch.core.ScriptType.INLINE) + .withScript("ctx._source['message'] = params['newMessage']") + .withLang("painless") + .withParams(Collections.singletonMap("newMessage", messageAfterUpdate)) + .withAbortOnVersionConflict(true) + .build(); + + // when + operations.updateByQuery(updateQuery, index); + + // then + SampleEntity indexedEntity = operations.get(documentId, SampleEntity.class, index); + assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate); + } + @Test // DATAES-227 public void shouldUseUpsertOnUpdate() { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java index 1636105ab..157ed580e 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java @@ -16,6 +16,8 @@ package org.springframework.data.elasticsearch.core; import static org.assertj.core.api.Assertions.*; +import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.skyscreamer.jsonassert.JSONAssert.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; import static org.springframework.data.elasticsearch.utils.IdGenerator.*; @@ -23,17 +25,21 @@ import lombok.val; import java.lang.Object; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; +import org.json.JSONException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -53,6 +59,7 @@ /** * @author Peter-Josef Meisch * @author Sascha Woo + * @author Farid Faoudi */ @ContextConfiguration(classes = { ElasticsearchTemplateConfiguration.class }) @DisplayName("ElasticsearchTransportTemplate") @@ -138,6 +145,55 @@ void shouldProvideClient() { assertThat(client).isNotNull(); } + @Test // #1446 + void shouldUseAllOptionsFromUpdateByQuery() throws JSONException { + // given + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) + .withIndicesOptions(IndicesOptions.lenientExpandOpen()) + .build(); + searchQuery.setScrollTime(Duration.ofMillis(1000)); + + final UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) + .withAbortOnVersionConflict(true) + .withBatchSize(10) + .withMaxDocs(12) + .withMaxRetries(3) + .withPipeline("pipeline") + .withRequestsPerSecond(5F) + .withShouldStoreResult(false) + .withSlices(4) + .withScriptType(ScriptType.STORED) + .withScriptName("script_name") + .build(); + + final String expectedSearchRequest = '{' + // + " \"size\": 10," + // + " \"query\": {" + // + " \"match_all\": {" + // + " \"boost\": 1.0" + // + " }" + + " }" + + '}'; + + // when + final UpdateByQueryRequestBuilder request = getRequestFactory().updateByQueryRequestBuilder(client, updateQuery, IndexCoordinates.of("index")); + + // then + assertThat(request).isNotNull(); + assertThat(request.request().getSearchRequest().indicesOptions()).usingRecursiveComparison().isEqualTo(IndicesOptions.lenientExpandOpen()); + assertThat(request.request().getScrollTime().getMillis()).isEqualTo(1000); + assertEquals(request.request().getSearchRequest().source().toString(), expectedSearchRequest, false); + assertThat(request.request().isAbortOnVersionConflict()).isTrue(); + assertThat(request.request().getBatchSize()).isEqualTo(10); + assertThat(request.request().getMaxDocs()).isEqualTo(12); + assertThat(request.request().getPipeline()).isEqualTo("pipeline"); + assertThat(request.request().getRequestsPerSecond()).isEqualTo(5F); + assertThat(request.request().getShouldStoreResult()).isFalse(); + assertThat(request.request().getSlices()).isEqualTo(4); + assertThat(request.request().getScript().getIdOrCode()).isEqualTo("script_name"); + assertThat(request.request().getScript().getType()).isEqualTo(org.elasticsearch.script.ScriptType.STORED); + } + @Data @Document(indexName = "test-index-sample-core-transport-template", replicas = 0, refreshInterval = "-1") static class SampleEntity {