Skip to content

Implement update by query #1644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
Expand Down Expand Up @@ -521,6 +523,13 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
.next();
}

@Override
public Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) //
.next() //
.map(UpdateByQueryResponse::of);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.elasticsearch.client.indices.GetFieldMappingsRequest;
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -596,6 +598,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
*/
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);

/**
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer){

final UpdateByQueryRequest request = new UpdateByQueryRequest();
consumer.accept(request);
return updateBy(request);
}

/**
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
*
* @param updateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest){
return updateBy(HttpHeaders.EMPTY, updateRequest);
}

/**
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param updateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);

/**
* Execute a {@link BulkRequest} against the {@literal bulk} API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.util.RequestConverters;

Expand Down Expand Up @@ -91,6 +92,10 @@ default Function<DeleteByQueryRequest, Request> deleteByQuery() {
return RequestConverters::deleteByQuery;
}

default Function<UpdateByQueryRequest, Request> updateByQuery() {
return RequestConverters::updateByQuery;
}

default Function<BulkRequest, Request> bulk() {

return request -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,15 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
Request request = new Request(HttpMethod.POST.name(), endpoint);
Params params = new Params(request).withRouting(updateByQueryRequest.getRouting())
.withPipeline(updateByQueryRequest.getPipeline()).withRefresh(updateByQueryRequest.isRefresh())
.withTimeout(updateByQueryRequest.getTimeout())
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(updateByQueryRequest.indicesOptions());
Params params = new Params(request)
.withRouting(updateByQueryRequest.getRouting()) //
.withPipeline(updateByQueryRequest.getPipeline()) //
.withRefresh(updateByQueryRequest.isRefresh()) //
.withTimeout(updateByQueryRequest.getTimeout()) //
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) //
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) //
.withIndicesOptions(updateByQueryRequest.indicesOptions()); //

if (!updateByQueryRequest.isAbortOnVersionConflict()) {
params.putParam("conflicts", "proceed");
}
Expand All @@ -555,8 +558,8 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", updateByQueryRequest.getScrollTime());
}
if (updateByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
if (updateByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
}
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.GetQuery;
Expand All @@ -33,6 +34,7 @@
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html">Elasticsearch Document APIs</a>.
*
* @author Peter-Josef Meisch
* @author Farid Faoudi
* @since 4.0
*/
public interface DocumentOperations {
Expand Down Expand Up @@ -299,6 +301,16 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
*/
UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Update document(s) by query
*
* @param updateQuery query defining the update
* @param index the index where to update the records
* @return the update response
* @since 4.2
*/
UpdateByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);

// region deprecated
/**
* Delete all records matching the query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.slf4j.Logger;
Expand All @@ -46,6 +48,7 @@
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
Expand Down Expand Up @@ -87,6 +90,7 @@
* @author Mathias Teier
* @author Gyula Attila Csorogi
* @author Massimiliano Poggi
* @author Farid Faoudi
*/
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {

Expand Down Expand Up @@ -226,6 +230,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
return new UpdateResponse(result);
}

@Override
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index);
final BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
return UpdateByQueryResponse.of(bulkByScrollResponse);
}

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,6 +46,7 @@
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
Expand Down Expand Up @@ -81,6 +84,7 @@
* @author Farid Azaza
* @author Gyula Attila Csorogi
* @author Roman Puchkovskiy
* @author Farid Faoudi
* @deprecated as of 4.0
*/
@Deprecated
Expand Down Expand Up @@ -251,6 +255,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
return new UpdateResponse(result);
}

@Override
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilder(client, query, index);
final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet();
return UpdateByQueryResponse.of(bulkByScrollResponse);
}

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -36,6 +37,7 @@
* @author Peter-Josef Meisch
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @author Farid Faoudi
* @since 4.0
*/
public interface ReactiveDocumentOperations {
Expand Down Expand Up @@ -336,4 +338,13 @@ default Mono<String> delete(String id, Class<?> entityType, IndexCoordinates ind
* @since 4.1
*/
Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Update document(s) by query.
* @param updateQuery query defining the update
* @param index the index where to update the records
* @return a {@link Mono} emitting the update response
* @since 4.2
*/
Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.core;

import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -97,6 +99,7 @@
* @author Roman Puchkovskiy
* @author Russell Parry
* @author Thomas Geese
* @author Farid Faoudi
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
Expand Down Expand Up @@ -558,6 +561,18 @@ public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates ind
});
}

@Override
public Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {

Assert.notNull(updateQuery, "UpdateQuery must not be null");
Assert.notNull(index, "Index must not be null");

return Mono.defer(() -> {
final UpdateByQueryRequest request = requestFactory.updateByQueryRequest(updateQuery, index);
return Mono.from(execute(client -> client.updateBy(request)));
});
}

@Override
public Mono<Long> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
Expand Down
Loading