diff --git a/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java index 4bca6e330..c0965a6b9 100644 --- a/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java +++ b/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java @@ -158,6 +158,11 @@ public ExecutableRemoveByQuery removeByQuery(Class domainType) { return new ExecutableRemoveByQueryOperationSupport(this).removeByQuery(domainType); } + @Override + public ExecutableRangeScan rangeScan(Class domainType) { + return new ExecutableRangeScanOperationSupport(this).rangeScan(domainType); + } + @Override public String getBucketName() { return clientFactory.getBucket().name(); diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperation.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperation.java new file mode 100644 index 000000000..ed101df6e --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperation.java @@ -0,0 +1,213 @@ +/* + * Copyright 2012-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core; + +import java.util.stream.Stream; + +import org.springframework.data.couchbase.core.support.ConsistentWith; +import org.springframework.data.couchbase.core.support.InCollection; +import org.springframework.data.couchbase.core.support.InScope; +import org.springframework.data.couchbase.core.support.WithBatchByteLimit; +import org.springframework.data.couchbase.core.support.WithBatchItemLimit; +import org.springframework.data.couchbase.core.support.WithScanOptions; +import org.springframework.data.couchbase.core.support.WithScanSort; + +import com.couchbase.client.java.kv.MutationState; +import com.couchbase.client.java.kv.ScanOptions; + +/** + * Get Operations + * + * @author Michael Reiche + */ +public interface ExecutableRangeScanOperation { + + /** + * Loads a document from a bucket. + * + * @param domainType the entity type to use for the results. + */ + ExecutableRangeScan rangeScan(Class domainType); + + /** + * Terminating operations invoking the actual execution. + * + * @param the entity type to use for the results. + */ + interface TerminatingRangeScan /*extends OneAndAllId*/ { + + /** + * Range Scan + * + * @param upper + * @param lower + * @return the list of found entities. + */ + Stream rangeScan(String lower, String upper); + + /** + * Range Scan Ids + * + * @param upper + * @param lower + * @return the list of found keys. + */ + Stream rangeScanIds(String lower, String upper); + + /** + * Range Scan + * + * @param limit + * @param seed + * @return the list of found entities. + */ + Stream samplingScan(Long limit, Long... seed); + + /** + * Range Scan Ids + * + * @param limit + * @param seed + * @return the list of keys + */ + Stream samplingScanIds(Long limit, Long... seed); + } + + /** + * Fluent method to specify options. + * + * @param the entity type to use for the results. + */ + interface RangeScanWithOptions extends TerminatingRangeScan, WithScanOptions { + /** + * Fluent method to specify options to use for execution + * + * @param options options to use for execution + */ + @Override + TerminatingRangeScan withOptions(ScanOptions options); + } + + /** + * Fluent method to specify the collection. + * + * @param the entity type to use for the results. + */ + interface RangeScanInCollection extends RangeScanWithOptions, InCollection { + /** + * With a different collection + * + * @param collection the collection to use. + */ + @Override + RangeScanWithOptions inCollection(String collection); + } + + /** + * Fluent method to specify the scope. + * + * @param the entity type to use for the results. + */ + interface RangeScanInScope extends RangeScanInCollection, InScope { + /** + * With a different scope + * + * @param scope the scope to use. + */ + @Override + RangeScanInCollection inScope(String scope); + } + + + + interface RangeScanWithSort extends RangeScanInScope, WithScanSort { + /** + * sort + * + * @param sort + */ + @Override + RangeScanInScope withSort(Object sort); + } + + /** + * Fluent method to specify scan consistency. Scan consistency may also come from an annotation. + * + * @param the entity type to use for the results. + */ + interface RangeScanConsistentWith extends RangeScanWithSort, ConsistentWith { + + /** + * Allows to override the default scan consistency. + * + * @param mutationState the custom scan consistency to use for this query. + */ + @Override + RangeScanWithSort consistentWith(MutationState mutationState); + } + + /** + * Fluent method to specify a return type different than the the entity type to use for the results. + * + * @param the entity type to use for the results. + */ + interface RangeScanWithProjection extends RangeScanConsistentWith { + + /** + * Define the target type fields should be mapped to.
+ * Skip this step if you are only interested in the original the entity type to use for the results. + * + * @param returnType must not be {@literal null}. + * @return new instance of {@link ExecutableFindByQueryOperation.FindByQueryWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + RangeScanConsistentWith as(Class returnType); + } + + interface RangeScanWithBatchItemLimit extends RangeScanWithProjection, WithBatchItemLimit { + + /** + * determines if result are just ids or ids plus contents + * + * @param batchByteLimit must not be {@literal null}. + * @return new instance of {@link RangeScanWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + @Override + RangeScanWithProjection withBatchItemLimit(Integer batchByteLimit); + } + + interface RangeScanWithBatchByteLimit extends RangeScanWithBatchItemLimit, WithBatchByteLimit { + + /** + * determines if result are just ids or ids plus contents + * + * @param batchByteLimit must not be {@literal null}. + * @return new instance of {@link RangeScanWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + @Override + RangeScanWithBatchItemLimit withBatchByteLimit(Integer batchByteLimit); + } + + /** + * Provides methods for constructing query operations in a fluent way. + * + * @param the entity type to use for the results + */ + interface ExecutableRangeScan extends RangeScanWithBatchByteLimit {} + +} diff --git a/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperationSupport.java new file mode 100644 index 000000000..2e795dec5 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/ExecutableRangeScanOperationSupport.java @@ -0,0 +1,143 @@ +/* + * Copyright 2012-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core; + +import java.util.stream.Stream; + +import org.springframework.data.couchbase.core.ReactiveRangeScanOperationSupport.ReactiveRangeScanSupport; +import org.springframework.data.couchbase.core.query.OptionsBuilder; +import org.springframework.util.Assert; + +import com.couchbase.client.java.kv.MutationState; +import com.couchbase.client.java.kv.ScanOptions; + +public class ExecutableRangeScanOperationSupport implements ExecutableRangeScanOperation { + + private final CouchbaseTemplate template; + + ExecutableRangeScanOperationSupport(CouchbaseTemplate template) { + this.template = template; + } + + @Override + public ExecutableRangeScan rangeScan(Class domainType) { + return new ExecutableRangeScanSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType), + OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, null); + } + + static class ExecutableRangeScanSupport implements ExecutableRangeScan { + + private final CouchbaseTemplate template; + private final Class domainType; + private final String scope; + private final String collection; + private final ScanOptions options; + private final Object sort; + private final MutationState mutationState; + private final Integer batchItemLimit; + private final Integer batchByteLimit; + private final ReactiveRangeScanSupport reactiveSupport; + + ExecutableRangeScanSupport(CouchbaseTemplate template, Class domainType, String scope, String collection, + ScanOptions options, Object sort, MutationState mutationState, + Integer batchItemLimit, Integer batchByteLimit) { + this.template = template; + this.domainType = domainType; + this.scope = scope; + this.collection = collection; + this.options = options; + this.sort = sort; + this.mutationState = mutationState; + this.batchItemLimit = batchItemLimit; + this.batchByteLimit = batchByteLimit; + this.reactiveSupport = new ReactiveRangeScanSupport<>(template.reactive(), domainType, scope, collection, options, + sort, mutationState, batchItemLimit, batchByteLimit, + new NonReactiveSupportWrapper(template.support())); + } + + @Override + public TerminatingRangeScan withOptions(final ScanOptions options) { + Assert.notNull(options, "Options must not be null."); + return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanWithOptions inCollection(final String collection) { + return new ExecutableRangeScanSupport<>(template, domainType, scope, + collection != null ? collection : this.collection, options, sort, mutationState, + batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanInCollection inScope(final String scope) { + return new ExecutableRangeScanSupport<>(template, domainType, scope != null ? scope : this.scope, collection, + options, sort, mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanInScope withSort(Object sort) { + return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanWithSort consistentWith(MutationState mutationState) { + return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanConsistentWith as(Class returnType) { + return new ExecutableRangeScanSupport<>(template, returnType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanWithProjection withBatchItemLimit(Integer batchItemLimit) { + return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public RangeScanWithBatchItemLimit withBatchByteLimit(Integer batchByteLimit) { + return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit); + } + + @Override + public Stream rangeScan(String lower, String upper) { + return reactiveSupport.rangeScan(lower, upper, false, null, null).toStream(); + } + + @Override + public Stream rangeScanIds(String lower, String upper) { + return reactiveSupport.rangeScanIds(lower, upper, false, null, null).toStream(); + } + + @Override + public Stream samplingScan(Long limit, Long... seed) { + return reactiveSupport.sampleScan(limit, seed).toStream(); + } + + @Override + public Stream samplingScanIds(Long limit, Long... seed) { + return reactiveSupport.sampleScanIds(limit, seed).toStream(); + } + + } + +} diff --git a/src/main/java/org/springframework/data/couchbase/core/FluentCouchbaseOperations.java b/src/main/java/org/springframework/data/couchbase/core/FluentCouchbaseOperations.java index eee5dc0bd..55b9bd139 100644 --- a/src/main/java/org/springframework/data/couchbase/core/FluentCouchbaseOperations.java +++ b/src/main/java/org/springframework/data/couchbase/core/FluentCouchbaseOperations.java @@ -22,4 +22,5 @@ public interface FluentCouchbaseOperations extends ExecutableUpsertByIdOperation, ExecutableInsertByIdOperation, ExecutableReplaceByIdOperation, ExecutableFindByIdOperation, ExecutableFindFromReplicasByIdOperation, ExecutableFindByQueryOperation, ExecutableFindByAnalyticsOperation, ExecutableExistsByIdOperation, - ExecutableRemoveByIdOperation, ExecutableRemoveByQueryOperation, ExecutableMutateInByIdOperation {} + ExecutableRemoveByIdOperation, ExecutableRemoveByQueryOperation, ExecutableMutateInByIdOperation, + ExecutableRangeScanOperation {} diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java index 37a40120e..d11245af0 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java @@ -190,6 +190,11 @@ public ReactiveMutateInById mutateInById(Class domainType) { return new ReactiveMutateInByIdOperationSupport(this).mutateInById(domainType); } + @Override + public ReactiveRangeScan rangeScan(Class domainType) { + return new ReactiveRangeScanOperationSupport(this).rangeScan(domainType); + } + @Override public String getBucketName() { return clientFactory.getBucket().name(); diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFluentCouchbaseOperations.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFluentCouchbaseOperations.java index d7652642c..d2394dbc9 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFluentCouchbaseOperations.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFluentCouchbaseOperations.java @@ -22,4 +22,5 @@ public interface ReactiveFluentCouchbaseOperations extends ReactiveUpsertByIdOperation, ReactiveInsertByIdOperation, ReactiveReplaceByIdOperation, ReactiveFindByIdOperation, ReactiveExistsByIdOperation, ReactiveFindByAnalyticsOperation, ReactiveFindFromReplicasByIdOperation, ReactiveFindByQueryOperation, - ReactiveRemoveByIdOperation, ReactiveRemoveByQueryOperation, ReactiveMutateInByIdOperation {} + ReactiveRemoveByIdOperation, ReactiveRemoveByQueryOperation, ReactiveMutateInByIdOperation, + ReactiveRangeScanOperation {} diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperation.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperation.java new file mode 100644 index 000000000..e8ff902bc --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperation.java @@ -0,0 +1,212 @@ +/* + * Copyright 2012-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core; + +import reactor.core.publisher.Flux; + +import org.springframework.data.couchbase.core.support.ConsistentWith; +import org.springframework.data.couchbase.core.support.InCollection; +import org.springframework.data.couchbase.core.support.InScope; +import org.springframework.data.couchbase.core.support.WithBatchByteLimit; +import org.springframework.data.couchbase.core.support.WithBatchItemLimit; +import org.springframework.data.couchbase.core.support.WithScanOptions; +import org.springframework.data.couchbase.core.support.WithScanSort; + +import com.couchbase.client.java.kv.MutationState; +import com.couchbase.client.java.kv.ScanOptions; + +/** + * Get Operations + * + * @author Christoph Strobl + * @since 2.0 + */ +public interface ReactiveRangeScanOperation { + + /** + * Loads a document from a bucket. + * + * @param domainType the entity type to use for the results. + */ + ReactiveRangeScan rangeScan(Class domainType); + + /** + * Terminating operations invoking the actual execution. + * + * @param the entity type to use for the results. + */ + interface TerminatingRangeScan /*extends OneAndAllId*/ { + + /** + * Finds a list of documents based on the given IDs. + * + * @param lower the lower bound + * @param upper the upper bound + * @return the list of found entities. + */ + Flux rangeScan(String lower, String upper); + + /** + * Finds a list of documents based on the given IDs. + * + * @param lower the lower bound + * @param upper the upper bound + * @return the list of ids. + */ + Flux rangeScanIds(String lower, String upper); + + /** + * Finds a list of documents based on the given IDs. + * + * @param limit + * @param seed + * @return the list of found entities. + */ + Flux sampleScan(Long limit, Long... seed); + + /** + * Finds a list of documents based on the given IDs. + * + * @param limit + * @param seed + * @return the list of ids. + */ + Flux sampleScanIds(Long limit, Long... seed); + } + + /** + * Fluent method to specify options. + * + * @param the entity type to use for the results. + */ + interface RangeScanWithOptions extends TerminatingRangeScan, WithScanOptions { + /** + * Fluent method to specify options to use for execution + * + * @param options options to use for execution + */ + @Override + TerminatingRangeScan withOptions(ScanOptions options); + } + + /** + * Fluent method to specify the collection. + * + * @param the entity type to use for the results. + */ + interface RangeScanInCollection extends RangeScanWithOptions, InCollection { + /** + * With a different collection + * + * @param collection the collection to use. + */ + @Override + RangeScanWithOptions inCollection(String collection); + } + + /** + * Fluent method to specify the scope. + * + * @param the entity type to use for the results. + */ + interface RangeScanInScope extends RangeScanInCollection, InScope { + /** + * With a different scope + * + * @param scope the scope to use. + */ + @Override + RangeScanInCollection inScope(String scope); + } + + interface RangeScanWithSort extends RangeScanInScope, WithScanSort { + /** + * sort + * + * @param sort + */ + @Override + RangeScanInScope withSort(Object sort); + } + + /** + * Fluent method to specify scan consistency. Scan consistency may also come from an annotation. + * + * @param the entity type to use for the results. + */ + interface RangeScanConsistentWith extends RangeScanWithSort, ConsistentWith { + + /** + * Allows to override the default scan consistency. + * + * @param mutationState the custom scan consistency to use for this query. + */ + @Override + RangeScanWithSort consistentWith(MutationState mutationState); + } + + /** + * Fluent method to specify a return type different than the the entity type to use for the results. + * + * @param the entity type to use for the results. + */ + interface RangeScanWithProjection extends RangeScanConsistentWith { + + /** + * Define the target type fields should be mapped to.
+ * Skip this step if you are only interested in the original the entity type to use for the results. + * + * @param returnType must not be {@literal null}. + * @return new instance of {@link ReactiveFindByQueryOperation.FindByQueryWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + RangeScanConsistentWith as(Class returnType); + } + + interface RangeScanWithBatchItemLimit extends RangeScanWithProjection, WithBatchItemLimit { + + /** + * determines if result are just ids or ids plus contents + * + * @param batchByteLimit must not be {@literal null}. + * @return new instance of {@link RangeScanWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + @Override + RangeScanWithProjection withBatchItemLimit(Integer batchByteLimit); + } + + interface RangeScanWithBatchByteLimit extends RangeScanWithBatchItemLimit, WithBatchByteLimit { + + /** + * determines if result are just ids or ids plus contents + * + * @param batchByteLimit must not be {@literal null}. + * @return new instance of {@link RangeScanWithProjection}. + * @throws IllegalArgumentException if returnType is {@literal null}. + */ + @Override + RangeScanWithBatchItemLimit withBatchByteLimit(Integer batchByteLimit); + } + + /** + * Provides methods for constructing query operations in a fluent way. + * + * @param the entity type to use for the results + */ + interface ReactiveRangeScan extends RangeScanWithBatchByteLimit {} + +} diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperationSupport.java new file mode 100644 index 000000000..1a13a824b --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveRangeScanOperationSupport.java @@ -0,0 +1,230 @@ +/* + * Copyright 2012-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core; + +import reactor.core.publisher.Flux; + +import java.nio.charset.StandardCharsets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.couchbase.core.query.OptionsBuilder; +import org.springframework.data.couchbase.core.support.PseudoArgs; +import org.springframework.util.Assert; + +import com.couchbase.client.java.ReactiveCollection; +import com.couchbase.client.java.kv.MutationState; +import com.couchbase.client.java.kv.ScanOptions; +import com.couchbase.client.java.kv.ScanTerm; +import com.couchbase.client.java.kv.ScanType; + +public class ReactiveRangeScanOperationSupport implements ReactiveRangeScanOperation { + + private final ReactiveCouchbaseTemplate template; + private static final Logger LOG = LoggerFactory.getLogger(ReactiveRangeScanOperationSupport.class); + + ReactiveRangeScanOperationSupport(ReactiveCouchbaseTemplate template) { + this.template = template; + } + + @Override + public ReactiveRangeScan rangeScan(Class domainType) { + return new ReactiveRangeScanSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType), + OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, null, + template.support()); + } + + static class ReactiveRangeScanSupport implements ReactiveRangeScan { + + private final ReactiveCouchbaseTemplate template; + private final Class domainType; + private final String scope; + private final String collection; + private final ScanOptions options; + private final Object sort; + private final MutationState mutationState; + private final Integer batchItemLimit; + private final Integer batchByteLimit; + private final ReactiveTemplateSupport support; + + ReactiveRangeScanSupport(ReactiveCouchbaseTemplate template, Class domainType, String scope, String collection, + ScanOptions options, Object sort, MutationState mutationState, + Integer batchItemLimit, Integer batchByteLimit, ReactiveTemplateSupport support) { + this.template = template; + this.domainType = domainType; + this.scope = scope; + this.collection = collection; + this.options = options; + this.sort = sort; + this.mutationState = mutationState; + this.batchItemLimit = batchItemLimit; + this.batchByteLimit = batchByteLimit; + this.support = support; + } + + @Override + public TerminatingRangeScan withOptions(final ScanOptions options) { + Assert.notNull(options, "Options must not be null."); + return new ReactiveRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanWithOptions inCollection(final String collection) { + return new ReactiveRangeScanSupport<>(template, domainType, scope, + collection != null ? collection : this.collection, options, sort, mutationState, + batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanInCollection inScope(final String scope) { + return new ReactiveRangeScanSupport<>(template, domainType, scope != null ? scope : this.scope, collection, + options, sort, mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanInScope withSort(Object sort) { + return new ReactiveRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanWithSort consistentWith(MutationState mutationState) { + return new ReactiveRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanConsistentWith as(Class returnType) { + return new ReactiveRangeScanSupport<>(template, returnType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanWithProjection withBatchItemLimit(Integer batchItemLimit) { + return new ReactiveRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public RangeScanWithBatchItemLimit withBatchByteLimit(Integer batchByteLimit) { + return new ReactiveRangeScanSupport<>(template, domainType, scope, collection, options, sort, + mutationState, batchItemLimit, batchByteLimit, support); + } + + @Override + public Flux rangeScan(String lower, String upper) { + return rangeScan(lower, upper, false, null, null); + } + + @Override + public Flux sampleScan(Long limit, Long... seed) { + return rangeScan(null, null, true, limit, seed!= null && seed.length > 0 ? seed[0] : null); + } + + + Flux rangeScan(String lower, String upper, boolean isSamplingScan, Long limit, Long seed) { + + PseudoArgs pArgs = new PseudoArgs<>(template, scope, collection, options, domainType); + if (LOG.isDebugEnabled()) { + LOG.debug("rangeScan lower={} upper={} {}", lower, upper, pArgs); + } + ReactiveCollection rc = template.getCouchbaseClientFactory().withScope(pArgs.getScope()) + .getCollection(pArgs.getCollection()).reactive(); + + ScanType scanType = null; + if(isSamplingScan){ + scanType = ScanType.samplingScan(limit, seed != null ? seed : 0); + } else { + ScanTerm lowerTerm = ScanTerm.minimum(); + ScanTerm upperTerm = ScanTerm.maximum(); + if (lower != null) { + lowerTerm = ScanTerm.inclusive(lower); + } + if (upper != null) { + upperTerm = ScanTerm.inclusive(upper); + } + scanType = ScanType.rangeScan(lowerTerm, upperTerm); + } + + Flux reactiveEntities = TransactionalSupport.verifyNotInTransaction("rangeScan") + .thenMany(rc.scan(scanType, buildScanOptions(pArgs.getOptions(), false)) + .flatMap(result -> support.decodeEntity(result.id(), + new String(result.contentAsBytes(), StandardCharsets.UTF_8), result.cas(), domainType, + pArgs.getScope(), pArgs.getCollection(), null, null))); + + return reactiveEntities.onErrorMap(throwable -> { + if (throwable instanceof RuntimeException) { + return template.potentiallyConvertRuntimeException((RuntimeException) throwable); + } else { + return throwable; + } + }); + + } + + @Override + public Flux rangeScanIds(String upper, String lower) { + return rangeScanIds(upper, lower, false, null, null); + } + + @Override + public Flux sampleScanIds(Long limit, Long... seed) { + return rangeScanIds(null, null, true, limit, seed!= null && seed.length > 0 ? seed[0] : null); + } + + Flux rangeScanIds(String lower, String upper, boolean isSamplingScan, Long limit, Long seed) { + PseudoArgs pArgs = new PseudoArgs<>(template, scope, collection, options, domainType); + if (LOG.isDebugEnabled()) { + LOG.debug("rangeScan lower={} upper={} {}", lower, upper, pArgs); + } + ReactiveCollection rc = template.getCouchbaseClientFactory().withScope(pArgs.getScope()) + .getCollection(pArgs.getCollection()).reactive(); + + ScanType scanType = null; + if(isSamplingScan){ + scanType = ScanType.samplingScan(limit, seed != null ? seed : 0); + } else { + ScanTerm lowerTerm = ScanTerm.minimum(); + ScanTerm upperTerm = ScanTerm.maximum(); + if (lower != null) { + lowerTerm = ScanTerm.inclusive(lower); + } + if (upper != null) { + upperTerm = ScanTerm.inclusive(upper); + } + scanType = ScanType.rangeScan(lowerTerm, upperTerm); + } + + Flux reactiveEntities = TransactionalSupport.verifyNotInTransaction("rangeScanIds") + .thenMany(rc.scan(scanType, buildScanOptions(pArgs.getOptions(), true)).map(result -> result.id())); + + return reactiveEntities.onErrorMap(throwable -> { + if (throwable instanceof RuntimeException) { + return template.potentiallyConvertRuntimeException((RuntimeException) throwable); + } else { + return throwable; + } + }); + + } + + private ScanOptions buildScanOptions(ScanOptions options, Boolean idsOnly) { + return OptionsBuilder.buildScanOptions(options, sort, idsOnly, mutationState, batchByteLimit, batchItemLimit); + } + } + +} diff --git a/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java b/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java index e7f297bc2..58923a031 100644 --- a/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java +++ b/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java @@ -15,7 +15,6 @@ */ package org.springframework.data.couchbase.core.query; -import static com.couchbase.client.core.util.Validators.notNull; import static org.springframework.data.couchbase.core.query.Meta.MetaKey.RETRY_STRATEGY; import static org.springframework.data.couchbase.core.query.Meta.MetaKey.SCAN_CONSISTENCY; import static org.springframework.data.couchbase.core.query.Meta.MetaKey.TIMEOUT; @@ -25,13 +24,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.Duration; -import java.util.HashMap; import java.util.Map; import java.util.Optional; -import com.couchbase.client.core.api.query.CoreQueryScanConsistency; -import com.couchbase.client.core.classic.query.ClassicCoreQueryOps; -import com.couchbase.client.core.error.InvalidArgumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.AnnotatedElementUtils; @@ -44,6 +39,9 @@ import org.springframework.data.couchbase.repository.Scope; import org.springframework.data.couchbase.repository.query.CouchbaseQueryMethod; +import com.couchbase.client.core.api.query.CoreQueryScanConsistency; +import com.couchbase.client.core.classic.query.ClassicCoreQueryOps; +import com.couchbase.client.core.error.InvalidArgumentException; import com.couchbase.client.core.io.CollectionIdentifier; import com.couchbase.client.core.msg.kv.DurabilityLevel; import com.couchbase.client.core.retry.RetryStrategy; @@ -51,11 +49,13 @@ import com.couchbase.client.java.json.JsonObject; import com.couchbase.client.java.kv.ExistsOptions; import com.couchbase.client.java.kv.InsertOptions; -import com.couchbase.client.java.kv.PersistTo; import com.couchbase.client.java.kv.MutateInOptions; +import com.couchbase.client.java.kv.MutationState; +import com.couchbase.client.java.kv.PersistTo; import com.couchbase.client.java.kv.RemoveOptions; import com.couchbase.client.java.kv.ReplaceOptions; import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.ScanOptions; import com.couchbase.client.java.kv.UpsertOptions; import com.couchbase.client.java.query.QueryOptions; import com.couchbase.client.java.query.QueryScanConsistency; @@ -545,4 +545,24 @@ public static String annotationString(Class annotation return annotationString(annotation, "value", defaultValue, elements); } + public static ScanOptions buildScanOptions(ScanOptions options, Object sort, Boolean idsOnly, + MutationState mutationState, Integer batchByteLimit, Integer batchItemLimit) { + options = options != null ? options : ScanOptions.scanOptions(); + if (sort != null) { + //options.sort(sort); + } + if (idsOnly != null) { + options.idsOnly(idsOnly); + } + if (mutationState != null) { + options.consistentWith(mutationState); + } + if (batchByteLimit != null) { + options.batchByteLimit(batchByteLimit); + } + if (batchItemLimit != null) { + options.batchItemLimit(batchItemLimit); + } + return options; + } } diff --git a/src/main/java/org/springframework/data/couchbase/core/support/ConsistentWith.java b/src/main/java/org/springframework/data/couchbase/core/support/ConsistentWith.java new file mode 100644 index 000000000..715c13ac9 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/ConsistentWith.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +import com.couchbase.client.java.kv.MutationState; + +/** + * A common interface for those that support withOptions() + * + * @author Michael Reiche + * @param - the entity class + */ +public interface ConsistentWith { + Object consistentWith(MutationState mutationState); + +} diff --git a/src/main/java/org/springframework/data/couchbase/core/support/WithBatchByteLimit.java b/src/main/java/org/springframework/data/couchbase/core/support/WithBatchByteLimit.java new file mode 100644 index 000000000..d197f994a --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/WithBatchByteLimit.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +/** + * A common interface for those that support withBatchByteLimit() + * + * @author Michael Reiche + * @param - the entity class + */ +public interface WithBatchByteLimit { + Object withBatchByteLimit(Integer batchByteLimit); +} diff --git a/src/main/java/org/springframework/data/couchbase/core/support/WithBatchItemLimit.java b/src/main/java/org/springframework/data/couchbase/core/support/WithBatchItemLimit.java new file mode 100644 index 000000000..a81661f68 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/WithBatchItemLimit.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +/** + * A common interface for those that support withBatchItemLimit() + * + * @author Michael Reiche + * @param - the entity class + */ +public interface WithBatchItemLimit { + Object withBatchItemLimit(Integer batchItemLimit); +} diff --git a/src/main/java/org/springframework/data/couchbase/core/support/WithScanOptions.java b/src/main/java/org/springframework/data/couchbase/core/support/WithScanOptions.java new file mode 100644 index 000000000..4a69360e1 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/WithScanOptions.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +import com.couchbase.client.java.kv.ScanOptions; + +/** + * A common interface for those that support withOptions() + * + * @author Michael Reiche + * @param - the entity class + */ +public interface WithScanOptions { + Object withOptions(ScanOptions expiry); +} diff --git a/src/main/java/org/springframework/data/couchbase/core/support/WithScanSort.java b/src/main/java/org/springframework/data/couchbase/core/support/WithScanSort.java new file mode 100644 index 000000000..5fe70e2a7 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/core/support/WithScanSort.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-2023 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.couchbase.core.support; + +/** + * A common interface for those that support withOptions() + * + * @author Michael Reiche + * @param - the entity class + */ +public interface WithScanSort { + Object withSort(Object expiry); +} diff --git a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java index e7915bba5..40d1cf2c4 100644 --- a/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/core/CouchbaseTemplateKeyValueIntegrationTests.java @@ -27,11 +27,16 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.Duration; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Stream; -import com.couchbase.client.core.error.TimeoutException; -import com.couchbase.client.core.msg.kv.DurabilityLevel; -import com.couchbase.client.core.retry.RetryReason; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -41,8 +46,26 @@ import org.springframework.data.couchbase.core.ExecutableFindByIdOperation.ExecutableFindById; import org.springframework.data.couchbase.core.ExecutableRemoveByIdOperation.ExecutableRemoveById; import org.springframework.data.couchbase.core.ExecutableReplaceByIdOperation.ExecutableReplaceById; -import org.springframework.data.couchbase.core.support.*; -import org.springframework.data.couchbase.domain.*; +import org.springframework.data.couchbase.core.support.OneAndAllEntity; +import org.springframework.data.couchbase.core.support.OneAndAllId; +import org.springframework.data.couchbase.core.support.WithDurability; +import org.springframework.data.couchbase.core.support.WithExpiry; +import org.springframework.data.couchbase.domain.Address; +import org.springframework.data.couchbase.domain.Config; +import org.springframework.data.couchbase.domain.MutableUser; +import org.springframework.data.couchbase.domain.NaiveAuditorAware; +import org.springframework.data.couchbase.domain.PersonValue; +import org.springframework.data.couchbase.domain.Submission; +import org.springframework.data.couchbase.domain.User; +import org.springframework.data.couchbase.domain.UserAnnotated; +import org.springframework.data.couchbase.domain.UserAnnotated2; +import org.springframework.data.couchbase.domain.UserAnnotated3; +import org.springframework.data.couchbase.domain.UserAnnotatedDurability; +import org.springframework.data.couchbase.domain.UserAnnotatedDurabilityExpression; +import org.springframework.data.couchbase.domain.UserAnnotatedPersistTo; +import org.springframework.data.couchbase.domain.UserAnnotatedReplicateTo; +import org.springframework.data.couchbase.domain.UserAnnotatedTouchOnRead; +import org.springframework.data.couchbase.domain.UserSubmission; import org.springframework.data.couchbase.util.ClusterType; import org.springframework.data.couchbase.util.IgnoreWhen; import org.springframework.data.couchbase.util.JavaIntegrationTests; @@ -50,6 +73,12 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import com.couchbase.client.core.error.CouchbaseException; +import com.couchbase.client.core.error.TimeoutException; +import com.couchbase.client.core.msg.kv.DurabilityLevel; +import com.couchbase.client.core.msg.kv.MutationToken; +import com.couchbase.client.core.retry.RetryReason; +import com.couchbase.client.java.json.JsonObject; +import com.couchbase.client.java.kv.MutationState; import com.couchbase.client.java.kv.PersistTo; import com.couchbase.client.java.kv.ReplicateTo; import com.couchbase.client.java.query.QueryOptions; @@ -1051,11 +1080,6 @@ void insertById() { assertEquals(user, inserted); User found = couchbaseTemplate.findById(User.class).one(user.getId()); assertEquals(inserted, found); - System.err.println("inserted: "+inserted); - System.err.println("found: "+found); - System.err.println("found:jsonNode "+found.jsonNode.toPrettyString()); - System.err.println("found:jsonObject "+found.jsonObject.toString()); - System.err.println("found:jsonArray "+found.jsonArray.toString()); assertThrows(DuplicateKeyException.class, () -> couchbaseTemplate.insertById(User.class).one(user)); couchbaseTemplate.removeById(User.class).one(user.getId()); } @@ -1209,6 +1233,111 @@ void saveAndFindImmutableById() { couchbaseTemplate.removeById(PersonValue.class).one(replaced.getId()); } + @Test + void rangeScan() { + String id = "A"; + String lower = null; + String upper = null; + for (int i = 0; i < 10; i++) { + if (lower == null) { + lower = "" + i; + } + User inserted = couchbaseTemplate.insertById(User.class).one(new User("" + i, "fn_" + i, "ln_" + i)); + upper = "" + i; + } + MutationToken mt = couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection() + .upsert(id, JsonObject.create().put("id", id)).mutationToken().get(); + Stream users = couchbaseTemplate.rangeScan(User.class).consistentWith(MutationState.from(mt)) + /*.withSort(ScanSort.ASCENDING)*/.rangeScan(lower, upper); + for (User u : users.toList()) { + System.err.print(u); + System.err.println(","); + assertTrue(u.getId().compareTo(lower) >= 0 && u.getId().compareTo(upper) <= 0); + couchbaseTemplate.removeById(User.class).one(u.getId()); + } + couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection().remove(id); + } + + @Test + void rangeScanId() { + String id = "A"; + String lower = null; + String upper = null; + for (int i = 0; i < 10; i++) { + if (lower == null) { + lower = "" + i; + } + User inserted = couchbaseTemplate.insertById(User.class).one(new User("" + i, "fn_" + i, "ln_" + i)); + upper = "" + i; + } + MutationToken mt = couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection() + .upsert(id, JsonObject.create().put("id", id)).mutationToken().get(); + Stream userIds = couchbaseTemplate.rangeScan(User.class).consistentWith(MutationState.from(mt)) + /*.withSort(ScanSort.ASCENDING)*/.rangeScanIds(lower, upper); + for (String userId : userIds.toList()) { + System.err.print(userId); + System.err.println(","); + assertTrue(userId.compareTo(lower) >= 0 && userId.compareTo(upper) <= 0); + couchbaseTemplate.removeById(User.class).one(userId); + } + couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection().remove(id); + + } + + @Test + void sampleScan() { + String id = "A"; + String lower = null; + String upper = null; + for (int i = 0; i < 10; i++) { + if (lower == null) { + lower = "" + i; + } + User inserted = couchbaseTemplate.insertById(User.class).one(new User("" + i, "fn_" + i, "ln_" + i)); + upper = "" + i; + } + MutationToken mt = couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection() + .upsert(id, JsonObject.create().put("id", id)).mutationToken().get(); + Stream users = couchbaseTemplate.rangeScan(User.class).consistentWith(MutationState.from(mt)) + /*.withSort(ScanSort.ASCENDING)*/.samplingScan(5l, null); + List usersList = users.toList(); + assertEquals(5, usersList.size(), "number in sample"); + for (User u : usersList) { + System.err.print(u); + System.err.println(","); + // assertTrue(u.getId().compareTo(lower) >= 0 && u.getId().compareTo(upper) <= 0); + //couchbaseTemplate.removeById(User.class).one(u.getId()); + } + couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection().remove(id); + } + + @Test + void sampleScanId() { + String id = "A"; + String lower = null; + String upper = null; + for (int i = 0; i < 10; i++) { + if (lower == null) { + lower = "" + i; + } + User inserted = couchbaseTemplate.insertById(User.class).one(new User("" + i, "fn_" + i, "ln_" + i)); + upper = "" + i; + } + MutationToken mt = couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection() + .upsert(id, JsonObject.create().put("id", id)).mutationToken().get(); + Stream userIds = couchbaseTemplate.rangeScan(User.class).consistentWith(MutationState.from(mt)) + /*.withSort(ScanSort.ASCENDING)*/.samplingScanIds(5l); + for (String userId : userIds.toList()) { + System.err.print(userId); + System.err.println(","); + //assertTrue(userId.compareTo(lower) >= 0 && userId.compareTo(upper) <= 0); + //couchbaseTemplate.removeById(User.class).one(userId); + } + couchbaseTemplate.getCouchbaseClientFactory().getDefaultCollection().remove(id); + + } + + private void sleepSecs(int i) { try { Thread.sleep(i * 1000);