Skip to content

Support Delete by query with es parameters. #2875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery;
import org.springframework.data.elasticsearch.core.query.Query;
Expand Down Expand Up @@ -177,6 +178,11 @@ public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, Index
doBulkOperation(queries, bulkOptions, index);
}

@Override
public ByQueryResponse delete(DeleteQuery query, Class<?> clazz) {
return delete(query, clazz, getIndexCoordinatesFor(clazz));
}

@Override
public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index) {

Expand All @@ -190,6 +196,18 @@ public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates inde
return responseConverter.byQueryResponse(response);
}

@Override
public ByQueryResponse delete(DeleteQuery query, Class<?> clazz, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");

DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(),
clazz, index, getRefreshPolicy());

DeleteByQueryResponse response = execute(client -> client.deleteByQuery(request));

return responseConverter.byQueryResponse(response);
}

@Override
public UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -180,6 +181,15 @@ public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoord
return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse);
}

@Override
public Mono<ByQueryResponse> delete(DeleteQuery query, Class<?> entityType, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");

DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(),
entityType, index, getRefreshPolicy());
return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse);
}

@Override
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,79 @@ public DeleteByQueryRequest documentDeleteByQueryRequest(Query query, @Nullable
});
}

public DeleteByQueryRequest documentDeleteByQueryRequest(DeleteQuery query, @Nullable String routing, Class<?> clazz,
IndexCoordinates index, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

return DeleteByQueryRequest.of(dqb -> {
dqb.index(Arrays.asList(index.getIndexNames())) //
.query(getQuery(query.getQuery(), clazz))//
.refresh(deleteByQueryRefresh(refreshPolicy))
.requestsPerSecond(query.getRequestsPerSecond())
.maxDocs(query.getMaxDocs())
.scroll(time(query.getScroll()))
.scrollSize(query.getScrollSize());

if (query.getRouting() != null) {
dqb.routing(query.getRouting());
} else if (StringUtils.hasText(routing)) {
dqb.routing(routing);
}

if (query.getQ() != null) {
dqb.q(query.getQ())
.analyzer(query.getAnalyzer())
.analyzeWildcard(query.getAnalyzeWildcard())
.defaultOperator(operator(query.getDefaultOperator()))
.df(query.getDf())
.lenient(query.getLenient());
}

if (query.getExpandWildcards() != null && !query.getExpandWildcards().isEmpty()) {
dqb.expandWildcards(expandWildcards(query.getExpandWildcards()));
}
if (query.getStats() != null && !query.getStats().isEmpty()) {
dqb.stats(query.getStats());
}
if (query.getSlices() != null) {
dqb.slices(sb -> sb.value(query.getSlices()));
}
if (query.getSort() != null) {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(clazz);
List<SortOptions> sortOptions = getSortOptions(query.getSort(), persistentEntity);

if (!sortOptions.isEmpty()) {
dqb.sort(
sortOptions.stream()
.map(sortOption -> {
String order = "asc";
var sortField = sortOption.field();
if (sortField.order() != null) {
order = sortField.order().jsonValue();
}

return sortField.field() + ":" + order;
})
.collect(Collectors.toList())
);
}
}
dqb.allowNoIndices(query.getAllowNoIndices())
.conflicts(conflicts(query.getConflicts()))
.ignoreUnavailable(query.getIgnoreUnavailable())
.preference(query.getPreference())
.requestCache(query.getRequestCache())
.searchType(searchType(query.getSearchType()))
.searchTimeout(time(query.getSearchTimeout()))
.terminateAfter(query.getTerminateAfter())
.timeout(time(query.getTimeout()))
.version(query.getVersion());

return dqb;
});
}

public UpdateRequest<Document, ?> documentUpdateRequest(UpdateQuery query, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy, @Nullable String routing) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.Operator;
import co.elastic.clients.elasticsearch.core.search.BoundaryScanner;
import co.elastic.clients.elasticsearch.core.search.HighlighterEncoder;
import co.elastic.clients.elasticsearch.core.search.HighlighterFragmenter;
Expand Down Expand Up @@ -46,6 +47,8 @@
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.query.types.ConflictsType;
import org.springframework.data.elasticsearch.core.query.types.OperatorType;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -500,4 +503,26 @@ static Map<String, JsonData> paramsMap(Map<String, Object> params) {
});
return mappedParams;
}

/**
* Convert a spring-data-elasticsearch operator to an Elasticsearch operator.
*
* @param operator spring-data-elasticsearch operator.
* @return an Elasticsearch Operator.
*/
@Nullable
static Operator operator(@Nullable OperatorType operator) {
return operator != null ? Operator.valueOf(operator.name()) : null;
}

/**
* Convert a spring-data-elasticsearch {@literal conflicts} to an Elasticsearch {@literal conflicts}.
*
* @param conflicts spring-data-elasticsearch {@literal conflicts}.
* @return an Elasticsearch {@literal conflicts}.
*/
@Nullable
static Conflicts conflicts(@Nullable ConflictsType conflicts) {
return conflicts != null ? Conflicts.valueOf(conflicts.name()) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -411,6 +412,11 @@ public Mono<String> delete(String id, IndexCoordinates index) {
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
}

@Override
public Mono<ByQueryResponse> delete(DeleteQuery query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
}
// endregion

// region SearchDocument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
Expand Down Expand Up @@ -279,9 +280,21 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @return response with detailed information
* @since 4.1
* @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class)}
*/
ByQueryResponse delete(Query query, Class<?> clazz);

/**
* Delete all records matching the query.
*
* @param query query defining the objects
* @param clazz The entity class must be annotated with
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @return response with detailed information
* @since 5.3
*/
ByQueryResponse delete(DeleteQuery query, Class<?> clazz);

/**
* Delete all records matching the query.
*
Expand All @@ -290,9 +303,22 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @param index the index from which to delete
* @return response with detailed information
* @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class, IndexCoordinates)}
*/
ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index);

/**
* Delete all records matching the query.
*
* @param query query defining the objects
* @param clazz The entity class must be annotated with
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @param index the index from which to delete
* @return response with detailed information
* @since 5.3
*/
ByQueryResponse delete(DeleteQuery query, Class<?> clazz, IndexCoordinates index);

/**
* Partially update a document by the given entity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -331,19 +332,42 @@ default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index)
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
* @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class)}
*/
Mono<ByQueryResponse> delete(Query query, Class<?> entityType);

/**
* Delete the documents matching the given {@link Query} extracting index from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
* @since 5.3
*/
Mono<ByQueryResponse> delete(DeleteQuery query, Class<?> entityType);

/**
* Delete the documents matching the given {@link Query} extracting index from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @return a {@link Mono} emitting the number of the removed documents.
* @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class, IndexCoordinates)}
*/
Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index);

/**
* Delete the documents matching the given {@link Query} extracting index from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @return a {@link Mono} emitting the number of the removed documents.
* @since 5.3
*/
Mono<ByQueryResponse> delete(DeleteQuery query, Class<?> entityType, IndexCoordinates index);

/**
* Partial update of the document.
*
Expand Down
Loading