diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index 1d60ab94a..099c2ba7a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -84,574 +84,575 @@ */ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate { - private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class); - - private final ReactiveElasticsearchClient client; - private final RequestConverter requestConverter; - private final ResponseConverter responseConverter; - private final JsonpMapper jsonpMapper; - private final ElasticsearchExceptionTranslator exceptionTranslator; - - public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, ElasticsearchConverter converter) { - super(converter); - - Assert.notNull(client, "client must not be null"); - - this.client = client; - this.jsonpMapper = client._transport().jsonpMapper(); - requestConverter = new RequestConverter(converter, jsonpMapper); - responseConverter = new ResponseConverter(jsonpMapper); - exceptionTranslator = new ElasticsearchExceptionTranslator(jsonpMapper); - } - - // region Document operations - @Override - protected Mono> doIndex(T entity, IndexCoordinates index) { - - IndexRequest indexRequest = requestConverter.documentIndexRequest(getIndexQuery(entity), index, - getRefreshPolicy()); - return Mono.just(entity) // - .zipWith(// - Mono.from(execute(client -> client.index(indexRequest))) // - .map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), // - indexResponse.index(), // - indexResponse.seqNo(), // - indexResponse.primaryTerm(), // - indexResponse.version() // - ))); - } - - @Override - public Flux saveAll(Mono> entitiesPublisher, IndexCoordinates index) { - - Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!"); - - return entitiesPublisher // - .flatMapMany(entities -> Flux.fromIterable(entities) // - .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // - ).collectList() // - .map(Entities::new) // - .flatMapMany(entities -> { - - if (entities.isEmpty()) { - return Flux.empty(); - } - - return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)// - .index() // - .flatMap(indexAndResponse -> { - T savedEntity = entities.entityAt(indexAndResponse.getT1()); - BulkResponseItem response = indexAndResponse.getT2(); - var updatedEntity = entityOperations.updateIndexedObject( - savedEntity, new IndexedObjectInformation( // - response.id(), // - response.index(), // - response.seqNo(), // - response.primaryTerm(), // - response.version()), - converter, - routingResolver); - return maybeCallbackAfterSave(updatedEntity, index); - }); - }); - } - - @Override - protected Mono doExists(String id, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(index, "index must not be null"); - - ExistsRequest existsRequest = requestConverter.documentExistsRequest(id, routingResolver.getRouting(), index); - - return Mono.from(execute( - ((ClientCallback>) client -> client.exists(existsRequest)))) - .map(BooleanResponse::value) // - .onErrorReturn(NoSuchIndexException.class, false); - } - - @Override - public Mono delete(Query query, Class entityType, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - - DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), - entityType, index, getRefreshPolicy()); - return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); - } - - @Override - public Mono get(String id, Class entityType, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(entityType, "entityType must not be null"); - Assert.notNull(index, "index must not be null"); - - GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index); - - Mono> getResponse = Mono - .from(execute(client -> client.get(getRequest, EntityAsMap.class))); - - ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); - return getResponse.flatMap(response -> callback.toEntity(DocumentAdapters.from(response))); - } - - @Override - public Mono reindex(ReindexRequest reindexRequest) { - - Assert.notNull(reindexRequest, "reindexRequest must not be null"); - - co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, - true); - - return Mono.from(execute( // - client -> client.reindex(reindexRequestES))).map(responseConverter::reindexResponse); - } - - @Override - public Mono submitReindex(ReindexRequest reindexRequest) { - - Assert.notNull(reindexRequest, "reindexRequest must not be null"); - - co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, - false); - - return Mono.from(execute( // - client -> client.reindex(reindexRequestES))) - .flatMap(response -> (response.task() == null) - ? Mono.error( - new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request")) - : Mono.just(response.task())); - } + private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class); + + private final ReactiveElasticsearchClient client; + private final RequestConverter requestConverter; + private final ResponseConverter responseConverter; + private final JsonpMapper jsonpMapper; + private final ElasticsearchExceptionTranslator exceptionTranslator; + + public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, ElasticsearchConverter converter) { + super(converter); + + Assert.notNull(client, "client must not be null"); + + this.client = client; + this.jsonpMapper = client._transport().jsonpMapper(); + requestConverter = new RequestConverter(converter, jsonpMapper); + responseConverter = new ResponseConverter(jsonpMapper); + exceptionTranslator = new ElasticsearchExceptionTranslator(jsonpMapper); + } + + // region Document operations + @Override + protected Mono> doIndex(T entity, IndexCoordinates index) { + + IndexRequest indexRequest = requestConverter.documentIndexRequest(getIndexQuery(entity), index, + getRefreshPolicy()); + return Mono.just(entity) // + .zipWith(// + Mono.from(execute(client -> client.index(indexRequest))) // + .map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), // + indexResponse.index(), // + indexResponse.seqNo(), // + indexResponse.primaryTerm(), // + indexResponse.version() // + ))); + } + + @Override + public Flux saveAll(Mono> entitiesPublisher, IndexCoordinates index) { + + Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!"); + + return entitiesPublisher // + .flatMapMany(entities -> Flux.fromIterable(entities) // + .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // + ).collectList() // + .map(Entities::new) // + .flatMapMany(entities -> { + + if (entities.isEmpty()) { + return Flux.empty(); + } + + return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)// + .index() // + .flatMap(indexAndResponse -> { + T savedEntity = entities.entityAt(indexAndResponse.getT1()); + BulkResponseItem response = indexAndResponse.getT2(); + var updatedEntity = entityOperations.updateIndexedObject( + savedEntity, new IndexedObjectInformation( // + response.id(), // + response.index(), // + response.seqNo(), // + response.primaryTerm(), // + response.version()), + converter, + routingResolver); + return maybeCallbackAfterSave(updatedEntity, index); + }); + }); + } + + @Override + protected Mono doExists(String id, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + ExistsRequest existsRequest = requestConverter.documentExistsRequest(id, routingResolver.getRouting(), index); + + return Mono.from(execute( + ((ClientCallback>) client -> client.exists(existsRequest)))) + .map(BooleanResponse::value) // + .onErrorReturn(NoSuchIndexException.class, false); + } + + @Override + public Mono delete(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } + + @Override + public Mono get(String id, Class entityType, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(entityType, "entityType must not be null"); + Assert.notNull(index, "index must not be null"); + + GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index); + + Mono> getResponse = Mono + .from(execute(client -> client.get(getRequest, EntityAsMap.class))); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); + return getResponse.flatMap(response -> callback.toEntity(DocumentAdapters.from(response))); + } + + @Override + public Mono reindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + true); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))).map(responseConverter::reindexResponse); + } + + @Override + public Mono submitReindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + false); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))) + .flatMap(response -> (response.task() == null) + ? Mono.error( + new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request")) + : Mono.just(response.task())); + } - @Override - public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { - - Assert.notNull(updateQuery, "UpdateQuery must not be null"); - Assert.notNull(index, "Index must not be null"); - - UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), - routingResolver.getRouting()); - - return Mono.from(execute(client -> client.update(request, Document.class))).flatMap(response -> { - UpdateResponse.Result result = result(response.result()); - return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); - }); - } - - @Override - public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { - throw new UnsupportedOperationException("not implemented"); - } + @Override + public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { + + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), + routingResolver.getRouting()); + + return Mono.from(execute(client -> client.update(request, Document.class))).flatMap(response -> { + UpdateResponse.Result result = result(response.result()); + return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); + }); + } + + @Override + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + throw new UnsupportedOperationException("not implemented"); + } - @Override - public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + @Override + public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - Assert.notNull(queries, "List of UpdateQuery must not be null"); - Assert.notNull(bulkOptions, "BulkOptions must not be null"); - Assert.notNull(index, "Index must not be null"); + Assert.notNull(queries, "List of UpdateQuery must not be null"); + Assert.notNull(bulkOptions, "BulkOptions must not be null"); + Assert.notNull(index, "Index must not be null"); - return doBulkOperation(queries, bulkOptions, index).then(); - } + return doBulkOperation(queries, bulkOptions, index).then(); + } - private Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + private Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - BulkRequest bulkRequest = requestConverter.documentBulkRequest(queries, bulkOptions, index, getRefreshPolicy()); - return client.bulk(bulkRequest) - .onErrorMap(e -> new UncategorizedElasticsearchException("Error executing bulk request", e)) - .flatMap(this::checkForBulkOperationFailure) // - .flatMapMany(response -> Flux.fromIterable(response.items())); + BulkRequest bulkRequest = requestConverter.documentBulkRequest(queries, bulkOptions, index, getRefreshPolicy()); + return client.bulk(bulkRequest) + .onErrorMap(e -> new UncategorizedElasticsearchException("Error executing bulk request", e)) + .flatMap(this::checkForBulkOperationFailure) // + .flatMapMany(response -> Flux.fromIterable(response.items())); - } + } - private Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { + private Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { - if (bulkResponse.errors()) { - Map failedDocuments = new HashMap<>(); + if (bulkResponse.errors()) { + Map failedDocuments = new HashMap<>(); - for (BulkResponseItem item : bulkResponse.items()) { + for (BulkResponseItem item : bulkResponse.items()) { - if (item.error() != null) { - failedDocuments.put(item.id(), new BulkFailureException.FailureDetails(item.status(), item.error().reason())); - } - } - BulkFailureException exception = new BulkFailureException( - "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" - + failedDocuments + ']', - failedDocuments); - return Mono.error(exception); - } else { - return Mono.just(bulkResponse); - } - } + if (item.error() != null) { + failedDocuments.put(item.id(), new BulkFailureException.FailureDetails(item.status(), item.error().reason())); + } + } + BulkFailureException exception = new BulkFailureException( + "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + + failedDocuments + ']', + failedDocuments); + return Mono.error(exception); + } else { + return Mono.just(bulkResponse); + } + } - @Override - protected Mono doDeleteById(String id, @Nullable String routing, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(index, "index must not be null"); - - return Mono.defer(() -> { - DeleteRequest deleteRequest = requestConverter.documentDeleteRequest(id, routing, index, getRefreshPolicy()); - return doDelete(deleteRequest); - }); - } - - private Mono doDelete(DeleteRequest request) { - - return Mono.from(execute(client -> client.delete(request))) // - .flatMap(deleteResponse -> { - if (deleteResponse.result() == Result.NotFound) { - return Mono.empty(); - } - return Mono.just(deleteResponse.id()); - }).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); - } - - @Override - public Flux> multiGet(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(clazz, "clazz must not be null"); - - MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index); - - ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, clazz, index); - - Publisher> response = execute(client -> client.mget(request, EntityAsMap.class)); - - return Mono.from(response)// - .flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) // - .flatMap(multiGetItem -> { - if (multiGetItem.isFailed()) { - return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())); - } else { - return callback.toEntity(multiGetItem.getItem()) // - .map(t -> MultiGetItem.of(t, multiGetItem.getFailure())); - } - }); - } - - // endregion - - @Override - protected ReactiveElasticsearchTemplate doCopy() { - return new ReactiveElasticsearchTemplate(client, converter); - } - - // region search operations - - @Override - protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(clazz, "clazz must not be null"); - Assert.notNull(index, "index must not be null"); - - if (query instanceof SearchTemplateQuery searchTemplateQuery) { - return Flux.defer(() -> doSearch(searchTemplateQuery, clazz, index)); - } else { - return Flux.defer(() -> { - boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); - return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); - }); - } - } - - private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { - - if (query instanceof BaseQuery baseQuery) { - var pitKeepAlive = Duration.ofMinutes(5); - // setup functions for Flux.usingWhen() - Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) - .map(pit -> new PitSearchAfter(baseQuery, pit)); - - Function> asyncComplete = this::cleanupPit; - - BiFunction> asyncError = (psa, ex) -> { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Error during pit/search_after", ex); - } - return cleanupPit(psa); - }; - - Function> asyncCancel = psa -> { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("pit/search_after was cancelled"); - } - return cleanupPit(psa); - }; - - Function>> resourceClosure = psa -> { - - baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); - baseQuery.addSort(Sort.by("_shard_doc")); - SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), - clazz, index, false, true); - - return Mono.from(execute(client -> client.search(firstSearchRequest, EntityAsMap.class))) - .expand(entityAsMapSearchResponse -> { - - var hits = entityAsMapSearchResponse.hits().hits(); - if (CollectionUtils.isEmpty(hits)) { - return Mono.empty(); - } - - List sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject) - .collect(Collectors.toList()); - baseQuery.setSearchAfter(sortOptions); - SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, - routingResolver.getRouting(), clazz, index, false, true); - return Mono.from(execute(client -> client.search(followSearchRequest, EntityAsMap.class))); - }); - - }; - - Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, - asyncError, asyncCancel); - return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } else { - return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); - } - } - - private Publisher cleanupPit(PitSearchAfter psa) { - var baseQuery = psa.getBaseQuery(); - baseQuery.setPointInTime(null); - baseQuery.setSearchAfter(null); - baseQuery.setSort(psa.getSort()); - var pit = psa.getPit(); - return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); - } - - static private class PitSearchAfter { - private final BaseQuery baseQuery; - @Nullable private final Sort sort; - private final String pit; - - PitSearchAfter(BaseQuery baseQuery, String pit) { - this.baseQuery = baseQuery; - this.sort = baseQuery.getSort(); - this.pit = pit; - } - - public BaseQuery getBaseQuery() { - return baseQuery; - } - - @Nullable - public Sort getSort() { - return sort; - } - - public String getPit() { - return pit; - } - } - - @Override - protected Mono doCount(Query query, Class entityType, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(index, "index must not be null"); + @Override + protected Mono doDeleteById(String id, @Nullable String routing, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + return Mono.defer(() -> { + DeleteRequest deleteRequest = requestConverter.documentDeleteRequest(id, routing, index, getRefreshPolicy()); + return doDelete(deleteRequest); + }); + } + + private Mono doDelete(DeleteRequest request) { + + return Mono.from(execute(client -> client.delete(request))) // + .flatMap(deleteResponse -> { + if (deleteResponse.result() == Result.NotFound) { + return Mono.empty(); + } + return Mono.just(deleteResponse.id()); + }).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); + } + + @Override + public Flux> multiGet(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + + MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, clazz, index); + + Publisher> response = execute(client -> client.mget(request, EntityAsMap.class)); + + return Mono.from(response)// + .flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) // + .flatMap(multiGetItem -> { + if (multiGetItem.isFailed()) { + return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())); + } else { + return callback.toEntity(multiGetItem.getItem()) // + .map(t -> MultiGetItem.of(t, multiGetItem.getFailure())); + } + }); + } + + // endregion + + @Override + protected ReactiveElasticsearchTemplate doCopy() { + return new ReactiveElasticsearchTemplate(client, converter); + } + + // region search operations + + @Override + protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + Assert.notNull(index, "index must not be null"); + + if (query instanceof SearchTemplateQuery searchTemplateQuery) { + return Flux.defer(() -> doSearch(searchTemplateQuery, clazz, index)); + } else { + return Flux.defer(() -> { + boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); + return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); + }); + } + } + + private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { + + if (query instanceof BaseQuery baseQuery) { + var pitKeepAlive = Duration.ofMinutes(5); + // setup functions for Flux.usingWhen() + Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) + .map(pit -> new PitSearchAfter(baseQuery, pit)); + + Function> asyncComplete = this::cleanupPit; + + BiFunction> asyncError = (psa, ex) -> { + if (LOGGER.isErrorEnabled()) { + LOGGER.error("Error during pit/search_after", ex); + } + return cleanupPit(psa); + }; + + Function> asyncCancel = psa -> { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("pit/search_after was cancelled"); + } + return cleanupPit(psa); + }; + + Function>> resourceClosure = psa -> { + + baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); + baseQuery.addSort(Sort.by("_shard_doc")); + SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), + clazz, index, false, true); + + return Mono.from(execute(client -> client.search(firstSearchRequest, EntityAsMap.class))) + .expand(entityAsMapSearchResponse -> { + + var hits = entityAsMapSearchResponse.hits().hits(); + if (CollectionUtils.isEmpty(hits)) { + return Mono.empty(); + } + + List sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject) + .collect(Collectors.toList()); + baseQuery.setSearchAfter(sortOptions); + SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, + routingResolver.getRouting(), clazz, index, false, true); + return Mono.from(execute(client -> client.search(followSearchRequest, EntityAsMap.class))); + }); + + }; + + Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, + asyncError, asyncCancel); + return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } else { + return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); + } + } + + private Publisher cleanupPit(PitSearchAfter psa) { + var baseQuery = psa.getBaseQuery(); + baseQuery.setPointInTime(null); + baseQuery.setSearchAfter(null); + baseQuery.setSort(psa.getSort()); + var pit = psa.getPit(); + return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); + } + + static private class PitSearchAfter { + private final BaseQuery baseQuery; + @Nullable + private final Sort sort; + private final String pit; + + PitSearchAfter(BaseQuery baseQuery, String pit) { + this.baseQuery = baseQuery; + this.sort = baseQuery.getSort(); + this.pit = pit; + } + + public BaseQuery getBaseQuery() { + return baseQuery; + } + + @Nullable + public Sort getSort() { + return sort; + } + + public String getPit() { + return pit; + } + } + + @Override + protected Mono doCount(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index, - true); + SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index, + true); - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) - .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); - } - - private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { + return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) + .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); + } + + private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, - false, false); - - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) // - .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } - - private Flux doSearch(SearchTemplateQuery query, Class clazz, IndexCoordinates index) { - - var request = requestConverter.searchTemplate(query, routingResolver.getRouting(), index); - - return Mono.from(execute(client -> client.searchTemplate(request, EntityAsMap.class))) // - .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } - - @Override - protected Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(index, "index must not be null"); - - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, - false); - - // noinspection unchecked - SearchDocumentCallback callback = new ReadSearchDocumentCallback<>((Class) clazz, index); - SearchDocumentResponse.EntityCreator entityCreator = searchDocument -> callback.toEntity(searchDocument) - .toFuture(); - - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) - .map(searchResponse -> SearchDocumentResponseBuilder.from(searchResponse, entityCreator, jsonpMapper)); - } - - @Override - public Flux> aggregate(Query query, Class entityType, IndexCoordinates index) { - - return doFindForResponse(query, entityType, index).flatMapMany(searchDocumentResponse -> { - ElasticsearchAggregations aggregations = (ElasticsearchAggregations) searchDocumentResponse.getAggregations(); - return aggregations == null ? Flux.empty() : Flux.fromIterable(aggregations.aggregations()); - }); - } - - @Override - public Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { - - Assert.notNull(index, "index must not be null"); - Assert.notNull(keepAlive, "keepAlive must not be null"); - Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); - - var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); - return Mono.from(execute(client -> client.openPointInTime(request))).map(OpenPointInTimeResponse::id); - } - - @Override - public Mono closePointInTime(String pit) { - - Assert.notNull(pit, "pit must not be null"); - - ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit); - return Mono.from(execute(client -> client.closePointInTime(request))).map(ClosePointInTimeResponse::succeeded); - } - - // endregion - - // region script operations - @Override - public Mono putScript(Script script) { - - Assert.notNull(script, "script must not be null"); - - var request = requestConverter.scriptPut(script); - return Mono.from(execute(client -> client.putScript(request))).map(PutScriptResponse::acknowledged); - } - - @Override - public Mono