diff --git a/src/main/java/com/couchbase/client/java/Cluster.java b/src/main/java/com/couchbase/client/java/Cluster.java
deleted file mode 100644
index 23c588033..000000000
--- a/src/main/java/com/couchbase/client/java/Cluster.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Copyright (c) 2018 Couchbase, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.couchbase.client.java;
-
-import com.couchbase.client.core.Core;
-import com.couchbase.client.core.diagnostics.ClusterState;
-import com.couchbase.client.core.diagnostics.DiagnosticsResult;
-import com.couchbase.client.core.annotation.Stability;
-import com.couchbase.client.core.diagnostics.PingResult;
-import com.couchbase.client.core.env.Authenticator;
-import com.couchbase.client.core.env.PasswordAuthenticator;
-import com.couchbase.client.core.env.SeedNode;
-import com.couchbase.client.core.error.CouchbaseException;
-import com.couchbase.client.core.error.TimeoutException;
-import com.couchbase.client.core.msg.search.SearchRequest;
-import com.couchbase.client.java.analytics.AnalyticsOptions;
-import com.couchbase.client.java.analytics.AnalyticsResult;
-import com.couchbase.client.java.diagnostics.DiagnosticsOptions;
-import com.couchbase.client.java.diagnostics.PingOptions;
-import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;
-import com.couchbase.client.java.env.ClusterEnvironment;
-import com.couchbase.client.java.manager.analytics.AnalyticsIndexManager;
-import com.couchbase.client.java.manager.bucket.BucketManager;
-import com.couchbase.client.java.manager.eventing.EventingFunctionManager;
-import com.couchbase.client.java.manager.query.QueryIndexManager;
-import com.couchbase.client.java.manager.search.SearchIndexManager;
-import com.couchbase.client.java.manager.user.UserManager;
-import com.couchbase.client.java.query.QueryOptions;
-import com.couchbase.client.java.query.QueryResult;
-import com.couchbase.client.java.search.SearchOptions;
-import com.couchbase.client.java.search.SearchQuery;
-import com.couchbase.client.java.search.result.SearchResult;
-import com.couchbase.client.java.transactions.Transactions;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
-
-import static com.couchbase.client.core.util.Validators.notNull;
-import static com.couchbase.client.core.util.Validators.notNullOrEmpty;
-import static com.couchbase.client.java.AsyncCluster.extractClusterEnvironment;
-import static com.couchbase.client.java.AsyncCluster.seedNodesFromConnectionString;
-import static com.couchbase.client.java.AsyncUtils.block;
-import static com.couchbase.client.java.ClusterOptions.clusterOptions;
-import static com.couchbase.client.java.ReactiveCluster.DEFAULT_ANALYTICS_OPTIONS;
-import static com.couchbase.client.java.ReactiveCluster.DEFAULT_DIAGNOSTICS_OPTIONS;
-import static com.couchbase.client.java.ReactiveCluster.DEFAULT_QUERY_OPTIONS;
-import static com.couchbase.client.java.ReactiveCluster.DEFAULT_SEARCH_OPTIONS;
-
-/**
- * The {@link Cluster} is the main entry point when connecting to a Couchbase cluster.
- *
- * Most likely you want to start out by using the {@link #connect(String, String, String)} entry point. For more
- * advanced options you want to use the {@link #connect(String, ClusterOptions)} method. The entry point that allows
- * overriding the seed nodes ({@link #connect(Set, ClusterOptions)} is only needed if you run a couchbase cluster
- * at non-standard ports.
- *
- * See the individual connect methods for more information, but here is a snippet to get you off the ground quickly. It
- * assumes you have Couchbase running locally and the "travel-sample" sample bucket loaded:
- *
- * //Connect and open a bucket
- * Cluster cluster = Cluster.connect("127.0.0.1","Administrator","password");
- * Bucket bucket = cluster.bucket("travel-sample");
- * Collection collection = bucket.defaultCollection();
- *
- * // Perform a N1QL query
- * QueryResult queryResult = cluster.query("select * from `travel-sample` limit 5");
- * System.out.println(queryResult.rowsAsObject());
- *
- * // Perform a KV request and load a document
- * GetResult getResult = collection.get("airline_10");
- * System.out.println(getResult);
- *
- *
- * When the application shuts down (or the SDK is not needed anymore), you are required to call {@link #disconnect()}.
- * If you omit this step, the application will terminate (all spawned threads are daemon threads) but any operations
- * or work in-flight will not be able to complete and lead to undesired side-effects. Note that disconnect will also
- * shutdown all associated {@link Bucket buckets}.
- *
- * Cluster-level operations like {@link #query(String)} will not work unless at leas one bucket is opened against a
- * pre 6.5 cluster. If you are using 6.5 or later, you can run cluster-level queries without opening a bucket. All
- * of these operations are lazy, so the SDK will bootstrap in the background and service queries as quickly as possible.
- * This also means that the first operations might be a bit slower until all sockets are opened in the background and
- * the configuration is loaded. If you want to wait explicitly, you can utilize the {@link #waitUntilReady(Duration)}
- * method before performing your first query.
- *
- * The SDK will only work against Couchbase Server 5.0 and later, because RBAC (role-based access control) is a first
- * class concept since 3.0 and therefore required.
- */
-// todo gpx as per discussion with miker - if required, ClusterInterface will be added to the SDK instead
-public class Cluster implements ClusterInterface {
-
- /**
- * Holds the underlying async cluster reference.
- */
- private final AsyncCluster asyncCluster;
-
- /**
- * Holds the adjacent reactive cluster reference.
- */
- private final ReactiveCluster reactiveCluster;
-
- /**
- * The search index manager manages search indexes.
- */
- private final SearchIndexManager searchIndexManager;
-
- /**
- * The user manager manages users and groups.
- */
- private final UserManager userManager;
-
- /**
- * The bucket manager manages buckets and allows to flush them.
- */
- private final BucketManager bucketManager;
-
- /**
- * Allows to manage query indexes.
- */
- private final QueryIndexManager queryIndexManager;
-
- /**
- * Allows to manage analytics indexes.
- */
- private final AnalyticsIndexManager analyticsIndexManager;
-
- /**
- * Allows to manage eventing functions.
- */
- private final EventingFunctionManager eventingFunctionManager;
-
- /**
- * Stores already opened buckets for reuse.
- */
- private final Map bucketCache = new ConcurrentHashMap<>();
-
- /**
- * Connect to a Couchbase cluster with a username and a password as credentials.
- *
- * This is the simplest (and recommended) method to connect to the cluster if you do not need to provide any
- * custom options.
- *
- * The first argument (the connection string in its simplest form) is used to supply the hostnames of the cluster. In
- * development it is OK to only pass in one hostname (or IP address), but in production we recommend passing in at
- * least 3 nodes of the cluster (comma separated). The reason is that if one or more of the nodes are not reachable
- * the client will still be able to bootstrap (and your application will become more resilient as a result).
- *
- * Here is how you specify one node to use for bootstrapping:
- *
- * Cluster cluster = Cluster.connect("127.0.0.1", "user", "password"); // ok during development
- *
- * This is what we recommend in production:
- *
- * Cluster cluster = Cluster.connect("host1,host2,host3", "user", "password"); // recommended in production
- *
- * It is important to understand that the SDK will only use the bootstrap ("seed nodes") host list to establish an
- * initial contact with the cluster. Once the configuration is loaded this list is discarded and the client will
- * connect to all nodes based on this configuration.
- *
- * This method will return immediately and the SDK will try to establish all the necessary resources and connections
- * in the background. This means that depending on how fast it can be bootstrapped, the first couple cluster-level
- * operations like {@link #query(String)} will take a bit longer. If you want to wait explicitly until those resources
- * are available, you can use the {@link #waitUntilReady(Duration)} method before running any of them:
- *
- *
- * @param connectionString connection string used to locate the Couchbase cluster.
- * @param username the name of the user with appropriate permissions on the cluster.
- * @param password the password of the user with appropriate permissions on the cluster.
- * @return the instantiated {@link Cluster}.
- */
- public static Cluster connect(final String connectionString, final String username, final String password) {
- return connect(connectionString, clusterOptions(PasswordAuthenticator.create(username, password)));
- }
-
- /**
- * Connect to a Couchbase cluster with custom options.
- *
- * You likely want to use this over the simpler {@link #connect(String, String, String)} if:
- *
- *
A custom {@link ClusterEnvironment}
- *
Or a custom {@link Authenticator}
- *
- * needs to be provided.
- *
- * A custom environment can be passed in like this:
- *
- * It is VERY important to shut down the environment when being passed in separately (as shown in
- * the code sample above) and AFTER the cluster is disconnected. This will ensure an orderly shutdown
- * and makes sure that no resources are left lingering.
- *
- * If you want to pass in a custom {@link Authenticator}, it is likely because you are setting up certificate-based
- * authentication instead of using a username and a password directly. Remember to also enable TLS.
- *
- * This method will return immediately and the SDK will try to establish all the necessary resources and connections
- * in the background. This means that depending on how fast it can be bootstrapped, the first couple cluster-level
- * operations like {@link #query(String)} will take a bit longer. If you want to wait explicitly until those resources
- * are available, you can use the {@link #waitUntilReady(Duration)} method before running any of them:
- *
- *
- * @param connectionString connection string used to locate the Couchbase cluster.
- * @param options custom options when creating the cluster.
- * @return the instantiated {@link Cluster}.
- */
- public static Cluster connect(final String connectionString, final ClusterOptions options) {
- notNullOrEmpty(connectionString, "ConnectionString");
- notNull(options, "ClusterOptions");
-
- final ClusterOptions.Built opts = options.build();
- final Supplier environmentSupplier = extractClusterEnvironment(connectionString, opts);
- return new Cluster(
- environmentSupplier,
- opts.authenticator(),
- seedNodesFromConnectionString(connectionString, environmentSupplier.get())
- );
- }
-
- /**
- * Connect to a Couchbase cluster with a list of seed nodes and custom options.
- *
- * Note that you likely only want to use this method if you need to pass in custom ports for specific seed nodes
- * during bootstrap. Otherwise we recommend relying on the simpler {@link #connect(String, String, String)} method
- * instead.
- *
- * The following example shows how to bootstrap against a node with custom KV and management ports:
- *
- * @param seedNodes the seed nodes used to connect to the cluster.
- * @param options custom options when creating the cluster.
- * @return the instantiated {@link Cluster}.
- */
- public static Cluster connect(final Set seedNodes, final ClusterOptions options) {
- notNullOrEmpty(seedNodes, "SeedNodes");
- notNull(options, "ClusterOptions");
-
- final ClusterOptions.Built opts = options.build();
- return new Cluster(extractClusterEnvironment("", opts), opts.authenticator(), seedNodes);
- }
-
- /**
- * Creates a new cluster from a {@link ClusterEnvironment}.
- *
- * @param environment the environment to use.
- * @param authenticator the authenticator to use.
- * @param seedNodes the seed nodes to bootstrap from.
- */
- private Cluster(final Supplier environment, final Authenticator authenticator,
- final Set seedNodes) {
- this.asyncCluster = new AsyncCluster(environment, authenticator, seedNodes);
- this.reactiveCluster = new ReactiveCluster(asyncCluster);
- this.searchIndexManager = new SearchIndexManager(asyncCluster.searchIndexes());
- this.userManager = new UserManager(asyncCluster.users());
- this.bucketManager = new BucketManager(asyncCluster.buckets());
- this.queryIndexManager = new QueryIndexManager(asyncCluster.queryIndexes());
- this.analyticsIndexManager = new AnalyticsIndexManager(this);
- this.eventingFunctionManager = new EventingFunctionManager(asyncCluster.eventingFunctions());
- }
-
- /**
- * Provides access to the related {@link AsyncCluster}.
- *
- * Note that the {@link AsyncCluster} is considered advanced API and should only be used to get the last drop
- * of performance or if you are building higher-level abstractions on top. If in doubt, we recommend using the
- * {@link #reactive()} API instead.
- */
- public AsyncCluster async() {
- return asyncCluster;
- }
-
- /**
- * Provides access to the related {@link ReactiveCluster}.
- */
- public ReactiveCluster reactive() {
- return reactiveCluster;
- }
-
- /**
- * Provides access to the underlying {@link Core}.
- *
- *
This is advanced and volatile API - it might change any time without notice. Use with care!
- */
- @Stability.Volatile
- public Core core() {
- return asyncCluster.core();
- }
-
- /**
- * The user manager allows to manage users and groups.
- */
- public UserManager users() {
- return userManager;
- }
-
- /**
- * The bucket manager allows to perform administrative tasks on buckets and their resources.
- */
- public BucketManager buckets() {
- return bucketManager;
- }
-
- /**
- * The analytics index manager allows to modify and create indexes for the analytics service.
- */
- public AnalyticsIndexManager analyticsIndexes() {
- return analyticsIndexManager;
- }
-
- /**
- * The query index manager allows to modify and create indexes for the query service.
- */
- public QueryIndexManager queryIndexes() {
- return queryIndexManager;
- }
-
- /**
- * The search index manager allows to modify and create indexes for the search service.
- */
- public SearchIndexManager searchIndexes() {
- return searchIndexManager;
- }
-
- /**
- * Provides access to the eventing function management services.
- */
- @Stability.Uncommitted
- public EventingFunctionManager eventingFunctions() {
- return eventingFunctionManager;
- }
-
- /**
- * Provides access to the used {@link ClusterEnvironment}.
- */
- public ClusterEnvironment environment() {
- return asyncCluster.environment();
- }
-
- /**
- * Performs a query against the query (N1QL) services.
- *
- * @param statement the N1QL query statement.
- * @return the {@link QueryResult} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public QueryResult query(final String statement) {
- return query(statement, DEFAULT_QUERY_OPTIONS);
- }
-
- /**
- * Performs a query against the query (N1QL) services with custom options.
- *
- * @param statement the N1QL query statement as a raw string.
- * @param options the custom options for this query.
- * @return the {@link QueryResult} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public QueryResult query(final String statement, final QueryOptions options) {
- return block(async().query(statement, options));
- }
-
- /**
- * Performs an analytics query with default {@link AnalyticsOptions}.
- *
- * @param statement the query statement as a raw string.
- * @return the {@link AnalyticsResult} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public AnalyticsResult analyticsQuery(final String statement) {
- return analyticsQuery(statement, DEFAULT_ANALYTICS_OPTIONS);
- }
-
- /**
- * Performs an analytics query with custom {@link AnalyticsOptions}.
- *
- * @param statement the query statement as a raw string.
- * @param options the custom options for this query.
- * @return the {@link AnalyticsResult} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public AnalyticsResult analyticsQuery(final String statement, final AnalyticsOptions options) {
- return block(async().analyticsQuery(statement, options));
- }
-
- /**
- * Performs a Full Text Search (FTS) query with default {@link SearchOptions}.
- *
- * @param query the query, in the form of a {@link SearchQuery}
- * @return the {@link SearchRequest} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public SearchResult searchQuery(final String indexName, final SearchQuery query) {
- return searchQuery(indexName, query, DEFAULT_SEARCH_OPTIONS);
- }
-
- /**
- * Performs a Full Text Search (FTS) query with custom {@link SearchOptions}.
- *
- * @param query the query, in the form of a {@link SearchQuery}
- * @param options the custom options for this query.
- * @return the {@link SearchRequest} once the response arrives successfully.
- * @throws TimeoutException if the operation times out before getting a result.
- * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
- */
- public SearchResult searchQuery(final String indexName, final SearchQuery query, final SearchOptions options) {
- return block(asyncCluster.searchQuery(indexName, query, options));
- }
-
- /**
- * Opens a {@link Bucket} with the given name.
- *
- * @param bucketName the name of the bucket to open.
- * @return a {@link Bucket} once opened.
- */
- public Bucket bucket(final String bucketName) {
- return bucketCache.computeIfAbsent(bucketName, n -> new Bucket(asyncCluster.bucket(n)));
- }
-
- /**
- * Performs a non-reversible disconnect of this {@link Cluster}.
- *
- * If this method is used, the default disconnect timeout on the environment is used. Please use the companion
- * overload ({@link #disconnect(Duration)} if you want to provide a custom duration.
- *
- * If a custom {@link ClusterEnvironment} has been passed in during connect, it is VERY important to
- * shut it down after calling this method. This will prevent any in-flight tasks to be stopped prematurely.
- */
- public void disconnect() {
- block(asyncCluster.disconnect());
- }
-
- /**
- * Performs a non-reversible disconnect of this {@link Cluster}.
- *
- * If a custom {@link ClusterEnvironment} has been passed in during connect, it is VERY important to
- * shut it down after calling this method. This will prevent any in-flight tasks to be stopped prematurely.
- *
- * @param timeout allows to override the default disconnect duration.
- */
- public void disconnect(final Duration timeout) {
- block(asyncCluster.disconnect(timeout));
- }
-
- /**
- * Runs a diagnostic report on the current state of the cluster from the SDKs point of view.
- *
- * Please note that it does not perform any I/O to do this, it will only use the current known state of the cluster
- * to assemble the report (so, if for example no N1QL query has been run the socket pool might be empty and as
- * result not show up in the report).
- *
- * @return the {@link DiagnosticsResult} once complete.
- */
- public DiagnosticsResult diagnostics() {
- return block(asyncCluster.diagnostics(DEFAULT_DIAGNOSTICS_OPTIONS));
- }
-
- /**
- * Runs a diagnostic report with custom options on the current state of the cluster from the SDKs point of view.
- *
- * Please note that it does not perform any I/O to do this, it will only use the current known state of the cluster
- * to assemble the report (so, if for example no N1QL query has been run the socket pool might be empty and as
- * result not show up in the report).
- *
- * @param options options that allow to customize the report.
- * @return the {@link DiagnosticsResult} once complete.
- */
- public DiagnosticsResult diagnostics(final DiagnosticsOptions options) {
- return block(asyncCluster.diagnostics(options));
- }
-
- /**
- * Performs application-level ping requests against services in the couchbase cluster.
- *
- * Note that this operation performs active I/O against services and endpoints to assess their health. If you do
- * not wish to perform I/O, consider using the {@link #diagnostics()} instead. You can also combine the functionality
- * of both APIs as needed, which is {@link #waitUntilReady(Duration)} is doing in its implementation as well.
- *
- * @return the {@link PingResult} once complete.
- */
- public PingResult ping() {
- return block(asyncCluster.ping());
- }
-
- /**
- * Performs application-level ping requests with custom options against services in the couchbase cluster.
- *
- * Note that this operation performs active I/O against services and endpoints to assess their health. If you do
- * not wish to perform I/O, consider using the {@link #diagnostics(DiagnosticsOptions)} instead. You can also combine
- * the functionality of both APIs as needed, which is {@link #waitUntilReady(Duration)} is doing in its
- * implementation as well.
- *
- * @return the {@link PingResult} once complete.
- */
- public PingResult ping(final PingOptions options) {
- return block(asyncCluster.ping(options));
- }
-
- /**
- * Waits until the desired {@link ClusterState} is reached.
- *
- * This method will wait until either the cluster state is "online", or the timeout is reached. Since the SDK is
- * bootstrapping lazily, this method allows to eagerly check during bootstrap if all of the services are online
- * and usable before moving on.
- *
- * @param timeout the maximum time to wait until readiness.
- */
- public void waitUntilReady(final Duration timeout) {
- block(asyncCluster.waitUntilReady(timeout));
- }
-
- /**
- * Waits until the desired {@link ClusterState} is reached.
- *
- * This method will wait until either the cluster state is "online" by default, or the timeout is reached. Since the
- * SDK is bootstrapping lazily, this method allows to eagerly check during bootstrap if all of the services are online
- * and usable before moving on. You can tune the properties through {@link WaitUntilReadyOptions}.
- *
- * @param timeout the maximum time to wait until readiness.
- * @param options the options to customize the readiness waiting.
- */
- public void waitUntilReady(final Duration timeout, final WaitUntilReadyOptions options) {
- block(asyncCluster.waitUntilReady(timeout, options));
- }
-
- /**
- * Allows access to transactions.
- *
- * @return the {@link Transactions} interface.
- */
- @Stability.Uncommitted
- public Transactions transactions() {
- return new Transactions(core(), environment().jsonSerializer());
- }
-}
-
diff --git a/src/main/java/com/couchbase/client/java/ClusterInterface.java b/src/main/java/com/couchbase/client/java/ClusterInterface.java
deleted file mode 100644
index 872a6efdf..000000000
--- a/src/main/java/com/couchbase/client/java/ClusterInterface.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2018 Couchbase, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.couchbase.client.java;
-
-import com.couchbase.client.core.Core;
-import com.couchbase.client.core.annotation.Stability;
-import com.couchbase.client.core.diagnostics.DiagnosticsResult;
-import com.couchbase.client.core.diagnostics.PingResult;
-import com.couchbase.client.core.env.Authenticator;
-import com.couchbase.client.core.env.PasswordAuthenticator;
-import com.couchbase.client.core.env.SeedNode;
-import com.couchbase.client.java.analytics.AnalyticsOptions;
-//import com.couchbase.client.java.analytics.AnalyticsResult;
-import com.couchbase.client.java.diagnostics.DiagnosticsOptions;
-import com.couchbase.client.java.diagnostics.PingOptions;
-import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;
-import com.couchbase.client.java.env.ClusterEnvironment;
-import com.couchbase.client.java.manager.analytics.AnalyticsIndexManager;
-import com.couchbase.client.java.manager.bucket.BucketManager;
-import com.couchbase.client.java.manager.eventing.EventingFunctionManager;
-import com.couchbase.client.java.manager.query.QueryIndexManager;
-import com.couchbase.client.java.manager.search.SearchIndexManager;
-import com.couchbase.client.java.manager.user.UserManager;
-import com.couchbase.client.java.query.QueryOptions;
-import com.couchbase.client.java.query.QueryResult;
-import com.couchbase.client.java.search.SearchOptions;
-import com.couchbase.client.java.search.SearchQuery;
-import com.couchbase.client.java.search.result.SearchResult;
-import com.couchbase.client.java.transactions.Transactions;
-import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-import static com.couchbase.client.core.util.Validators.notNull;
-import static com.couchbase.client.core.util.Validators.notNullOrEmpty;
-import static com.couchbase.client.java.AsyncCluster.extractClusterEnvironment;
-import static com.couchbase.client.java.AsyncCluster.seedNodesFromConnectionString;
-import static com.couchbase.client.java.ClusterOptions.clusterOptions;
-
-public interface ClusterInterface {
-
- AsyncCluster async();
-
- ReactiveCluster reactive();
-
- @Stability.Volatile
- Core core();
-
- UserManager users();
-
- BucketManager buckets();
-
- AnalyticsIndexManager analyticsIndexes();
-
- QueryIndexManager queryIndexes();
-
- SearchIndexManager searchIndexes();
-
- @Stability.Uncommitted
- EventingFunctionManager eventingFunctions();
-
- ClusterEnvironment environment();
-
- QueryResult query(String statement);
-
- QueryResult query(String statement, QueryOptions options);
-
- //AnalyticsResult analyticsQuery(String statement);
-
- // AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);
-
- SearchResult searchQuery(String indexName, SearchQuery query);
-
- SearchResult searchQuery(String indexName, SearchQuery query, SearchOptions options);
-
- Bucket bucket(String bucketName);
-
- void disconnect();
-
- void disconnect(Duration timeout);
-
- DiagnosticsResult diagnostics();
-
- DiagnosticsResult diagnostics(DiagnosticsOptions options);
-
- PingResult ping();
-
- PingResult ping(PingOptions options);
-
- void waitUntilReady(Duration timeout);
-
- void waitUntilReady(Duration timeout, WaitUntilReadyOptions options);
-
- Transactions transactions();
-}
diff --git a/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java
index 7c631eeca..cee247ad3 100644
--- a/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java
+++ b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java
@@ -16,6 +16,7 @@
*/
package com.couchbase.client.java.transactions;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
@@ -26,6 +27,10 @@
import java.util.logging.Logger;
import com.couchbase.client.core.annotation.Stability;
+import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
+import com.couchbase.client.core.error.EncodingFailureException;
+import com.couchbase.client.core.json.Mapper;
+import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import com.couchbase.client.core.transaction.CoreTransactionContext;
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
@@ -33,7 +38,9 @@
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import com.couchbase.client.core.transaction.support.AttemptState;
+import com.couchbase.client.java.ReactiveScope;
import com.couchbase.client.java.codec.JsonSerializer;
+import com.couchbase.client.java.json.JsonObject;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
@@ -131,4 +138,24 @@ public static TransactionResult run(Transactions transactions, Consumer T createProxyInstance(ReactiveCouchbaseResourceHolder session, T target, Class targetType) {
-
- ProxyFactory factory = new ProxyFactory();
- factory.setTarget(target);
- factory.setInterfaces(targetType);
- factory.setOpaque(true);
-
- factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, ReactiveCouchbaseResourceHolder.class,
- ClusterInterface.class, this::proxyDatabase, Collection.class, this::proxyCollection));
-
- return targetType.cast(factory.getProxy(target.getClass().getClassLoader()));
- }
-
- private Collection proxyCollection(ReactiveCouchbaseResourceHolder session, Collection c) {
- return createProxyInstance(session, c, Collection.class);
- }
-
- private ClusterInterface proxyDatabase(ReactiveCouchbaseResourceHolder session, ClusterInterface cluster) {
- return createProxyInstance(session, cluster, ClusterInterface.class);
- }
-
- /**
- * {@link CoreTransactionAttemptContext} bound TODO decorating the database with a
- * {@link SessionAwareMethodInterceptor}.
- *
- * @author Christoph Strobl
- * @since 2.1
- */
- static final class CoreTransactionAttemptContextBoundCouchbaseClientFactory
- implements ReactiveCouchbaseClientFactory {
-
- private final ReactiveCouchbaseResourceHolder transactionResources;
- private final ReactiveCouchbaseClientFactory delegate;
-
- CoreTransactionAttemptContextBoundCouchbaseClientFactory(ReactiveCouchbaseResourceHolder transactionResources,
- ReactiveCouchbaseClientFactory delegate, Transactions transactions) {
- this.transactionResources = transactionResources;
- this.delegate = delegate;
- }
-
- @Override
- public ClusterInterface getCluster() throws DataAccessException {
- return decorateDatabase(delegate.getCluster());
- }
-
- @Override
- public Mono getCollectionMono(String name) {
- return Mono.just(delegate.getCollection(name));
- }
-
- @Override
- public Collection getCollection(String collectionName) {
- return delegate.getCollection(collectionName);
- }
-
- @Override
- public Scope getScope(String scopeName) {
- return delegate.getScope(scopeName);
- }
-
- public Scope getScope() {
- return delegate.getScope();
- }
-
- @Override
- public ReactiveCouchbaseClientFactory withScope(String scopeName) {
- return delegate.withScope(scopeName);
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getExceptionTranslator()
- */
- @Override
- public PersistenceExceptionTranslator getExceptionTranslator() {
- return delegate.getExceptionTranslator();
- }
-
- @Override
- public String getBucketName() {
- return delegate.getBucketName();
- }
-
- @Override
- public String getScopeName() {
- return delegate.getScopeName();
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getSession(com.mongodb.CoreTransactionAttemptContextOptions)
- */
-
- @Override
- public Mono getResourceHolderMono() {
- return Mono.just(transactionResources);
- }
-
- @Override
- public ReactiveCouchbaseResourceHolder getResourceHolder(TransactionOptions options,
- CoreTransactionAttemptContext atr) {
- ReactiveCouchbaseResourceHolder holder = delegate.getResourceHolder(options, atr);
- return holder;
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#withSession(com.mongodb.session.CoreTransactionAttemptContext)
- */
- @Override
- public ReactiveCouchbaseClientFactory withCore(ReactiveCouchbaseResourceHolder core) {
- return delegate.withCore(core);
- }
-
- @Override
- public CouchbaseTransactionalOperator getTransactionalOperator() {
- return delegate.getTransactionalOperator();
- }
-
- @Override
- public ReactiveCouchbaseClientFactory with(CouchbaseTransactionalOperator txOp) {
- return delegate.with(txOp);
- }
-
- private ClusterInterface decorateDatabase(ClusterInterface database) {
- return createProxyInstance(transactionResources, database, ClusterInterface.class);
- }
-
- private ClusterInterface proxyDatabase(ReactiveCouchbaseResourceHolder session, ClusterInterface database) {
- return createProxyInstance(session, database, ClusterInterface.class);
- }
-
- private Collection proxyCollection(ReactiveCouchbaseResourceHolder session, Collection collection) {
- return createProxyInstance(session, collection, Collection.class);
- }
-
- private T createProxyInstance(ReactiveCouchbaseResourceHolder session, T target, Class targetType) {
-
- ProxyFactory factory = new ProxyFactory();
- factory.setTarget(target);
- factory.setInterfaces(targetType);
- factory.setOpaque(true);
-
- factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, ReactiveCouchbaseResourceHolder.class,
- ClusterInterface.class, this::proxyDatabase, Collection.class, this::proxyCollection));
-
- return targetType.cast(factory.getProxy(target.getClass().getClassLoader()));
- }
-
- public ReactiveCouchbaseClientFactory getDelegate() {
- return this.delegate;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- CoreTransactionAttemptContextBoundCouchbaseClientFactory that = (CoreTransactionAttemptContextBoundCouchbaseClientFactory) o;
-
- if (!ObjectUtils.nullSafeEquals(this.transactionResources, that.transactionResources)) {
- return false;
- }
- return ObjectUtils.nullSafeEquals(this.delegate, that.delegate);
- }
-
- @Override
- public int hashCode() {
- int result = ObjectUtils.nullSafeHashCode(this.transactionResources);
- result = 31 * result + ObjectUtils.nullSafeHashCode(this.delegate);
- return result;
- }
-
- public String toString() {
- return "SimpleReactiveCouchbaseDatabaseFactory.CoreTransactionAttemptContextBoundCouchDbFactory(session="
- + this.getResourceHolderMono() + ", delegate=" + this.getDelegate() + ")";
- }
- }
}
diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java
index c9750db4a..3223786aa 100644
--- a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java
+++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java
@@ -37,8 +37,6 @@
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.core.support.PseudoArgs;
import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
-import org.springframework.data.couchbase.transaction.ReactiveCouchbaseClientUtils;
-import org.springframework.data.couchbase.transaction.SessionSynchronization;
import org.springframework.data.mapping.context.MappingContextEvent;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
@@ -253,9 +251,8 @@ public QueryScanConsistency getConsistency() {
}
- protected Mono doGetTemplate() {
- return ReactiveCouchbaseClientUtils.getTemplate(clientFactory, SessionSynchronization.ON_ACTUAL_TRANSACTION,
- this.getConverter());
+ protected ReactiveCouchbaseTemplate doGetTemplate() {
+ return new ReactiveCouchbaseTemplate(clientFactory, converter);
}
class IndexCreatorEventListener implements ApplicationListener> {
diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveExistsByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveExistsByIdOperationSupport.java
index 94e2dddb5..18633248b 100644
--- a/src/main/java/org/springframework/data/couchbase/core/ReactiveExistsByIdOperationSupport.java
+++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveExistsByIdOperationSupport.java
@@ -74,7 +74,7 @@ public Mono one(final String id) {
PseudoArgs pArgs = new PseudoArgs<>(template, scope, collection, options, null, domainType);
LOG.trace("existsById {}", pArgs);
- return TransactionalSupport.verifyNotInTransaction(template.doGetTemplate(), "existsById")
+ return TransactionalSupport.verifyNotInTransaction("existsById")
.then(Mono.just(id))
.flatMap(docId -> template.getCouchbaseClientFactory().withScope(pArgs.getScope())
.getCollection(pArgs.getCollection()).reactive().exists(id, buildOptions(pArgs.getOptions()))
diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java
index 8fa592eca..d2935c8fe 100644
--- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java
+++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java
@@ -109,7 +109,7 @@ public Mono first() {
public Flux all() {
return Flux.defer(() -> {
String statement = assembleEntityQuery(false);
- return TransactionalSupport.verifyNotInTransaction(template.doGetTemplate(), "findByAnalytics")
+ return TransactionalSupport.verifyNotInTransaction("findByAnalytics")
.then(template.getCouchbaseClientFactory().getCluster().reactive()
.analyticsQuery(statement, buildAnalyticsOptions())).onErrorMap(throwable -> {
if (throwable instanceof RuntimeException) {
diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java
index b20f2957b..8003a293f 100644
--- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java
+++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java
@@ -92,11 +92,8 @@ public Mono one(final String id) {
ReactiveCollection rc = template.getCouchbaseClientFactory().withScope(pArgs.getScope())
.getCollection(pArgs.getCollection()).reactive();
- // this will get me a template with a session holding tx
- Mono tmpl = template.doGetTemplate();
-
- Mono reactiveEntity = tmpl.flatMap(tp -> tp.getCouchbaseClientFactory().getResourceHolderMono().flatMap(s -> {
- if (s == null || s.getCore() == null) {
+ Mono reactiveEntity = TransactionalSupport.checkForTransactionInThreadLocalStorage(txCtx).flatMap(ctxOpt -> {
+ if (!ctxOpt.isPresent()) {
if (pArgs.getOptions() instanceof GetAndTouchOptions) {
return rc.getAndTouch(id, expiryToUse(), (GetAndTouchOptions) pArgs.getOptions())
.flatMap(result -> support.decodeEntity(id, result.contentAs(String.class), result.cas(), domainType,
@@ -107,12 +104,12 @@ public Mono one(final String id) {
pArgs.getScope(), pArgs.getCollection(), null));
}
} else {
- return s.getCore().get(makeCollectionIdentifier(rc.async()), id)
+ return ctxOpt.get().getCore().get(makeCollectionIdentifier(rc.async()), id)
.flatMap(result -> support.decodeEntity(id, new String(result.contentAsBytes(), StandardCharsets.UTF_8),
result.cas(), domainType, pArgs.getScope(), pArgs.getCollection(),
new TransactionResultHolder(result), null));
}
- }));
+ });
return reactiveEntity.onErrorResume(throwable -> {
if (throwable instanceof DocumentNotFoundException) {
diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java
index b717b589a..8b117b003 100644
--- a/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java
+++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java
@@ -191,20 +191,18 @@ public Flux all() {
ReactiveCouchbaseClientFactory clientFactory = template.getCouchbaseClientFactory();
ReactiveScope rs = clientFactory.getScope(pArgs.getScope()).reactive();
- Mono tmpl = template.doGetTemplate();
- Mono