From 443d6f49f249568ee75d45e34346d09442deee83 Mon Sep 17 00:00:00 2001 From: nicktorwald Date: Tue, 26 Mar 2019 17:58:45 +0700 Subject: [PATCH 1/2] Add cluster feature docs --- README.md | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/README.md b/README.md index 9600bfb2..07943436 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ To get the Java connector for Tarantool 1.6.9, visit ## Table of contents * [Getting started](#getting-started) +* [Cluster support](#cluster-support) * [Where to get help](#where-to-get-help) ## Getting started @@ -156,6 +157,102 @@ System.out.println(template.query("select * from hello_world where hello=:id", C For more implementation details, see [API documentation](http://tarantool.github.io/tarantool-java/apidocs/index.html). +## Cluster support + +To be more fault-tolerant the connector provides cluster extensions. In +particular `TarantoolClusterClient` and built-in `RoundRobinSocketProviderImpl` +used as a default `SocketProvider` implementation. When currently connected +instance is down then the client will try to reconnect to the first available +instance using strategy defined in a socket provider. You need to supply +a list of nodes which will be used by the cluster client to provide such +ability. Also you can prefer to use a [discovery mechanism](#auto-discovery) +in order to dynamically fetch and apply the node list. + +### Basic cluster client usage + +1. Configure `TarantoolClusterClientConfig`: + +```java +TarantoolClusterClientConfig config = new TarantoolClusterClientConfig(); +// fill other settings +config.operationExpiryTimeMillis = 2000; +config.executor = Executors.newSingleThreadExecutor(); +``` + +2. Create an instance of `TarantoolClusterClientImpl`. You need to provide +an initial list of nodes: + +```java +String[] nodes = new String[] { "myHost1:3301", "myHost2:3302", "myHost3:3301" }; +TarantoolClusterClient client = new TarantoolClusterClient(config, nodes); +``` + +3. Work with the client using same API as defined in `TarantoolClient`: + +```java +client.syncOps().insert(23, Arrays.asList(1, 1)); +``` + +### Auto-discovery + +Auto-discovery feature allows a cluster client to fetch addresses of +cluster nodes to reflect changes related to the cluster topology. To achieve +this you have to create a Lua function on the server side which returns +a single array result. Client periodically pools the server to obtain a +fresh list and apply it if its content changes. + +1. On the server side create a function which returns nodes: + +```bash +tarantool> function get_cluster_nodes() return { 'host1:3301', 'host2:3302', 'host3:3301' } end +``` + +You need to pay attention to a function contract we are currently supporting: +* The client never passes any arguments to a discovery function. +* A discovery function _should_ return a single result of strings (i.e. single + string `return 'host:3301'` or array of strings `return {'host1:3301', 'host2:3301'}`). +* A discovery function _may_ return multi-results but the client takes + into account only first of them (i.e. `return {'host:3301'}, discovery_delay`, where + the second result is unused). Even more, any extra results __are reserved__ by the client + in order to extend its contract with a backward compatibility. +* A discovery function _should NOT_ return no results, empty result, wrong type result, + and Lua errors. The client discards such kinds of results but it does not affect the discovery + process for next scheduled tasks. + +2. On the client side configure discovery settings in `TarantoolClusterClientConfig`: + +```java +TarantoolClusterClientConfig config = new TarantoolClusterClientConfig(); +// fill other settings +config.clusterDiscoveryEntryFunction = "get_cluster_nodes"; // discovery function used to fetch nodes +config.clusterDiscoveryDelayMillis = 60_000; // how often client polls the discovery server +``` + +3. Create a client using the config made above. + +```java +TarantoolClusterClient client = new TarantoolClusterClient(config); +client.syncOps().insert(45, Arrays.asList(1, 1)); +``` + +### Auto-discovery caveats + +* You need to set _not empty_ value to `clusterDiscoveryEntryFunction` to enable auto-discovery feature. +* There are only two cases when a discovery task runs: just after initialization of the cluster + client and a periodical scheduler timeout defined in `TarantoolClusterClientConfig.clusterDiscoveryDelayMillis`. +* A discovery task always uses an active client connection to get the nodes list. + It's in your responsibility to provide a function availability as well as a consistent + nodes list on all instances you initially set or obtain from the task. +* If some error occurs while a discovery task is running then this task + will be aborted without any after-effects for next task executions. These cases, for instance, are + a wrong function result (see discovery function contract) or a broken connection. + There is an exception if the client is closed then discovery process will stop permanently. +* It's possible to obtain a list which does NOT contain the node we are currently + connected to. It leads the client to try to reconnect to another node from the + new list. It may take some time to graceful disconnect from the current node. + The client does its best to catch the moment when there are no pending responses + and perform a reconnection. + ## Where to get help Got problems or questions? Post them on @@ -164,6 +261,7 @@ Got problems or questions? Post them on base for possible answers and solutions. ## Building + To run tests ``` ./mvnw clean test From 7d00b2acf5381d2468fbe2971dc1490df8b4459a Mon Sep 17 00:00:00 2001 From: nicktorwald Date: Tue, 12 Mar 2019 22:22:31 +0300 Subject: [PATCH 2/2] Support auto refresh a list of cluster nodes Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a Tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Closes: #34 See also: #142 --- .../tarantool/BaseSocketChannelProvider.java | 169 +++++++ .../tarantool/RefreshableSocketProvider.java | 12 + .../RoundRobinSocketProviderImpl.java | 251 ++++------ .../SingleSocketChannelProviderImpl.java | 43 ++ .../org/tarantool/SocketChannelProvider.java | 4 + .../org/tarantool/TarantoolClientConfig.java | 26 +- .../org/tarantool/TarantoolClientImpl.java | 47 +- .../org/tarantool/TarantoolClusterClient.java | 213 +++++++-- .../TarantoolClusterClientConfig.java | 29 +- .../TarantoolThreadDaemonFactory.java | 23 + .../IllegalDiscoveryFunctionResult.java | 13 + .../cluster/TarantoolClusterDiscoverer.java | 21 + ...antoolClusterStoredFunctionDiscoverer.java | 63 +++ .../java/org/tarantool/util/StringUtils.java | 21 + .../tarantool/AbstractSocketProviderTest.java | 50 ++ .../AbstractTarantoolConnectorIT.java | 10 +- .../tarantool/ClientReconnectClusterIT.java | 450 ++++++++++++++++-- .../RoundRobinSocketProviderImplTest.java | 155 ++++++ .../SingleSocketChannelProviderImplTest.java | 69 +++ src/test/java/org/tarantool/TestUtils.java | 20 + ...sterServiceStoredFunctionDiscovererIT.java | 196 ++++++++ 21 files changed, 1623 insertions(+), 262 deletions(-) create mode 100644 src/main/java/org/tarantool/BaseSocketChannelProvider.java create mode 100644 src/main/java/org/tarantool/RefreshableSocketProvider.java create mode 100644 src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java create mode 100644 src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java create mode 100644 src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java create mode 100644 src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java create mode 100644 src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java create mode 100644 src/main/java/org/tarantool/util/StringUtils.java create mode 100644 src/test/java/org/tarantool/AbstractSocketProviderTest.java create mode 100644 src/test/java/org/tarantool/RoundRobinSocketProviderImplTest.java create mode 100644 src/test/java/org/tarantool/SingleSocketChannelProviderImplTest.java create mode 100644 src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java diff --git a/src/main/java/org/tarantool/BaseSocketChannelProvider.java b/src/main/java/org/tarantool/BaseSocketChannelProvider.java new file mode 100644 index 00000000..a532bcdf --- /dev/null +++ b/src/main/java/org/tarantool/BaseSocketChannelProvider.java @@ -0,0 +1,169 @@ +package org.tarantool; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +public abstract class BaseSocketChannelProvider implements SocketChannelProvider { + + /** + * Limit of retries. + */ + private int retriesLimit = RETRY_NO_LIMIT; + + /** + * Timeout to establish socket connection with an individual server. + */ + private int timeout = NO_TIMEOUT; + + /** + * Tries to establish a new connection to the Tarantool instances. + * + * @param retryNumber number of current retry. Reset after successful connect. + * @param lastError the last error occurs when reconnecting + * + * @return connected socket channel + * + * @throws CommunicationException if any I/O errors happen or there are + * no addresses available + */ + @Override + public final SocketChannel get(int retryNumber, Throwable lastError) { + if (areRetriesExhausted(retryNumber)) { + throw new CommunicationException("Connection retries exceeded.", lastError); + } + + long deadline = System.currentTimeMillis() + timeout; + while (!Thread.currentThread().isInterrupted()) { + try { + InetSocketAddress address = getAddress(retryNumber, lastError); + return openChannel(address); + } catch (IOException e) { + checkTimeout(deadline, e); + } + } + throw new CommunicationException("Thread interrupted.", new InterruptedException()); + } + + private void checkTimeout(long deadline, Exception e) { + long timeLeft = deadline - System.currentTimeMillis(); + if (timeLeft <= 0) { + throw new CommunicationException("Connection time out.", e); + } + try { + Thread.sleep(timeLeft / 10); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + + /** + * Gets address to be used to establish a new connection + * Address can be null. + * + * @param retryNumber reconnection attempt number + * @param lastError reconnection reason + * + * @return available address which is depended on implementation + * + * @throws IOException if any I/O errors occur + */ + protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException; + + /** + * Sets maximum amount of reconnect attempts to be made before an exception is raised. + * The retry count is maintained by a {@link #get(int, Throwable)} caller + * when a socket level connection was established. + *

+ * Negative value means unlimited attempts. + * + * @param retriesLimit Limit of retries to use. + */ + public void setRetriesLimit(int retriesLimit) { + this.retriesLimit = retriesLimit; + } + + /** + * Gets limit of attempts to establish connection. + * + * @return Maximum reconnect attempts to make before raising exception. + */ + public int getRetriesLimit() { + return retriesLimit; + } + + /** + * Parse a string address in the form of host[:port] + * and builds a socket address. + * + * @param address Server address. + * + * @return Socket address. + */ + protected InetSocketAddress parseAddress(String address) { + int separatorPosition = address.indexOf(':'); + String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition); + int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1)); + return new InetSocketAddress(host, port); + } + + protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException { + SocketChannel channel = null; + try { + channel = SocketChannel.open(); + channel.socket().connect(socketAddress, timeout); + return channel; + } catch (IOException e) { + if (channel != null) { + try { + channel.close(); + } catch (IOException ignored) { + // No-op. + } + } + throw e; + } + } + + /** + * Sets maximum amount of time to wait for a socket connection establishment + * with an individual server. + *

+ * Zero means infinite timeout. + * + * @param timeout timeout value, ms. + * + * @throws IllegalArgumentException if timeout is negative. + */ + public void setTimeout(int timeout) { + if (timeout < 0) { + throw new IllegalArgumentException("timeout is negative."); + } + this.timeout = timeout; + } + + /** + * Gest maximum amount of time to wait for a socket + * connection establishment with an individual server. + * + * @return timeout + */ + public int getTimeout() { + return timeout; + } + + /** + * Provides a decision on whether retries limit is hit. + * + * @param retries Current count of retries. + * + * @return {@code true} if retries are exhausted. + */ + private boolean areRetriesExhausted(int retries) { + int limit = getRetriesLimit(); + if (limit < 0) { + return false; + } + return retries >= limit; + } +} diff --git a/src/main/java/org/tarantool/RefreshableSocketProvider.java b/src/main/java/org/tarantool/RefreshableSocketProvider.java new file mode 100644 index 00000000..a3ccc56f --- /dev/null +++ b/src/main/java/org/tarantool/RefreshableSocketProvider.java @@ -0,0 +1,12 @@ +package org.tarantool; + +import java.net.SocketAddress; +import java.util.Collection; + +public interface RefreshableSocketProvider { + + Collection getAddresses(); + + void refreshAddresses(Collection addresses); + +} diff --git a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java index 4cb9c5f0..24820a03 100644 --- a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java +++ b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java @@ -2,163 +2,123 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; +import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * Basic reconnection strategy that changes addresses in a round-robin fashion. * To be used with {@link TarantoolClientImpl}. */ -public class RoundRobinSocketProviderImpl implements SocketChannelProvider { - /** - * Timeout to establish socket connection with an individual server. - * 0 is infinite. - */ - private int timeout; - - /** - * Limit of retries (-1 = no limit). - */ - private int retriesLimit = -1; - - /** - * Server addresses as configured. - */ - private final String[] addrs; +public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider { - /** - * Socket addresses. - */ - private final InetSocketAddress[] sockAddrs; + private static final int UNSET_POSITION = -1; /** - * Current position within {@link #sockAddrs} array. + * Socket addresses pool. */ - private int pos; + private final List socketAddresses = new ArrayList<>(); /** - * Constructs an instance. + * Current position within {@link #socketAddresses} list. + *

+ * It is {@link #UNSET_POSITION} when no addresses from + * the {@link #socketAddresses} pool have been processed yet. + *

+ * When this provider receives new addresses it tries + * to look for a new position for the last used address or + * sets the position to {@link #UNSET_POSITION} otherwise. * - * @param addrs Array of addresses in a form of [host]:[port]. + * @see #getLastObtainedAddress() + * @see #refreshAddresses(Collection) */ - public RoundRobinSocketProviderImpl(String... addrs) { - if (addrs == null || addrs.length == 0) { - throw new IllegalArgumentException("addrs is null or empty."); - } - - this.addrs = Arrays.copyOf(addrs, addrs.length); - - sockAddrs = new InetSocketAddress[this.addrs.length]; - - for (int i = 0; i < this.addrs.length; i++) { - sockAddrs[i] = parseAddress(this.addrs[i]); - } - } + private AtomicInteger currentPosition = new AtomicInteger(UNSET_POSITION); /** - * Gets raw addresses list. + * Address list lock for a thread-safe access to it + * when a refresh operation occurs. * - * @return Configured addresses in a form of [host]:[port]. + * @see RefreshableSocketProvider#refreshAddresses(Collection) */ - public String[] getAddresses() { - return this.addrs; - } + private ReadWriteLock addressListLock = new ReentrantReadWriteLock(); /** - * Sets maximum amount of time to wait for a socket connection establishment - * with an individual server. + * Constructs an instance. * - * Zero means infinite timeout. + * @param addresses optional array of addresses in a form of host[:port] * - * @param timeout Timeout value, ms. - * @return {@code this}. - * @throws IllegalArgumentException If timeout is negative. + * @throws IllegalArgumentException if addresses aren't provided */ - public RoundRobinSocketProviderImpl setTimeout(int timeout) { - if (timeout < 0) { - throw new IllegalArgumentException("timeout is negative."); - } - - this.timeout = timeout; - - return this; + public RoundRobinSocketProviderImpl(String... addresses) { + updateAddressList(Arrays.asList(addresses)); } - /** - * Gets maximum amount of time to wait for a socket connection establishment - * with an individual server. - * - * @return timeout - */ - public int getTimeout() { - return timeout; + private void updateAddressList(Collection addresses) { + if (addresses == null || addresses.isEmpty()) { + throw new IllegalArgumentException("At least one address must be provided"); + } + Lock writeLock = addressListLock.writeLock(); + writeLock.lock(); + try { + InetSocketAddress lastAddress = getLastObtainedAddress(); + socketAddresses.clear(); + addresses.stream() + .map(this::parseAddress) + .collect(Collectors.toCollection(() -> socketAddresses)); + if (lastAddress != null) { + int recoveredPosition = socketAddresses.indexOf(lastAddress); + currentPosition.set(recoveredPosition); + } else { + currentPosition.set(UNSET_POSITION); + } + } finally { + writeLock.unlock(); + } } /** - * Sets maximum amount of reconnect attempts to be made before an exception is raised. - * The retry count is maintained by a {@link #get(int, Throwable)} caller - * when a socket level connection was established. - *

- * Negative value means unlimited. + * Gets parsed and resolved internet addresses. * - * @param retriesLimit Limit of retries to use. - * @return {@code this}. - */ - public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) { - this.retriesLimit = retriesLimit; - return this; + * @return socket addresses + */ + public List getAddresses() { + Lock readLock = addressListLock.readLock(); + readLock.lock(); + try { + return Collections.unmodifiableList(this.socketAddresses); + } finally { + readLock.unlock(); + } } /** - * Gets number of maximum attempts to establish connection. + * Gets last used address from the pool if it exists. * - * @return max attempts number. - */ - public int getRetriesLimit() { - return retriesLimit; + * @return last obtained address or null + * if {@link #currentPosition} has {@link #UNSET_POSITION} value + */ + protected InetSocketAddress getLastObtainedAddress() { + Lock readLock = addressListLock.readLock(); + readLock.lock(); + try { + int index = currentPosition.get(); + return index != UNSET_POSITION ? socketAddresses.get(index) : null; + } finally { + readLock.unlock(); + } } - /** - * {@inheritDoc} - */ @Override - public SocketChannel get(int retryNumber, Throwable lastError) { - if (areRetriesExhausted(retryNumber)) { - throw new CommunicationException("Connection retries exceeded.", lastError); - } - int attempts = getAddressCount(); - long deadline = System.currentTimeMillis() + timeout * attempts; - while (!Thread.currentThread().isInterrupted()) { - SocketChannel channel = null; - try { - channel = SocketChannel.open(); - InetSocketAddress addr = getNextSocketAddress(); - channel.socket().connect(addr, timeout); - return channel; - } catch (IOException e) { - if (channel != null) { - try { - channel.close(); - } catch (IOException ignored) { - // No-op. - } - } - long now = System.currentTimeMillis(); - if (deadline <= now) { - throw new CommunicationException("Connection time out.", e); - } - if (--attempts == 0) { - // Tried all addresses without any lack, but still have time. - attempts = getAddressCount(); - try { - Thread.sleep((deadline - now) / attempts); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - } - } - throw new CommunicationException("Thread interrupted.", new InterruptedException()); + protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException { + return getNextSocketAddress(); } /** @@ -167,45 +127,40 @@ public SocketChannel get(int retryNumber, Throwable lastError) { * @return Number of configured addresses. */ protected int getAddressCount() { - return sockAddrs.length; + Lock readLock = addressListLock.readLock(); + readLock.lock(); + try { + return socketAddresses.size(); + } finally { + readLock.unlock(); + } } /** * Gets next address from the pool to be used to connect. * - * @return Socket address to use for the next reconnection attempt. + * @return Socket address to use for the next reconnection attempt */ protected InetSocketAddress getNextSocketAddress() { - InetSocketAddress res = sockAddrs[pos]; - pos = (pos + 1) % sockAddrs.length; - return res; + Lock readLock = addressListLock.readLock(); + readLock.lock(); + try { + int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size()); + return socketAddresses.get(position); + } finally { + readLock.unlock(); + } } /** - * Parse a string address in the form of [host]:[port] - * and builds a socket address. + * Update addresses pool by new list. * - * @param addr Server address. - * @return Socket address. - */ - protected InetSocketAddress parseAddress(String addr) { - int idx = addr.indexOf(':'); - String host = (idx < 0) ? addr : addr.substring(0, idx); - int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1)); - return new InetSocketAddress(host, port); - } - - /** - * Provides a decision on whether retries limit is hit. + * @param addresses list of addresses to be applied * - * @param retries Current count of retries. - * @return {@code true} if retries are exhausted. + * @throws IllegalArgumentException if addresses list is empty */ - private boolean areRetriesExhausted(int retries) { - int limit = getRetriesLimit(); - if (limit < 0) { - return false; - } - return retries >= limit; + public void refreshAddresses(Collection addresses) { + updateAddressList(addresses); } + } diff --git a/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java b/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java new file mode 100644 index 00000000..0829bdc2 --- /dev/null +++ b/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java @@ -0,0 +1,43 @@ +package org.tarantool; + +import org.tarantool.util.StringUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * Simple provider that produces a single connection. + * To be used with {@link TarantoolClientImpl}. + */ +public class SingleSocketChannelProviderImpl extends BaseSocketChannelProvider { + + private InetSocketAddress address; + + /** + * Creates a simple provider. + * + * @param address instance address + */ + public SingleSocketChannelProviderImpl(String address) { + setAddress(address); + } + + public SocketAddress getAddress() { + return address; + } + + @Override + protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException { + return address; + } + + public void setAddress(String address) { + if (StringUtils.isBlank(address)) { + throw new IllegalArgumentException("address must not be empty"); + } + + this.address = parseAddress(address); + } + +} diff --git a/src/main/java/org/tarantool/SocketChannelProvider.java b/src/main/java/org/tarantool/SocketChannelProvider.java index 8ae52476..e4f26d79 100644 --- a/src/main/java/org/tarantool/SocketChannelProvider.java +++ b/src/main/java/org/tarantool/SocketChannelProvider.java @@ -3,6 +3,10 @@ import java.nio.channels.SocketChannel; public interface SocketChannelProvider { + + int RETRY_NO_LIMIT = -1; + int NO_TIMEOUT = 0; + /** * Provides socket channel to init restore connection. * You could change hosts on fail and sleep between retries in this method diff --git a/src/main/java/org/tarantool/TarantoolClientConfig.java b/src/main/java/org/tarantool/TarantoolClientConfig.java index a863304c..43f06746 100644 --- a/src/main/java/org/tarantool/TarantoolClientConfig.java +++ b/src/main/java/org/tarantool/TarantoolClientConfig.java @@ -3,47 +3,49 @@ public class TarantoolClientConfig { /** - * Username and password for authorization. + * Auth-related data. */ public String username; public String password; /** - * Default ByteArrayOutputStream size when make query serialization. + * Default request size when make query serialization. */ public int defaultRequestSize = 4096; /** - * Initial size for map which holds futures of sent request. + * Initial capacity for the map which holds futures of sent request. */ public int predictedFutures = (int) ((1024 * 1024) / 0.75) + 1; - public int writerThreadPriority = Thread.NORM_PRIORITY; - public int readerThreadPriority = Thread.NORM_PRIORITY; - /** - * shared buffer is place where client collect requests when socket is busy on write. + * Shared buffer size (place where client collects requests + * when socket is busy on write). */ public int sharedBufferSize = 8 * 1024 * 1024; + /** - * not put request into the shared buffer if request size is ge directWriteFactor * sharedBufferSize. + * Factor to calculate a threshold whether request will be accommodated + * in the shared buffer. + *

+ * if request size exceeds directWriteFactor * sharedBufferSize + * request is sent directly. */ public double directWriteFactor = 0.5d; /** - * Use old call command https://github.com/tarantool/doc/issues/54, - * please ensure that you server supports new call command. + * Use old call command https://github.com/tarantool/doc/issues/54, + * please ensure that you server supports new call command. */ public boolean useNewCall = false; /** - * Any blocking ops timeout. + * Limits for synchronous operations. */ public long initTimeoutMillis = 60 * 1000L; - public long writeTimeoutMillis = 60 * 1000L; } diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index c1d40b79..12d8f676 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock; public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient { + public static final CommunicationException NOT_INIT_EXCEPTION = new CommunicationException("Not connected, initializing connection"); @@ -34,22 +35,23 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar * External. */ protected SocketChannelProvider socketProvider; + protected SocketChannel channel; + protected ReadableViaSelectorChannel readChannel; + protected volatile Exception thumbstone; protected Map> futures; - protected AtomicInteger wait = new AtomicInteger(); + protected AtomicInteger pendingResponsesCount = new AtomicInteger(); /** * Write properties. */ - protected SocketChannel channel; - protected ReadableViaSelectorChannel readChannel; - protected ByteBuffer sharedBuffer; - protected ByteBuffer writerBuffer; protected ReentrantLock bufferLock = new ReentrantLock(false); protected Condition bufferNotEmpty = bufferLock.newCondition(); protected Condition bufferEmpty = bufferLock.newCondition(); + + protected ByteBuffer writerBuffer; protected ReentrantLock writeLock = new ReentrantLock(true); /** @@ -81,6 +83,10 @@ public void run() { } }); + public TarantoolClientImpl(String address, TarantoolClientConfig config) { + this(new SingleSocketChannelProviderImpl(address), config); + } + public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClientConfig config) { super(); this.thumbstone = NOT_INIT_EXCEPTION; @@ -102,6 +108,11 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient this.fireAndForgetOps.setCallCode(Code.CALL); this.composableAsyncOps.setCallCode(Code.CALL); } + + startConnector(config); + } + + private void startConnector(TarantoolClientConfig config) { connector.start(); try { if (!waitAlive(config.initTimeoutMillis, TimeUnit.MILLISECONDS)) { @@ -254,6 +265,7 @@ protected synchronized void die(String message, Exception cause) { iterator.remove(); } } + pendingResponsesCount.set(0); bufferLock.lock(); try { @@ -306,7 +318,7 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo } } sharedBuffer.put(buffer); - wait.incrementAndGet(); + pendingResponsesCount.incrementAndGet(); bufferNotEmpty.signalAll(); stats.buffered++; } finally { @@ -333,7 +345,7 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx } writeFully(channel, buffer); stats.directWrite++; - wait.incrementAndGet(); + pendingResponsesCount.incrementAndGet(); } finally { writeLock.unlock(); } @@ -360,7 +372,7 @@ protected void readThread() { Long syncId = (Long) headers.get(Key.SYNC.getId()); TarantoolOp future = futures.remove(syncId); stats.received++; - wait.decrementAndGet(); + pendingResponsesCount.decrementAndGet(); complete(packet, future); } catch (Exception e) { die("Cant read answer", e); @@ -431,9 +443,9 @@ protected void completeSql(CompletableFuture future, TarantoolPacket pack) { } } - protected T syncGet(Future r) { + protected T syncGet(Future result) { try { - return r.get(); + return result.get(); } catch (ExecutionException e) { if (e.getCause() instanceof CommunicationException) { throw (CommunicationException) e.getCause(); @@ -464,7 +476,6 @@ public void close() { protected void close(Exception e) { if (state.close()) { connector.interrupt(); - die(e.getMessage(), e); } } @@ -564,9 +575,11 @@ public List exec(Code code, Object... args) { public void close() { throw new IllegalStateException("You should close TarantoolClient instead."); } + } protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> { + @Override public Long exec(Code code, Object... args) { if (thumbstone == null) { @@ -586,6 +599,7 @@ public Long exec(Code code, Object... args) { public void close() { throw new IllegalStateException("You should close TarantoolClient instead."); } + } protected class ComposableAsyncOps @@ -600,10 +614,11 @@ public CompletionStage> exec(Code code, Object... args) { public void close() { TarantoolClientImpl.this.close(); } + } protected boolean isDead(CompletableFuture q) { - if (TarantoolClientImpl.this.thumbstone != null) { + if (this.thumbstone != null) { fail(q, new CommunicationException("Connection is dead", thumbstone)); return true; } @@ -630,6 +645,7 @@ public TarantoolClientStats getStats() { * Manages state changes. */ protected final class StateHelper { + static final int UNINITIALIZED = 0; static final int READING = 1; static final int WRITING = 2; @@ -640,14 +656,14 @@ protected final class StateHelper { private final AtomicInteger state; private final AtomicReference nextAliveLatch = - new AtomicReference<>(new CountDownLatch(1)); + new AtomicReference<>(new CountDownLatch(1)); private final CountDownLatch closedLatch = new CountDownLatch(1); /** * The condition variable to signal a reconnection is needed from reader / * writer threads and waiting for that signal from the reconnection thread. - * + *

* The lock variable to access this condition. * * @see #awaitReconnection() @@ -685,7 +701,7 @@ protected boolean close() { /** * Move from a current state to a give one. - * + *

* Some moves are forbidden. */ protected boolean acquire(int mask) { @@ -805,6 +821,7 @@ private void trySignalForReconnection() { } } } + } protected static class TarantoolOp extends CompletableFuture { diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index 4b200fcc..49fdb4af 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -1,13 +1,23 @@ package org.tarantool; -import static org.tarantool.TarantoolClientImpl.StateHelper.CLOSED; +import org.tarantool.cluster.TarantoolClusterDiscoverer; +import org.tarantool.cluster.TarantoolClusterStoredFunctionDiscoverer; +import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.util.StringUtils; +import java.io.IOException; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.StampedLock; /** * Basic implementation of a client that may work with the cluster @@ -17,20 +27,32 @@ * unless the configured expiration time is over. */ public class TarantoolClusterClient extends TarantoolClientImpl { - /* Need some execution context to retry writes. */ + + /** + * Need some execution context to retry writes. + */ private Executor executor; - /* Collection of operations to be retried. */ - private ConcurrentHashMap> retries = new ConcurrentHashMap>(); + /** + * Discovery activity. + */ + private ScheduledExecutorService instancesDiscoveryExecutor; + private Runnable instancesDiscovererTask; + private StampedLock discoveryLock = new StampedLock(); + + /** + * Collection of operations to be retried. + */ + private ConcurrentHashMap> retries = new ConcurrentHashMap<>(); /** * Constructs a new cluster client. * - * @param config Configuration. - * @param addrs Array of addresses in the form of [host]:[port]. + * @param config Configuration. + * @param addresses Array of addresses in the form of host[:port]. */ - public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) { - this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis)); + public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addresses) { + this(config, makeClusterSocketProvider(addresses, config.operationExpiryTimeMillis)); } /** @@ -41,12 +63,33 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... add */ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) { super(provider, config); - this.executor = config.executor == null ? Executors.newSingleThreadExecutor() : config.executor; + + this.executor = config.executor == null + ? Executors.newSingleThreadExecutor() + : config.executor; + + if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) { + this.instancesDiscovererTask = + createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this)); + this.instancesDiscoveryExecutor + = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer")); + int delay = config.clusterDiscoveryDelayMillis > 0 + ? config.clusterDiscoveryDelayMillis + : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; + + // todo: it's better to start a job later (out of ctor) + this.instancesDiscoveryExecutor.scheduleWithFixedDelay( + this.instancesDiscovererTask, + 0, + delay, + TimeUnit.MILLISECONDS + ); + } } @Override protected boolean isDead(CompletableFuture q) { - if ((state.getState() & CLOSED) != 0) { + if ((state.getState() & StateHelper.CLOSED) != 0) { q.completeExceptionally(new CommunicationException("Connection is dead", thumbstone)); return true; } @@ -62,22 +105,41 @@ protected CompletableFuture doExec(Code code, Object[] args) { validateArgs(args); long sid = syncId.incrementAndGet(); ExpirableOp future = makeFuture(sid, code, args); + return registerOperation(future); + } - if (isDead(future)) { - return future; - } - futures.put(sid, future); - if (isDead(future)) { - futures.remove(sid); - return future; - } + /** + * Registers a new async operation which will be resolved later. + * Registration is discovery-aware in term of synchronization and + * it may be blocked util the discovery finishes its work. + * + * @param future operation to be performed + * + * @return registered operation + */ + private CompletableFuture registerOperation(ExpirableOp future) { + long stamp = discoveryLock.readLock(); try { - write(code, sid, null, args); - } catch (Exception e) { - futures.remove(sid); - fail(future, e); + if (isDead(future)) { + return future; + } + futures.put(future.getId(), future); + if (isDead(future)) { + futures.remove(future.getId()); + return future; + } + + try { + write(future.getCode(), future.getId(), null, future.getArgs()); + } catch (Exception e) { + futures.remove(future.getId()); + fail(future, e); + } + + return future; + } finally { + discoveryLock.unlock(stamp); } - return future; } @Override @@ -101,6 +163,10 @@ protected boolean checkFail(CompletableFuture q, Exception e) { protected void close(Exception e) { super.close(e); + if (instancesDiscoveryExecutor != null) { + instancesDiscoveryExecutor.shutdownNow(); + } + if (retries == null) { // May happen within constructor. return; @@ -135,27 +201,100 @@ protected void onReconnect() { // First call is before the constructor finished. Skip it. return; } - Collection> futuresToRetry = new ArrayList>(retries.values()); + Collection> futuresToRetry = new ArrayList<>(retries.values()); retries.clear(); long now = System.currentTimeMillis(); for (final ExpirableOp future : futuresToRetry) { if (!future.hasExpired(now)) { - executor.execute(new Runnable() { - @Override - public void run() { - futures.put(future.getId(), future); - try { - write(future.getCode(), future.getId(), null, future.getArgs()); - } catch (Exception e) { - futures.remove(future.getId()); - fail(future, e); - } - } - }); + executor.execute(() -> registerOperation(future)); } } } + @Override + protected void complete(TarantoolPacket packet, TarantoolOp future) { + super.complete(packet, future); + RefreshableSocketProvider provider = getRefreshableSocketProvider(); + if (provider != null) { + renewConnectionIfRequired(provider.getAddresses()); + } + } + + protected void onInstancesRefreshed(Set instances) { + RefreshableSocketProvider provider = getRefreshableSocketProvider(); + if (provider != null) { + provider.refreshAddresses(instances); + renewConnectionIfRequired(provider.getAddresses()); + } + } + + private RefreshableSocketProvider getRefreshableSocketProvider() { + return socketProvider instanceof RefreshableSocketProvider + ? (RefreshableSocketProvider) socketProvider + : null; + } + + private void renewConnectionIfRequired(Collection addresses) { + if (pendingResponsesCount.get() > 0 || !isAlive()) { + return; + } + SocketAddress addressInUse = getCurrentAddressOrNull(); + if (!(addressInUse == null || addresses.contains(addressInUse))) { + long stamp = discoveryLock.tryWriteLock(); + if (!discoveryLock.validate(stamp)) { + return; + } + try { + if (pendingResponsesCount.get() == 0) { + stopIO(); + } + } finally { + discoveryLock.unlock(stamp); + } + } + } + + private SocketAddress getCurrentAddressOrNull() { + try { + return channel.getRemoteAddress(); + } catch (IOException ignored) { + return null; + } + } + + public void refreshInstances() { + if (instancesDiscovererTask != null) { + instancesDiscovererTask.run(); + } + } + + private static RoundRobinSocketProviderImpl makeClusterSocketProvider(String[] addresses, + int connectionTimeout) { + RoundRobinSocketProviderImpl socketProvider = new RoundRobinSocketProviderImpl(addresses); + socketProvider.setTimeout(connectionTimeout); + return socketProvider; + } + + private Runnable createDiscoveryTask(TarantoolClusterDiscoverer serviceDiscoverer) { + return new Runnable() { + + private Set lastInstances; + + @Override + public synchronized void run() { + try { + Set freshInstances = serviceDiscoverer.getInstances(); + if (!(freshInstances.isEmpty() || Objects.equals(lastInstances, freshInstances))) { + lastInstances = freshInstances; + onInstancesRefreshed(lastInstances); + } + } catch (Exception ignored) { + // no-op + } + } + }; + } + /** * Holds operation code and arguments for retry. */ @@ -202,6 +341,6 @@ public long getId() { public Object[] getArgs() { return args; } - } + } diff --git a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java index 423896b3..81f67cbb 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java +++ b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java @@ -6,9 +6,30 @@ * Configuration for the {@link TarantoolClusterClient}. */ public class TarantoolClusterClientConfig extends TarantoolClientConfig { - /* Amount of time (in milliseconds) the operation is eligible for retry. */ - public int operationExpiryTimeMillis = 500; - /* Executor service that will be used as a thread of execution to retry writes. */ - public Executor executor = null; + public static final int DEFAULT_OPERATION_EXPIRY_TIME_MILLIS = 500; + public static final int DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS = 60_000; + + /** + * Period for the operation is eligible for retry. + */ + public int operationExpiryTimeMillis = DEFAULT_OPERATION_EXPIRY_TIME_MILLIS; + + /** + * Executor that will be used as a thread of + * execution to retry writes. + */ + public Executor executor; + + /** + * Gets a name of the stored function to be used + * to fetch list of instances. + */ + public String clusterDiscoveryEntryFunction; + + /** + * Scan period for refreshing a new list of instances. + */ + public int clusterDiscoveryDelayMillis = DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; + } diff --git a/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java b/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java new file mode 100644 index 00000000..f1b4dfb5 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java @@ -0,0 +1,23 @@ +package org.tarantool; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class TarantoolThreadDaemonFactory implements ThreadFactory { + + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public TarantoolThreadDaemonFactory(String namePrefix) { + this.namePrefix = namePrefix + "-" + POOL_NUMBER.incrementAndGet() + "-thread-"; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, namePrefix + threadNumber.incrementAndGet()); + thread.setDaemon(true); + + return thread; + } +} diff --git a/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java b/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java new file mode 100644 index 00000000..41f8dd82 --- /dev/null +++ b/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java @@ -0,0 +1,13 @@ +package org.tarantool.cluster; + +/** + * Raised when {@link TarantoolClusterStoredFunctionDiscoverer} validates + * a function result as unsupported. + */ +public class IllegalDiscoveryFunctionResult extends RuntimeException { + + public IllegalDiscoveryFunctionResult(String message) { + super(message); + } + +} diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java new file mode 100644 index 00000000..26e5da3c --- /dev/null +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java @@ -0,0 +1,21 @@ +package org.tarantool.cluster; + +import java.util.Set; + +/** + * Discovery strategy to obtain a list of the cluster nodes. + * This one can be used by {@link org.tarantool.RefreshableSocketProvider} + * to provide support for fault tolerance property. + * + * @see org.tarantool.RefreshableSocketProvider + */ +public interface TarantoolClusterDiscoverer { + + /** + * Gets nodes addresses in host[:port] format. + * + * @return list of the cluster nodes + */ + Set getInstances(); + +} diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java new file mode 100644 index 00000000..c25b578d --- /dev/null +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java @@ -0,0 +1,63 @@ +package org.tarantool.cluster; + +import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientOps; +import org.tarantool.TarantoolClusterClientConfig; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A cluster nodes discoverer based on calling a predefined function + * which returns list of nodes. + * + * The function has to have no arguments and return list of + * the strings which follow host[:port] format + */ +public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer { + + private TarantoolClient client; + private String entryFunction; + + public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) { + this.client = client; + this.entryFunction = clientConfig.clusterDiscoveryEntryFunction; + } + + @Override + public Set getInstances() { + TarantoolClientOps, Object, List> syncOperations = client.syncOps(); + + List list = syncOperations.call(entryFunction); + // discoverer expects a single array result from the function now; + // in order to protect this contract the discoverer does a strict + // validation against the data returned; + // this strict-mode allows us to extend the contract in a non-breaking + // way for old clients just reserve an extra return value in + // terms of LUA multi-result support. + checkResult(list); + + List funcResult = (List) list.get(0); + return funcResult.stream() + .map(Object::toString) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + /** + * Check whether the result follows the contract or not. + * The contract is a mandatory single array of strings + * + * @param result result to be validated + */ + private void checkResult(List result) { + if (result == null || result.isEmpty()) { + throw new IllegalDiscoveryFunctionResult("Discovery function returned no data"); + } + if (!((List)result.get(0)).stream().allMatch(item -> item instanceof String)) { + throw new IllegalDiscoveryFunctionResult("The first value must be an array of strings"); + } + } + +} diff --git a/src/main/java/org/tarantool/util/StringUtils.java b/src/main/java/org/tarantool/util/StringUtils.java new file mode 100644 index 00000000..7a289a3f --- /dev/null +++ b/src/main/java/org/tarantool/util/StringUtils.java @@ -0,0 +1,21 @@ +package org.tarantool.util; + +public class StringUtils { + + public static boolean isEmpty(String string) { + return (string == null) || (string.isEmpty()); + } + + public static boolean isNotEmpty(String string) { + return !isEmpty(string); + } + + public static boolean isBlank(String string) { + return (string == null) || (string.trim().isEmpty()); + } + + public static boolean isNotBlank(String string) { + return !isBlank(string); + } + +} diff --git a/src/test/java/org/tarantool/AbstractSocketProviderTest.java b/src/test/java/org/tarantool/AbstractSocketProviderTest.java new file mode 100644 index 00000000..526e55b8 --- /dev/null +++ b/src/test/java/org/tarantool/AbstractSocketProviderTest.java @@ -0,0 +1,50 @@ +package org.tarantool; + +import static org.mockito.Mockito.anyObject; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.util.Collection; +import java.util.stream.Collectors; + +public class AbstractSocketProviderTest { + + protected String extractRawHostAndPortString(SocketAddress socketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + return inetSocketAddress.getAddress().getHostName() + ":" + inetSocketAddress.getPort(); + } + + protected Iterable asRawHostAndPort(Collection addresses) { + return addresses.stream() + .map(this::extractRawHostAndPortString) + .collect(Collectors.toList()); + } + + protected T wrapWithMockChannelProvider(T source) throws IOException { + T wrapper = spy(source); + doReturn(makeSocketChannel()).when(wrapper).openChannel(anyObject()); + return wrapper; + } + + protected T wrapWithMockErroredChannelProvider(T source) throws IOException { + T wrapper = spy(source); + doThrow(IOException.class).when(wrapper).openChannel(anyObject()); + return wrapper; + } + + private SocketChannel makeSocketChannel() { + SocketChannel socketChannel = mock(SocketChannel.class); + when(socketChannel.socket()).thenReturn(mock(Socket.class)); + + return socketChannel; + } + +} diff --git a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java index f48675fc..f982b67b 100644 --- a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java +++ b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java @@ -37,8 +37,8 @@ public abstract class AbstractTarantoolConnectorIT { protected static final int TIMEOUT = 500; protected static final int RESTART_TIMEOUT = 2000; - protected static final SocketChannelProvider socketChannelProvider = new TestSocketChannelProvider(host, port, - RESTART_TIMEOUT); + protected static final SocketChannelProvider socketChannelProvider = + new TestSocketChannelProvider(host, port, RESTART_TIMEOUT); protected static TarantoolControl control; protected static TarantoolConsole console; @@ -143,14 +143,14 @@ protected static TarantoolClientConfig makeClientConfig() { return fillClientConfig(new TarantoolClientConfig()); } - protected static TarantoolClusterClientConfig makeClusterClientConfig() { + public static TarantoolClusterClientConfig makeClusterClientConfig() { TarantoolClusterClientConfig config = fillClientConfig(new TarantoolClusterClientConfig()); config.executor = null; config.operationExpiryTimeMillis = TIMEOUT; return config; } - private static T fillClientConfig(TarantoolClientConfig config) { + private static T fillClientConfig(T config) { config.username = username; config.password = password; config.initTimeoutMillis = RESTART_TIMEOUT; @@ -163,7 +163,7 @@ protected static TarantoolConsole openConsole() { } protected static TarantoolConsole openConsole(String instance) { - return TarantoolConsole.open(control.tntCtlWorkDir, instance); + return TarantoolConsole.open(TarantoolControl.tntCtlWorkDir, instance); } protected TarantoolConnection openConnection() { diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index 0cb3af55..08aee921 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -3,25 +3,41 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.tarantool.AbstractTarantoolConnectorIT.makeClusterClientConfig; +import static org.tarantool.TestUtils.makeDiscoveryFunction; import static org.tarantool.TestUtils.makeInstanceEnv; +import org.tarantool.cluster.ClusterServiceStoredFunctionDiscovererIT; + import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +@DisplayName("A cluster client") public class ClientReconnectClusterIT { + + private static final String SCHEMA_PATTERN = + "return box.schema.space.create('%1$s').id, box.space.%1$s:create_index('primary').id"; + private static final int TIMEOUT = 500; private static final String LUA_FILE = "jdk-testing.lua"; private static final String SRV1 = "replica1"; private static final String SRV2 = "replica2"; private static final String SRV3 = "replica3"; - private static final int[] PORTS = {3302, 3303, 3304}; - private static final int[] CONSOLE_PORTS = {3312, 3313, 3314}; + private static final int[] PORTS = { 3302, 3303, 3304 }; + private static final int[] CONSOLE_PORTS = { 3312, 3313, 3314 }; private static TarantoolControl control; private static String REPLICATION_CONFIG = TestUtils.makeReplicationString( @@ -29,7 +45,8 @@ public class ClientReconnectClusterIT { AbstractTarantoolConnectorIT.password, "localhost:" + PORTS[0], "localhost:" + PORTS[1], - "localhost:" + PORTS[2]); + "localhost:" + PORTS[2] + ); // Resume replication faster in case of temporary failure to fit TIMEOUT. private static double REPLICATION_TIMEOUT = 0.1; @@ -38,10 +55,13 @@ public class ClientReconnectClusterIT { public static void setupEnv() { control = new TarantoolControl(); int idx = 0; - for (String name: Arrays.asList(SRV1, SRV2, SRV3)) { + for (String name : Arrays.asList(SRV1, SRV2, SRV3)) { control.createInstance(name, LUA_FILE, - makeInstanceEnv(PORTS[idx], CONSOLE_PORTS[idx], REPLICATION_CONFIG, - REPLICATION_TIMEOUT)); + makeInstanceEnv( + PORTS[idx], CONSOLE_PORTS[idx], + REPLICATION_CONFIG, REPLICATION_TIMEOUT + ) + ); idx++; } } @@ -57,62 +77,410 @@ public static void tearDownEnv() { } } - @Test - public void testRoundRobinReconnect() { - control.start(SRV1); - control.start(SRV2); - control.start(SRV3); + @BeforeEach + public void setUpTest() { + startInstancesAndAwait(SRV1, SRV2, SRV3); + } - control.waitStarted(SRV1); - control.waitStarted(SRV2); - control.waitStarted(SRV3); + @AfterEach + public void tearDownTest() { + stopInstancesAndAwait(SRV1, SRV2, SRV3); + } - final TarantoolClientImpl client = makeClient( + @Test + @DisplayName("reconnected to another node after the current node had disappeared") + public void testRoundRobinReconnect() { + final TarantoolClientImpl client = makeClusterClient( "localhost:" + PORTS[0], "127.0.0.1:" + PORTS[1], - "localhost:" + PORTS[2]); + "localhost:" + PORTS[2] + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test1"); + final int spaceId = ids[0]; + final int pkId = ids[1]; - List ids = client.syncOps().eval( - "return box.schema.space.create('rr_test').id, " + - "box.space.rr_test:create_index('primary').id" + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + expectDisconnected(client, spaceId, pkId); + } + + /** + * Before fetch client = { srv1 } + * After fetch client = { srv1, srv2 } + *

+ * 1. fetch nodes - ok (client will apply { srv1, srv2 } as a new nodes list) + * 2. shutdown srv1 - ok (client will reconnect to srv2) + * 3. shutdown srv2 - fail (there are no available nodes anymore) + */ + @Test + @DisplayName("applied new nodes and reconnected to another node") + void testUpdateExtendedNodeList() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + String service3Address = "localhost:" + PORTS[2]; + + CyclicBarrier barrier = new CyclicBarrier(2); + + String infoFunctionName = "getAddresses"; + String infoFunctionScript = + makeDiscoveryFunction(infoFunctionName, Arrays.asList(service1Address, service2Address)); + + control.openConsole(SRV1).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName, + 0, + (ignored) -> tryAwait(barrier), + service1Address ); - final int spaceId = ((Number)ids.get(0)).intValue(); - final int pkId = ((Number)ids.get(1)).intValue(); + int[] ids = makeAndFillTestSpace(client, "rr_test2"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + tryAwait(barrier); // client = { srv1 }; wait for { srv1, srv2 } + + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + expectDisconnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + } + + /** + * Before fetch client = { srv1, srv2 } + * After fetch client = { srv1 } + *

+ * 1. fetch nodes - ok (client will apply the narrowed { srv1 } + * 2. shutdown srv1 - fail (client will not reconnect to srv2 because latest is out of the list) + */ + @Test + @DisplayName("applied new nodes and stayed connected to the current node") + void testUpdateNarrowNodeList() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + + CyclicBarrier barrier = new CyclicBarrier(2); + + String infoFunctionName = "getAddresses"; + String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service1Address)); + + control.openConsole(SRV1).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName, + 0, + (ignored) -> tryAwait(barrier), + service1Address, + service2Address + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test3"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + tryAwait(barrier); // client = { srv1, srv2 }; wait for { srv1 } + + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV1); + expectDisconnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + stopInstancesAndAwait(SRV3); + } + + /** + * Before fetch client = { srv1, srv2, srv3 } + * After fetch client = { srv1, srv2, srv3 } + *

+ * 1. fetch nodes - ok (client will ignore the same list) + * 2. shutdown srv1 - ok + * 3. shutdown srv2 - ok + * 4. shutdown srv3 - fail + */ + @Test + @DisplayName("applied empty list and stayed connected to the current node") + void testUpdateEmptyNodeList() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + String service3Address = "localhost:" + PORTS[2]; + + String infoFunctionName = "getAddresses"; + String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.emptyList()); + + control.openConsole(SRV1).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName, + service1Address, + service2Address, + service3Address + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test4"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + expectDisconnected(client, spaceId, pkId); + } + + /** + * Before fetch client = { srv1, srv2, srv3 } + * After fetch client = { srv1, srv2, srv3 } + *

+ * 1. fetch with an exception (i.e. missing/error-prone function) - ok (client will ignore the failure) + * 2. shutdown srv1 - ok + * 3. shutdown srv2 - ok + * 4. shutdown srv3 - fail + * + * @see ClusterServiceStoredFunctionDiscovererIT#testFunctionWithError() + */ + @Test + @DisplayName("applied nothing and stayed connected to the current node") + void testWrongConfigFetch() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + String service3Address = "localhost:" + PORTS[2]; + + String infoFunctionName = "getAddresses"; + String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.emptyList()); + + control.openConsole(SRV1).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName + "wrongSuffix", + service1Address, + service2Address, + service3Address + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test5"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + expectConnected(client, spaceId, pkId); + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + expectDisconnected(client, spaceId, pkId); + } + + /** + * Before fetch client = { srv1, srv2, srv3 } + * After fetch client = { srv1, srv2, srv3 } + *

+ * 1. fetch an improper result - ok (client will ignore the awkward data) + * 2. shutdown srv1 - ok + * 3. shutdown srv2 - ok + * 4. shutdown srv3 - fail + */ + @Test + @DisplayName("ignored an wrong function result and stayed connected to the current node") + void testWrongFunctionResultFetch() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + String service3Address = "localhost:" + PORTS[2]; + + String infoFunctionName = "getWhateverExceptAddressesListFunction"; + String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, 42); + + control.openConsole(SRV1).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName, + service1Address, + service2Address, + service3Address + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test6"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV2); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + expectDisconnected(client, spaceId, pkId); + } + + /** + * Before fetch client = { srv1 } + * After fetch ph1 client = { srv1 } + * After fetch ph2 client = { srv2 } + * After fetch ph3 client = { srv3 } + *

+ * 1. fetch an initial result (ph1) - ok (client will ignore the same data) + * 2. fetch the 2nd result (ph2) - ok (client will reconnect to srv2) + * 3. shutdown srv1 - ok + * 4. fetch the 3rd result (ph3) - ok (client will reconnect to srv3) + * 5. shutdown srv2 - ok + * 6. shutdown srv3 - fail + */ + @Test + @DisplayName("applied each second a new cluster node and reconnected to it") + void testDelayFunctionResultFetch() { + String service1Address = "localhost:" + PORTS[0]; + String service2Address = "127.0.0.1:" + PORTS[1]; + String service3Address = "localhost:" + PORTS[2]; + + CyclicBarrier barrier = new CyclicBarrier(2); + + String infoFunctionName = "getAddressesFunction"; + String functionBody = Stream.of(service1Address, service2Address) + .map(address -> "coroutine.yield('" + address + "');") + .collect(Collectors.joining(" ")); + + control.openConsole(SRV1) + .exec("co = coroutine.create(function() " + functionBody + " end)"); + control.openConsole(SRV1) + .exec("function getAddressesFunction() local c, r = coroutine.resume(co); return r end"); + + String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service3Address)); + control.openConsole(SRV2).exec(infoFunctionScript); + + final TarantoolClusterClient client = makeClientWithDiscoveryFeature( + infoFunctionName, + 3000, + (ignored) -> tryAwait(barrier), + service1Address + ); + + int[] ids = makeAndFillTestSpace(client, "rr_test7"); + final int spaceId = ids[0]; + final int pkId = ids[1]; + + tryAwait(barrier); // client = { srv1 }; wait for { srv1 } + + expectConnected(client, spaceId, pkId); + + tryAwait(barrier); // client = { srv1 }; wait for { srv2 } + + stopInstancesAndAwait(SRV1); + expectConnected(client, spaceId, pkId); + + tryAwait(barrier); // client = { srv2 }; wait for { srv3 } + + stopInstancesAndAwait(SRV2); + expectConnected(client, spaceId, pkId); + + stopInstancesAndAwait(SRV3); + expectDisconnected(client, spaceId, pkId); + } + + private void tryAwait(CyclicBarrier barrier) { + try { + barrier.await(6000, TimeUnit.MILLISECONDS); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + private void startInstancesAndAwait(String... instances) { + for (String instance : instances) { + control.start(instance); + } + for (String instance : instances) { + control.waitStarted(instance); + } + } + + private void stopInstancesAndAwait(String... instances) { + for (String instance : instances) { + control.stop(instance); + } + for (String instance : instances) { + control.waitStopped(instance); + } + } + + private void expectConnected(TarantoolClientImpl client, int spaceId, int pkId) { final List key = Collections.singletonList(1); final List tuple = Arrays.asList(1, 1); - client.syncOps().insert(spaceId, tuple); - control.waitReplication(SRV1, TIMEOUT); - List res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); assertEquals(res.get(0), tuple); + } - control.stop(SRV1); + private void expectDisconnected(TarantoolClientImpl client, int spaceId, int pkId) { + final List key = Collections.singletonList(1); - res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); - assertEquals(res.get(0), Arrays.asList(1, 1)); + assertThrows( + CommunicationException.class, + () -> client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ) + ); + } - control.stop(SRV2); + private int[] makeAndFillTestSpace(TarantoolClientImpl client, String spaceName) { + List ids = client.syncOps().eval(String.format(SCHEMA_PATTERN, spaceName)); - res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); - assertEquals(res.get(0), Arrays.asList(1, 1)); + final int spaceId = ((Number) ids.get(0)).intValue(); + final int pkId = ((Number) ids.get(1)).intValue(); - control.stop(SRV3); + client.syncOps().insert(spaceId, Arrays.asList(1, 1)); + control.waitReplication(SRV1, TIMEOUT); - CommunicationException e = assertThrows(CommunicationException.class, new Executable() { - @Override - public void execute() throws Throwable { - client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); - } - }); + return new int[] { spaceId, pkId }; + } + + private TarantoolClusterClient makeClusterClient(String... addresses) { + return makeClientWithDiscoveryFeature(null, addresses); + } - assertEquals("Connection time out.", e.getMessage()); + private TarantoolClusterClient makeClientWithDiscoveryFeature(String entryFunction, + String... addresses) { + return makeClientWithDiscoveryFeature(entryFunction, 0, null, addresses); } - private TarantoolClientImpl makeClient(String...addrs) { + private TarantoolClusterClient makeClientWithDiscoveryFeature(String entryFunction, + int entryDelayMillis, + Consumer> consumer, + String... addresses) { TarantoolClusterClientConfig config = makeClusterClientConfig(); - return new TarantoolClusterClient(config, addrs); + config.clusterDiscoveryEntryFunction = entryFunction; + config.clusterDiscoveryDelayMillis = entryDelayMillis; + + return new TarantoolClusterClient(config, addresses) { + @Override + protected void onInstancesRefreshed(Set instances) { + super.onInstancesRefreshed(instances); + if (consumer != null) { + consumer.accept(instances); + } + } + }; } + } diff --git a/src/test/java/org/tarantool/RoundRobinSocketProviderImplTest.java b/src/test/java/org/tarantool/RoundRobinSocketProviderImplTest.java new file mode 100644 index 00000000..f494be47 --- /dev/null +++ b/src/test/java/org/tarantool/RoundRobinSocketProviderImplTest.java @@ -0,0 +1,155 @@ +package org.tarantool; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@DisplayName("A RR socket provider") +public class RoundRobinSocketProviderImplTest extends AbstractSocketProviderTest { + + @Test + @DisplayName("initialized with a right addresses count") + public void testAddressesCount() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost:3301", "127.0.0.1:3302", "10.0.0.10:3303"); + assertEquals(3, socketProvider.getAddressCount()); + + socketProvider.refreshAddresses(Collections.singletonList("10.0.0.1")); + assertEquals(1, socketProvider.getAddressCount()); + } + + @Test + @DisplayName("initialized with a right addresses values") + public void testAddresses() { + String[] addresses = {"localhost:3301", "127.0.0.2:3302", "10.0.0.10:3303"}; + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl(addresses); + assertIterableEquals(Arrays.asList(addresses), asRawHostAndPort(socketProvider.getAddresses())); + + List strings = Collections.singletonList("10.0.0.1:3310"); + socketProvider.refreshAddresses(strings); + assertIterableEquals(strings, asRawHostAndPort(socketProvider.getAddresses())); + } + + @Test + @DisplayName("initialized failed when an empty addresses list is provided") + public void testEmptyAddresses() { + assertThrows(IllegalArgumentException.class, RoundRobinSocketProviderImpl::new); + } + + @Test + @DisplayName("changed addresses list with a failure when a new list is empty") + public void testResultWithEmptyAddresses() throws IOException { + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl("localhost:3301")); + + assertThrows(IllegalArgumentException.class, () -> socketProvider.refreshAddresses(Collections.emptyList())); + } + + @Test + @DisplayName("changed addresses list with a failure when a new list is null") + public void testResultWithWrongAddress() throws IOException { + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl("localhost:3301")); + + assertThrows(IllegalArgumentException.class, () -> socketProvider.refreshAddresses(null)); + } + + @Test + @DisplayName("initialized with a default timeout") + public void testDefaultTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + assertEquals(RoundRobinSocketProviderImpl.NO_TIMEOUT, socketProvider.getTimeout()); + } + + @Test + @DisplayName("changed its timeout to new value") + public void testChangingTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + int expectedTimeout = 10_000; + socketProvider.setTimeout(expectedTimeout); + assertEquals(expectedTimeout, socketProvider.getTimeout()); + } + + @Test + @DisplayName("changed to negative timeout with a failure") + public void testWrongChangingTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + int negativeValue = -200; + assertThrows(IllegalArgumentException.class, () -> socketProvider.setTimeout(negativeValue)); + } + + @Test + @DisplayName("produced socket channels using a ring pool") + public void testAddressRingPool() throws IOException { + String[] addresses = {"localhost:3301", "10.0.0.1:3302", "10.0.0.2:3309"}; + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses)); + + for (int i = 0; i < 27; i++) { + socketProvider.get(0, null); + assertEquals(addresses[i % 3], extractRawHostAndPortString(socketProvider.getLastObtainedAddress())); + } + } + + @Test + @DisplayName("produced socket channels for the same instance") + public void testOneAddressPool() throws IOException { + String expectedAddress = "10.0.0.1:3301"; + String[] addresses = {expectedAddress}; + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses)); + + for (int i = 0; i < 5; i++) { + socketProvider.get(0, null); + assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getLastObtainedAddress())); + } + } + + @Test + @DisplayName("produced socket channel with an exception when an attempt number is over") + public void testTooManyAttempts() throws IOException { + String expectedAddress = "10.0.0.1:3301"; + String[] addresses = {expectedAddress}; + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses)); + + int retriesLimit = 5; + socketProvider.setRetriesLimit(retriesLimit); + + for (int i = 0; i < retriesLimit; i++) { + socketProvider.get(0, null); + assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getLastObtainedAddress())); + } + + assertThrows(CommunicationException.class, () -> socketProvider.get(retriesLimit, null)); + } + + @Test + @DisplayName("produced a socket channel with a failure when an unreachable address is provided") + public void testWrongAddress() throws IOException { + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockErroredChannelProvider(new RoundRobinSocketProviderImpl("unreachable-host:3301")); + assertThrows(CommunicationException.class, () -> socketProvider.get(0, null)); + } + + @Test + @DisplayName("produced a socket channel with a failure with an unreachable address after refresh") + public void testWrongRefreshAddress() throws IOException { + RoundRobinSocketProviderImpl socketProvider + = wrapWithMockErroredChannelProvider(new RoundRobinSocketProviderImpl("unreachable-host:3301")); + assertThrows(CommunicationException.class, () -> socketProvider.get(0, null)); + } + +} diff --git a/src/test/java/org/tarantool/SingleSocketChannelProviderImplTest.java b/src/test/java/org/tarantool/SingleSocketChannelProviderImplTest.java new file mode 100644 index 00000000..a7a53fb4 --- /dev/null +++ b/src/test/java/org/tarantool/SingleSocketChannelProviderImplTest.java @@ -0,0 +1,69 @@ +package org.tarantool; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +@DisplayName("A single socket provider") +class SingleSocketChannelProviderImplTest extends AbstractSocketProviderTest { + + @Test + @DisplayName("initialized with a right address") + public void testAddressesCount() { + String expectedAddress = "localhost:3301"; + SingleSocketChannelProviderImpl socketProvider + = new SingleSocketChannelProviderImpl(expectedAddress); + assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getAddress())); + } + + @Test + @DisplayName("poorly initialized with an empty address") + public void testEmptyAddresses() { + assertThrows(IllegalArgumentException.class, () -> new SingleSocketChannelProviderImpl(null)); + } + + @Test + @DisplayName("initialized with a default timeout") + public void testDefaultTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + assertEquals(RoundRobinSocketProviderImpl.NO_TIMEOUT, socketProvider.getTimeout()); + } + + @Test + @DisplayName("changed its timeout to new value") + public void testChangingTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + int expectedTimeout = 10_000; + socketProvider.setTimeout(expectedTimeout); + assertEquals(expectedTimeout, socketProvider.getTimeout()); + } + + @Test + @DisplayName("changed to negative timeout with a failure") + public void testWrongChangingTimeout() { + RoundRobinSocketProviderImpl socketProvider + = new RoundRobinSocketProviderImpl("localhost"); + int negativeValue = -100; + assertThrows(IllegalArgumentException.class, () -> socketProvider.setTimeout(negativeValue)); + } + + @Test + @DisplayName("produced sockets with same address") + public void testMultipleChannelGetting() throws IOException { + String expectedAddresss = "localhost:3301"; + SingleSocketChannelProviderImpl socketProvider + = wrapWithMockChannelProvider(new SingleSocketChannelProviderImpl(expectedAddresss)); + + for (int i = 0; i < 10; i++) { + socketProvider.get(0, null); + assertEquals(expectedAddresss, extractRawHostAndPortString(socketProvider.getAddress())); + } + } + +} diff --git a/src/test/java/org/tarantool/TestUtils.java b/src/test/java/org/tarantool/TestUtils.java index dfa94af4..873c197b 100644 --- a/src/test/java/org/tarantool/TestUtils.java +++ b/src/test/java/org/tarantool/TestUtils.java @@ -1,11 +1,28 @@ package org.tarantool; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class TestUtils { + public static String makeDiscoveryFunction(String functionName, Collection addresses) { + String functionResult = addresses.stream() + .map(address -> "'" + address + "'") + .collect(Collectors.joining(",", "{", "}")); + return makeDiscoveryFunction(functionName, functionResult); + } + + public static String makeDiscoveryFunction(String functionName, Object result) { + return makeDiscoveryFunction(functionName, result.toString()); + } + + public static String makeDiscoveryFunction(String functionName, String body) { + return "function " + functionName + "() return " + body + " end"; + } + static final String replicationInfoRequest = "return " + "box.info.id, " + "box.info.lsn, " + @@ -13,7 +30,9 @@ public class TestUtils { @FunctionalInterface public interface ThrowingAction { + void run() throws X; + } public static Runnable throwingWrapper(ThrowingAction action) { @@ -175,4 +194,5 @@ private static T ensureType(Class cls, Object v) { } return cls.cast(v); } + } diff --git a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java new file mode 100644 index 00000000..f0527146 --- /dev/null +++ b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java @@ -0,0 +1,196 @@ +package org.tarantool.cluster; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.tarantool.TestUtils.makeDiscoveryFunction; +import static org.tarantool.TestUtils.makeInstanceEnv; + +import org.tarantool.AbstractTarantoolConnectorIT; +import org.tarantool.CommunicationException; +import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.TarantoolControl; +import org.tarantool.TarantoolException; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +@DisplayName("A cluster discoverer") +public class ClusterServiceStoredFunctionDiscovererIT { + + protected static final int INSTANCE_LISTEN_PORT = 3301; + protected static final int INSTANCE_ADMIN_PORT = 3313; + private static final String LUA_FILE = "jdk-testing.lua"; + + private static final String INSTANCE_NAME = "jdk-testing"; + private static TarantoolControl control; + + private static String ENTRY_FUNCTION_NAME = "getAddresses"; + + private TarantoolClusterClientConfig clusterConfig; + private TarantoolClient client; + + @BeforeAll + public static void setupEnv() { + control = new TarantoolControl(); + control.createInstance(INSTANCE_NAME, LUA_FILE, makeInstanceEnv(INSTANCE_LISTEN_PORT, INSTANCE_ADMIN_PORT)); + + control.start(INSTANCE_NAME); + control.waitStarted(INSTANCE_NAME); + } + + @BeforeEach + public void setupTest() { + clusterConfig = AbstractTarantoolConnectorIT.makeClusterClientConfig(); + clusterConfig.clusterDiscoveryEntryFunction = ENTRY_FUNCTION_NAME; + + client = new TarantoolClientImpl("localhost:" + INSTANCE_LISTEN_PORT, clusterConfig); + } + + @AfterAll + public static void tearDownEnv() { + control.stop(INSTANCE_NAME); + control.waitStopped(INSTANCE_NAME); + } + + @Test + @DisplayName("fetched list of addresses") + public void testSuccessfulAddressParsing() { + List addresses = Arrays.asList("localhost:3311", "127.0.0.1:3301"); + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, addresses); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + Set instances = discoverer.getInstances(); + + assertNotNull(instances); + assertEquals(2, instances.size()); + assertTrue(instances.contains(addresses.get(0))); + assertTrue(instances.contains(addresses.get(1))); + } + + @Test + @DisplayName("fetched duplicated addresses") + public void testSuccessfulUniqueAddressParsing() { + List addresses = Arrays.asList("localhost:3311", "127.0.0.1:3301", "127.0.0.2:3301", "localhost:3311"); + + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, addresses); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + Set instances = discoverer.getInstances(); + + assertNotNull(instances); + assertEquals(3, instances.size()); + assertTrue(instances.contains(addresses.get(0))); + assertTrue(instances.contains(addresses.get(1))); + assertTrue(instances.contains(addresses.get(3))); + } + + + @Test + @DisplayName("fetched empty address list") + public void testFunctionReturnedEmptyList() { + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, Collections.emptyList()); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + Set instances = discoverer.getInstances(); + + assertNotNull(instances); + assertTrue(instances.isEmpty()); + } + + @Test + @DisplayName("fetched with an exception using wrong function name") + public void testWrongFunctionName() { + clusterConfig.clusterDiscoveryEntryFunction = "wrongFunction"; + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + assertThrows(TarantoolException.class, discoverer::getInstances); + } + + @Test + @DisplayName("fetched with an exception using a broken client") + public void testWrongInstanceAddress() { + clusterConfig.initTimeoutMillis = 1000; + + client.close(); + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + assertThrows(CommunicationException.class, discoverer::getInstances); + } + + @Test + @DisplayName("fetched with an exception when wrong data type returned") + public void testWrongTypeResultData() { + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, 42); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances); + } + + @Test + @DisplayName("fetched with an exception using no return function") + public void testFunctionWithNoReturn() { + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, ""); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances); + } + + @Test + @DisplayName("fetched first result as a list and ignored other multi-results") + public void testWrongMultiResultData() { + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "{'host1'}, 'host2', 423"); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + Set instances = discoverer.getInstances(); + + assertNotNull(instances); + assertEquals(1, instances.size()); + assertTrue(instances.contains("host1")); + } + + @Test + @DisplayName("fetched with an exception using error-prone function") + public void testFunctionWithError() { + String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "error('msg')"); + control.openConsole(INSTANCE_NAME).exec(functionCode); + + TarantoolClusterStoredFunctionDiscoverer discoverer = + new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client); + + assertThrows(TarantoolException.class, discoverer::getInstances); + } + +}