Skip to content

Commit 30ad335

Browse files
committed
Implement update by query
1 parent 3c6dd64 commit 30ad335

17 files changed

+1062
-32
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
2525
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
2626
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
27+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
2728
import reactor.core.publisher.Flux;
2829
import reactor.core.publisher.Mono;
2930
import reactor.netty.http.client.HttpClient;
@@ -521,6 +522,13 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
521522
.next();
522523
}
523524

525+
@Override
526+
public Mono<org.springframework.data.elasticsearch.core.query.BulkByScrollResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
527+
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) //
528+
.next() //
529+
.map(org.springframework.data.elasticsearch.core.query.BulkByScrollResponse::translateFrom);
530+
}
531+
524532
/*
525533
* (non-Javadoc)
526534
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
1919
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
20+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
2021
import reactor.core.publisher.Flux;
2122
import reactor.core.publisher.Mono;
2223

@@ -596,6 +597,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
596597
*/
597598
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
598599

600+
/**
601+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
602+
*
603+
* @param consumer never {@literal null}.
604+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
605+
* * Query API on elastic.co</a>
606+
* @return a {@link Mono} emitting operation response.
607+
*/
608+
default Mono<org.springframework.data.elasticsearch.core.query.BulkByScrollResponse> updateBy(Consumer<UpdateByQueryRequest> consumer){
609+
610+
final UpdateByQueryRequest request = new UpdateByQueryRequest();
611+
consumer.accept(request);
612+
return updateBy(request);
613+
}
614+
615+
/**
616+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
617+
*
618+
* @param updateRequest must not be {@literal null}.
619+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
620+
* * Query API on elastic.co</a>
621+
* @return a {@link Mono} emitting operation response.
622+
*/
623+
default Mono<org.springframework.data.elasticsearch.core.query.BulkByScrollResponse> updateBy(UpdateByQueryRequest updateRequest){
624+
return updateBy(HttpHeaders.EMPTY, updateRequest);
625+
}
626+
627+
/**
628+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
629+
*
630+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
631+
* @param updateRequest must not be {@literal null}.
632+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
633+
* * Query API on elastic.co</a>
634+
* @return a {@link Mono} emitting operation response.
635+
*/
636+
Mono<org.springframework.data.elasticsearch.core.query.BulkByScrollResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
637+
599638
/**
600639
* Execute a {@link BulkRequest} against the {@literal bulk} API.
601640
*

src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
3434
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
3535
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
36+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
3637
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
3738
import org.springframework.data.elasticsearch.client.util.RequestConverters;
3839

@@ -91,6 +92,10 @@ default Function<DeleteByQueryRequest, Request> deleteByQuery() {
9192
return RequestConverters::deleteByQuery;
9293
}
9394

95+
default Function<UpdateByQueryRequest, Request> updateByQuery() {
96+
return RequestConverters::updateByQuery;
97+
}
98+
9499
default Function<BulkRequest, Request> bulk() {
95100

96101
return request -> {

src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,10 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
540540
public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
541541
String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
542542
Request request = new Request(HttpMethod.POST.name(), endpoint);
543-
Params params = new Params(request).withRouting(updateByQueryRequest.getRouting())
544-
.withPipeline(updateByQueryRequest.getPipeline()).withRefresh(updateByQueryRequest.isRefresh())
543+
Params params = new Params(request)
544+
.withRouting(updateByQueryRequest.getRouting())
545+
.withPipeline(updateByQueryRequest.getPipeline())
546+
.withRefresh(updateByQueryRequest.isRefresh())
545547
.withTimeout(updateByQueryRequest.getTimeout())
546548
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
547549
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
@@ -555,8 +557,8 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
555557
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
556558
params.putParam("scroll", updateByQueryRequest.getScrollTime());
557559
}
558-
if (updateByQueryRequest.getSize() > 0) {
559-
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
560+
if (updateByQueryRequest.getMaxDocs() > 0) {
561+
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
560562
}
561563
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
562564
return request;

src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919

2020
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
21+
import org.springframework.data.elasticsearch.core.query.BulkByScrollResponse;
2122
import org.springframework.data.elasticsearch.core.query.BulkOptions;
2223
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
2324
import org.springframework.data.elasticsearch.core.query.GetQuery;
@@ -32,6 +33,7 @@
3233
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html">Elasticsearch Document APIs</a>.
3334
*
3435
* @author Peter-Josef Meisch
36+
* @author Farid Faoudi
3537
* @since 4.0
3638
*/
3739
public interface DocumentOperations {
@@ -297,6 +299,16 @@ default String delete(String id, IndexCoordinates index) {
297299
*/
298300
UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index);
299301

302+
/**
303+
* Update document(s) by query
304+
*
305+
* @param updateQuery query defining the update
306+
* @param index the index where to update the records
307+
* @return the update response
308+
* @since 4.2
309+
*/
310+
BulkByScrollResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
311+
300312
// region deprecated
301313
/**
302314
* Delete all records matching the query.

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.client.RestHighLevelClient;
3939
import org.elasticsearch.common.unit.TimeValue;
4040
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
41+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
4142
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4243
import org.elasticsearch.search.suggest.SuggestBuilder;
4344
import org.slf4j.Logger;
@@ -46,6 +47,7 @@
4647
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4748
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
4849
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
50+
import org.springframework.data.elasticsearch.core.query.BulkByScrollResponse;
4951
import org.springframework.data.elasticsearch.core.query.BulkOptions;
5052
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
5153
import org.springframework.data.elasticsearch.core.query.IndexQuery;
@@ -87,6 +89,7 @@
8789
* @author Mathias Teier
8890
* @author Gyula Attila Csorogi
8991
* @author Massimiliano Poggi
92+
* @author Farid Faoudi
9093
*/
9194
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
9295

@@ -223,6 +226,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
223226
return new UpdateResponse(result);
224227
}
225228

229+
@Override
230+
public BulkByScrollResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
231+
final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index);
232+
final org.elasticsearch.index.reindex.BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
233+
return BulkByScrollResponse.translateFrom(bulkByScrollResponse);
234+
}
235+
226236
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
227237
IndexCoordinates index) {
228238
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.action.update.UpdateRequestBuilder;
3737
import org.elasticsearch.client.Client;
3838
import org.elasticsearch.common.unit.TimeValue;
39+
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
3940
import org.elasticsearch.search.suggest.SuggestBuilder;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@
4445
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4546
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
4647
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
48+
import org.springframework.data.elasticsearch.core.query.BulkByScrollResponse;
4749
import org.springframework.data.elasticsearch.core.query.BulkOptions;
4850
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
4951
import org.springframework.data.elasticsearch.core.query.IndexQuery;
@@ -81,6 +83,7 @@
8183
* @author Farid Azaza
8284
* @author Gyula Attila Csorogi
8385
* @author Roman Puchkovskiy
86+
* @author Farid Faoudi
8487
* @deprecated as of 4.0
8588
*/
8689
@Deprecated
@@ -241,6 +244,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
241244
return new UpdateResponse(result);
242245
}
243246

247+
@Override
248+
public BulkByScrollResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
249+
final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilderFor(client, query, index);
250+
final org.elasticsearch.index.reindex.BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet();
251+
return BulkByScrollResponse.translateFrom(bulkByScrollResponse);
252+
}
253+
244254
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
245255
IndexCoordinates index) {
246256
BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);

src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.springframework.data.elasticsearch.core.query.BulkByScrollResponse;
1819
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

@@ -36,6 +37,7 @@
3637
* @author Peter-Josef Meisch
3738
* @author Aleksei Arsenev
3839
* @author Roman Puchkovskiy
40+
* @author Farid Faoudi
3941
* @since 4.0
4042
*/
4143
public interface ReactiveDocumentOperations {
@@ -336,4 +338,13 @@ default Mono<String> delete(String id, Class<?> entityType, IndexCoordinates ind
336338
* @since 4.1
337339
*/
338340
Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index);
341+
342+
/**
343+
* Update document(s) by query.
344+
* @param updateQuery query defining the update
345+
* @param index the index where to update the records
346+
* @return a {@link Mono} emitting the update response
347+
* @since 4.2
348+
*/
349+
Mono<BulkByScrollResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
339350
}

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
1819
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021
import reactor.util.function.Tuple2;
@@ -95,6 +96,7 @@
9596
* @author Roman Puchkovskiy
9697
* @author Russell Parry
9798
* @author Thomas Geese
99+
* @author Farid Faoudi
98100
* @since 3.2
99101
*/
100102
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -539,6 +541,18 @@ public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates ind
539541
});
540542
}
541543

544+
@Override
545+
public Mono<org.springframework.data.elasticsearch.core.query.BulkByScrollResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
546+
547+
Assert.notNull(updateQuery, "UpdateQuery must not be null");
548+
Assert.notNull(index, "Index must not be null");
549+
550+
return Mono.defer(() -> {
551+
final UpdateByQueryRequest request = requestFactory.updateByQueryRequest(updateQuery, index);
552+
return Mono.from(execute(client -> client.updateBy(request)));
553+
});
554+
}
555+
542556
@Override
543557
public Mono<Long> delete(Query query, Class<?> entityType) {
544558
return delete(query, entityType, getIndexCoordinatesFor(entityType));

0 commit comments

Comments
 (0)