Skip to content

Datacouch 1599 add rangescan support #1738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public <T> ExecutableRemoveByQuery<T> removeByQuery(Class<T> domainType) {
return new ExecutableRemoveByQueryOperationSupport(this).removeByQuery(domainType);
}

@Override
public <T> ExecutableRangeScan<T> rangeScan(Class<T> domainType) {
return new ExecutableRangeScanOperationSupport(this).rangeScan(domainType);
}

@Override
public String getBucketName() {
return clientFactory.getBucket().name();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright 2012-2023 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.couchbase.core;

import java.util.stream.Stream;

import org.springframework.data.couchbase.core.support.ConsistentWith;
import org.springframework.data.couchbase.core.support.InCollection;
import org.springframework.data.couchbase.core.support.InScope;
import org.springframework.data.couchbase.core.support.WithBatchByteLimit;
import org.springframework.data.couchbase.core.support.WithBatchItemLimit;
import org.springframework.data.couchbase.core.support.WithScanOptions;
import org.springframework.data.couchbase.core.support.WithScanSort;

import com.couchbase.client.java.kv.MutationState;
import com.couchbase.client.java.kv.ScanOptions;

/**
* Get Operations
*
* @author Michael Reiche
*/
public interface ExecutableRangeScanOperation {

/**
* Loads a document from a bucket.
*
* @param domainType the entity type to use for the results.
*/
<T> ExecutableRangeScan<T> rangeScan(Class<T> domainType);

/**
* Terminating operations invoking the actual execution.
*
* @param <T> the entity type to use for the results.
*/
interface TerminatingRangeScan<T> /*extends OneAndAllId<T>*/ {

/**
* Range Scan
*
* @param upper
* @param lower
* @return the list of found entities.
*/
Stream<T> rangeScan(String lower, String upper);

/**
* Range Scan Ids
*
* @param upper
* @param lower
* @return the list of found keys.
*/
Stream<String> rangeScanIds(String lower, String upper);

/**
* Range Scan
*
* @param limit
* @param seed
* @return the list of found entities.
*/
Stream<T> samplingScan(Long limit, Long... seed);

/**
* Range Scan Ids
*
* @param limit
* @param seed
* @return the list of keys
*/
Stream<String> samplingScanIds(Long limit, Long... seed);
}

/**
* Fluent method to specify options.
*
* @param <T> the entity type to use for the results.
*/
interface RangeScanWithOptions<T> extends TerminatingRangeScan<T>, WithScanOptions<T> {
/**
* Fluent method to specify options to use for execution
*
* @param options options to use for execution
*/
@Override
TerminatingRangeScan<T> withOptions(ScanOptions options);
}

/**
* Fluent method to specify the collection.
*
* @param <T> the entity type to use for the results.
*/
interface RangeScanInCollection<T> extends RangeScanWithOptions<T>, InCollection<T> {
/**
* With a different collection
*
* @param collection the collection to use.
*/
@Override
RangeScanWithOptions<T> inCollection(String collection);
}

/**
* Fluent method to specify the scope.
*
* @param <T> the entity type to use for the results.
*/
interface RangeScanInScope<T> extends RangeScanInCollection<T>, InScope<T> {
/**
* With a different scope
*
* @param scope the scope to use.
*/
@Override
RangeScanInCollection<T> inScope(String scope);
}



interface RangeScanWithSort<T> extends RangeScanInScope<T>, WithScanSort<T> {
/**
* sort
*
* @param sort
*/
@Override
RangeScanInScope<T> withSort(Object sort);
}

/**
* Fluent method to specify scan consistency. Scan consistency may also come from an annotation.
*
* @param <T> the entity type to use for the results.
*/
interface RangeScanConsistentWith<T> extends RangeScanWithSort<T>, ConsistentWith<T> {

/**
* Allows to override the default scan consistency.
*
* @param mutationState the custom scan consistency to use for this query.
*/
@Override
RangeScanWithSort<T> consistentWith(MutationState mutationState);
}

/**
* Fluent method to specify a return type different than the the entity type to use for the results.
*
* @param <T> the entity type to use for the results.
*/
interface RangeScanWithProjection<T> extends RangeScanConsistentWith<T> {

/**
* Define the target type fields should be mapped to. <br />
* Skip this step if you are only interested in the original the entity type to use for the results.
*
* @param returnType must not be {@literal null}.
* @return new instance of {@link ExecutableFindByQueryOperation.FindByQueryWithProjection}.
* @throws IllegalArgumentException if returnType is {@literal null}.
*/
<R> RangeScanConsistentWith<R> as(Class<R> returnType);
}

interface RangeScanWithBatchItemLimit<T> extends RangeScanWithProjection<T>, WithBatchItemLimit<T> {

/**
* determines if result are just ids or ids plus contents
*
* @param batchByteLimit must not be {@literal null}.
* @return new instance of {@link RangeScanWithProjection}.
* @throws IllegalArgumentException if returnType is {@literal null}.
*/
@Override
RangeScanWithProjection<T> withBatchItemLimit(Integer batchByteLimit);
}

interface RangeScanWithBatchByteLimit<T> extends RangeScanWithBatchItemLimit<T>, WithBatchByteLimit<T> {

/**
* determines if result are just ids or ids plus contents
*
* @param batchByteLimit must not be {@literal null}.
* @return new instance of {@link RangeScanWithProjection}.
* @throws IllegalArgumentException if returnType is {@literal null}.
*/
@Override
RangeScanWithBatchItemLimit<T> withBatchByteLimit(Integer batchByteLimit);
}

/**
* Provides methods for constructing query operations in a fluent way.
*
* @param <T> the entity type to use for the results
*/
interface ExecutableRangeScan<T> extends RangeScanWithBatchByteLimit<T> {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2012-2023 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.couchbase.core;

import java.util.stream.Stream;

import org.springframework.data.couchbase.core.ReactiveRangeScanOperationSupport.ReactiveRangeScanSupport;
import org.springframework.data.couchbase.core.query.OptionsBuilder;
import org.springframework.util.Assert;

import com.couchbase.client.java.kv.MutationState;
import com.couchbase.client.java.kv.ScanOptions;

public class ExecutableRangeScanOperationSupport implements ExecutableRangeScanOperation {

private final CouchbaseTemplate template;

ExecutableRangeScanOperationSupport(CouchbaseTemplate template) {
this.template = template;
}

@Override
public <T> ExecutableRangeScan<T> rangeScan(Class<T> domainType) {
return new ExecutableRangeScanSupport<>(template, domainType, OptionsBuilder.getScopeFrom(domainType),
OptionsBuilder.getCollectionFrom(domainType), null, null, null, null, null);
}

static class ExecutableRangeScanSupport<T> implements ExecutableRangeScan<T> {

private final CouchbaseTemplate template;
private final Class<T> domainType;
private final String scope;
private final String collection;
private final ScanOptions options;
private final Object sort;
private final MutationState mutationState;
private final Integer batchItemLimit;
private final Integer batchByteLimit;
private final ReactiveRangeScanSupport<T> reactiveSupport;

ExecutableRangeScanSupport(CouchbaseTemplate template, Class<T> domainType, String scope, String collection,
ScanOptions options, Object sort, MutationState mutationState,
Integer batchItemLimit, Integer batchByteLimit) {
this.template = template;
this.domainType = domainType;
this.scope = scope;
this.collection = collection;
this.options = options;
this.sort = sort;
this.mutationState = mutationState;
this.batchItemLimit = batchItemLimit;
this.batchByteLimit = batchByteLimit;
this.reactiveSupport = new ReactiveRangeScanSupport<>(template.reactive(), domainType, scope, collection, options,
sort, mutationState, batchItemLimit, batchByteLimit,
new NonReactiveSupportWrapper(template.support()));
}

@Override
public TerminatingRangeScan<T> withOptions(final ScanOptions options) {
Assert.notNull(options, "Options must not be null.");
return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public RangeScanWithOptions<T> inCollection(final String collection) {
return new ExecutableRangeScanSupport<>(template, domainType, scope,
collection != null ? collection : this.collection, options, sort, mutationState,
batchItemLimit, batchByteLimit);
}

@Override
public RangeScanInCollection<T> inScope(final String scope) {
return new ExecutableRangeScanSupport<>(template, domainType, scope != null ? scope : this.scope, collection,
options, sort, mutationState, batchItemLimit, batchByteLimit);
}

@Override
public RangeScanInScope<T> withSort(Object sort) {
return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public RangeScanWithSort<T> consistentWith(MutationState mutationState) {
return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public <R> RangeScanConsistentWith<R> as(Class<R> returnType) {
return new ExecutableRangeScanSupport<>(template, returnType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public RangeScanWithProjection<T> withBatchItemLimit(Integer batchItemLimit) {
return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public RangeScanWithBatchItemLimit<T> withBatchByteLimit(Integer batchByteLimit) {
return new ExecutableRangeScanSupport<>(template, domainType, scope, collection, options, sort,
mutationState, batchItemLimit, batchByteLimit);
}

@Override
public Stream<T> rangeScan(String lower, String upper) {
return reactiveSupport.rangeScan(lower, upper, false, null, null).toStream();
}

@Override
public Stream<String> rangeScanIds(String lower, String upper) {
return reactiveSupport.rangeScanIds(lower, upper, false, null, null).toStream();
}

@Override
public Stream<T> samplingScan(Long limit, Long... seed) {
return reactiveSupport.sampleScan(limit, seed).toStream();
}

@Override
public Stream<String> samplingScanIds(Long limit, Long... seed) {
return reactiveSupport.sampleScanIds(limit, seed).toStream();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@
public interface FluentCouchbaseOperations extends ExecutableUpsertByIdOperation, ExecutableInsertByIdOperation,
ExecutableReplaceByIdOperation, ExecutableFindByIdOperation, ExecutableFindFromReplicasByIdOperation,
ExecutableFindByQueryOperation, ExecutableFindByAnalyticsOperation, ExecutableExistsByIdOperation,
ExecutableRemoveByIdOperation, ExecutableRemoveByQueryOperation, ExecutableMutateInByIdOperation {}
ExecutableRemoveByIdOperation, ExecutableRemoveByQueryOperation, ExecutableMutateInByIdOperation,
ExecutableRangeScanOperation {}
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public <T> ReactiveMutateInById<T> mutateInById(Class<T> domainType) {
return new ReactiveMutateInByIdOperationSupport(this).mutateInById(domainType);
}

@Override
public <T> ReactiveRangeScan<T> rangeScan(Class<T> domainType) {
return new ReactiveRangeScanOperationSupport(this).rangeScan(domainType);
}

@Override
public String getBucketName() {
return clientFactory.getBucket().name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@
public interface ReactiveFluentCouchbaseOperations extends ReactiveUpsertByIdOperation, ReactiveInsertByIdOperation,
ReactiveReplaceByIdOperation, ReactiveFindByIdOperation, ReactiveExistsByIdOperation,
ReactiveFindByAnalyticsOperation, ReactiveFindFromReplicasByIdOperation, ReactiveFindByQueryOperation,
ReactiveRemoveByIdOperation, ReactiveRemoveByQueryOperation, ReactiveMutateInByIdOperation {}
ReactiveRemoveByIdOperation, ReactiveRemoveByQueryOperation, ReactiveMutateInByIdOperation,
ReactiveRangeScanOperation {}
Loading