Skip to content

Commit fe6bdbb

Browse files
committed
Apply connection timeout for each connect attempt
Relax a socket provider contract. Now socket provider can throw a transient error and the client will try to obtain a socket again instead of being closed. Make built-in socket providers configurable. Now the client can set retries count and connection timeout for providers. Update README doc im scope of new socket provider contract. Closes: #167 Follows on: #144
1 parent 3420575 commit fe6bdbb

12 files changed

+246
-145
lines changed

README.md

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ To get the Java connector for Tarantool 1.6.9, visit
2222

2323
## Getting started
2424

25-
1. Add a dependency to your `pom.xml` file.
25+
1. Add a dependency to your `pom.xml` file:
2626

2727
```xml
2828
<dependency>
@@ -32,75 +32,81 @@ To get the Java connector for Tarantool 1.6.9, visit
3232
</dependency>
3333
```
3434

35-
2. Configure `TarantoolClientConfig`.
35+
2. Configure `TarantoolClientConfig`:
3636

3737
```java
3838
TarantoolClientConfig config = new TarantoolClientConfig();
3939
config.username = "test";
4040
config.password = "test";
4141
```
4242

43-
3. Implement your `SocketChannelProvider`.
44-
It should return a connected `SocketChannel`.
43+
3. Create a client:
4544

4645
```java
47-
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
48-
@Override
49-
public SocketChannel get(int retryNumber, Throwable lastError) {
50-
if (lastError != null) {
51-
lastError.printStackTrace(System.out);
52-
}
53-
try {
54-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
55-
} catch (IOException e) {
56-
throw new IllegalStateException(e);
57-
}
58-
}
59-
};
46+
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
6047
```
6148

62-
Here you could also implement some reconnection or fallback policy.
63-
Remember that `TarantoolClient` adopts a
64-
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
65-
when a client is not connected.
49+
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:
6650

67-
The `TarantoolClient` will stop functioning if your implementation of a socket
68-
channel provider raises an exception or returns a null. You will need a new
69-
instance of client to recover. Hence, you should only throw in case you have
70-
met unrecoverable error.
51+
```java
52+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
53+
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
54+
```
7155

72-
Below is an example of `SocketChannelProvider` implementation that handles short
73-
tarantool restarts.
56+
You could implement your own `SocketChannelProvider`. It should return
57+
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
58+
using your appropriate strategy to obtain the channel. The strategy can take into
59+
account current attempt number (retryNumber) and the last transient error occurred on
60+
the previous attempt.
61+
62+
The `TarantoolClient` will be closed if your implementation of a socket
63+
channel provider raises any exceptions but not a `SocketProviderTransientException`.
64+
Latter is handled by the client as a recoverable error. Otherwise, you will
65+
need a new instance of client to recover. Hence, you should only throw an
66+
error different to `SocketProviderTransientException` in case you have met
67+
unrecoverable error.
68+
69+
Below is an example of `SocketChannelProvider` implementation that tries
70+
to connect no more than 3 times, two seconds for each attempt at max.
7471

7572
```java
7673
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
7774
@Override
7875
public SocketChannel get(int retryNumber, Throwable lastError) {
79-
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
80-
while (!Thread.currentThread().isInterrupted()) {
81-
try {
82-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
83-
} catch (IOException e) {
84-
if (deadline < System.currentTimeMillis())
85-
throw new RuntimeException(e);
86-
try {
87-
Thread.sleep(100);
88-
} catch (InterruptedException ignored) {
89-
Thread.currentThread().interrupt();
90-
}
76+
if (retryNumber > 3) {
77+
throw new RuntimeException("Too many attempts");
78+
}
79+
SocketChannel channel = null;
80+
try {
81+
channel = SocketChannel.open();
82+
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
83+
return channel;
84+
} catch (IOException e) {
85+
if (channel != null) {
86+
try {
87+
channel.close();
88+
} catch (IOException ignored) { }
9189
}
90+
throw new SocketProviderTransientException("Couldn't connect to server", e);
9291
}
93-
throw new RuntimeException(new TimeoutException("Connect timed out."));
9492
}
9593
};
9694
```
9795

98-
4. Create a client.
96+
Same behaviour can be achieved using built-in `SingleSocketChannelProviderImpl`:
9997

10098
```java
99+
TarantoolClientConfig config = new TarantoolClientConfig();
100+
config.connectionTimeout = 2_000; // two seconds timeout per attempt
101+
config.retryCount = 3; // three attempts at max
102+
103+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("localhost:3301")
101104
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
102105
```
103106

107+
`SingleSocketChannelProviderImpl` implements `ConfigurableSocketChannelProvider` that
108+
makes possible for the client to configure a socket provider.
109+
104110
> **Notes:**
105111
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
106112
> client inside the whole application.
@@ -168,6 +174,20 @@ a list of nodes which will be used by the cluster client to provide such
168174
ability. Also you can prefer to use a [discovery mechanism](#auto-discovery)
169175
in order to dynamically fetch and apply the node list.
170176

177+
### The RoundRobinSocketProviderImpl class
178+
179+
This cluster-aware provider uses addresses pool to connect to DB server.
180+
The provider picks up next address in order the addresses were passed.
181+
182+
Similar to `SingleSocketChannelProviderImpl` this RR provider also
183+
relies on two options from the config: `TarantoolClientConfig.connectionTimeout`
184+
and `TarantoolClientConfig.retryCount` but in a bit different way.
185+
The latter option says how many times the provider should try to establish a
186+
connection to _one instance_ before failing an attempt. The provider requires
187+
positive retry count to work properly. The socket timeout is used to limit
188+
an interval between connections attempts per instance. In other words, the provider
189+
follows a pattern _connection must succeed after N attempts with M interval between them_.
190+
171191
### Basic cluster client usage
172192

173193
1. Configure `TarantoolClusterClientConfig`:
@@ -198,7 +218,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
198218
Auto-discovery feature allows a cluster client to fetch addresses of
199219
cluster nodes to reflect changes related to the cluster topology. To achieve
200220
this you have to create a Lua function on the server side which returns
201-
a single array result. Client periodically pools the server to obtain a
221+
a single array result. Client periodically polls the server to obtain a
202222
fresh list and apply it if its content changes.
203223

204224
1. On the server side create a function which returns nodes:

src/main/java/org/tarantool/BaseSocketChannelProvider.java

Lines changed: 29 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.net.InetSocketAddress;
55
import java.nio.channels.SocketChannel;
66

7-
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
7+
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {
88

99
/**
1010
* Limit of retries.
@@ -14,46 +14,25 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1414
/**
1515
* Timeout to establish socket connection with an individual server.
1616
*/
17-
private int timeout = NO_TIMEOUT;
17+
private int connectionTimeout = NO_TIMEOUT;
1818

1919
/**
2020
* Tries to establish a new connection to the Tarantool instances.
2121
*
22-
* @param retryNumber number of current retry. Reset after successful connect.
22+
* @param retryNumber number of current retry
2323
* @param lastError the last error occurs when reconnecting
2424
*
2525
* @return connected socket channel
2626
*
27-
* @throws CommunicationException if any I/O errors happen or there are
28-
* no addresses available
27+
* @throws CommunicationException if number of retries or socket timeout are exceeded
28+
* @throws SocketProviderTransientException if any I/O errors happen
2929
*/
3030
@Override
3131
public final SocketChannel get(int retryNumber, Throwable lastError) {
32-
if (areRetriesExhausted(retryNumber)) {
33-
throw new CommunicationException("Connection retries exceeded.", lastError);
34-
}
35-
36-
long deadline = System.currentTimeMillis() + timeout;
37-
while (!Thread.currentThread().isInterrupted()) {
38-
try {
39-
InetSocketAddress address = getAddress(retryNumber, lastError);
40-
return openChannel(address);
41-
} catch (IOException e) {
42-
checkTimeout(deadline, e);
43-
}
44-
}
45-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46-
}
47-
48-
private void checkTimeout(long deadline, Exception e) {
49-
long timeLeft = deadline - System.currentTimeMillis();
50-
if (timeLeft <= 0) {
51-
throw new CommunicationException("Connection time out.", e);
52-
}
5332
try {
54-
Thread.sleep(timeLeft / 10);
55-
} catch (InterruptedException ignored) {
56-
Thread.currentThread().interrupt();
33+
return makeAttempt(retryNumber, lastError);
34+
} catch (IOException e) {
35+
throw new SocketProviderTransientException("Couldn't connect to the server", e);
5736
}
5837
}
5938

@@ -68,7 +47,7 @@ private void checkTimeout(long deadline, Exception e) {
6847
*
6948
* @throws IOException if any I/O errors occur
7049
*/
71-
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
50+
protected abstract SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException;
7251

7352
/**
7453
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
@@ -79,7 +58,11 @@ private void checkTimeout(long deadline, Exception e) {
7958
*
8059
* @param retriesLimit Limit of retries to use.
8160
*/
61+
@Override
8262
public void setRetriesLimit(int retriesLimit) {
63+
if (retriesLimit < 0) {
64+
throw new IllegalArgumentException("Retries count cannot be negative.");
65+
}
8366
this.retriesLimit = retriesLimit;
8467
}
8568

@@ -111,7 +94,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
11194
SocketChannel channel = null;
11295
try {
11396
channel = SocketChannel.open();
114-
channel.socket().connect(socketAddress, timeout);
97+
channel.socket().connect(socketAddress, connectionTimeout);
11598
return channel;
11699
} catch (IOException e) {
117100
if (channel != null) {
@@ -126,44 +109,31 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
126109
}
127110

128111
/**
129-
* Sets maximum amount of time to wait for a socket connection establishment
130-
* with an individual server.
131-
* <p>
132-
* Zero means infinite timeout.
133-
*
134-
* @param timeout timeout value, ms.
135-
*
136-
* @throws IllegalArgumentException if timeout is negative.
137-
*/
138-
public void setTimeout(int timeout) {
139-
if (timeout < 0) {
140-
throw new IllegalArgumentException("timeout is negative.");
141-
}
142-
this.timeout = timeout;
143-
}
144-
145-
/**
146-
* Gest maximum amount of time to wait for a socket
112+
* Gets maximum amount of time to wait for a socket
147113
* connection establishment with an individual server.
148114
*
149115
* @return timeout
150116
*/
151-
public int getTimeout() {
152-
return timeout;
117+
public int getConnectionTimeout() {
118+
return connectionTimeout;
153119
}
154120

155121
/**
156-
* Provides a decision on whether retries limit is hit.
122+
* Sets maximum amount of time to wait for a socket connection establishment
123+
* with an individual server.
124+
* <p>
125+
* Zero means infinite connectionTimeout.
157126
*
158-
* @param retries Current count of retries.
127+
* @param connectionTimeout connectionTimeout value, ms.
159128
*
160-
* @return {@code true} if retries are exhausted.
129+
* @throws IllegalArgumentException if connectionTimeout is negative.
161130
*/
162-
private boolean areRetriesExhausted(int retries) {
163-
int limit = getRetriesLimit();
164-
if (limit < 0) {
165-
return false;
131+
@Override
132+
public void setConnectionTimeout(int connectionTimeout) {
133+
if (connectionTimeout < 0) {
134+
throw new IllegalArgumentException("Connection timeout cannot be negative.");
166135
}
167-
return retries >= limit;
136+
this.connectionTimeout = connectionTimeout;
168137
}
138+
169139
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {
4+
5+
int RETRY_NO_LIMIT = 0;
6+
int NO_TIMEOUT = 0;
7+
8+
/**
9+
* Configures max count of retries.
10+
*
11+
* @param limit max attempts count
12+
*/
13+
void setRetriesLimit(int limit);
14+
15+
/**
16+
* Configures max time to establish
17+
* a connection per attempt.
18+
*
19+
* @param timeout connection timeout in millis
20+
*/
21+
void setConnectionTimeout(int timeout);
22+
23+
}

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.net.InetSocketAddress;
55
import java.net.SocketAddress;
6+
import java.nio.channels.SocketChannel;
67
import java.util.ArrayList;
78
import java.util.Arrays;
89
import java.util.Collection;
@@ -21,6 +22,7 @@
2122
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
2223

2324
private static final int UNSET_POSITION = -1;
25+
private static final int DEFAULT_RETRIES_PER_CONNECTION = 3;
2426

2527
/**
2628
* Socket addresses pool.
@@ -59,6 +61,7 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider impl
5961
*/
6062
public RoundRobinSocketProviderImpl(String... addresses) {
6163
updateAddressList(Arrays.asList(addresses));
64+
setRetriesLimit(DEFAULT_RETRIES_PER_CONNECTION);
6265
}
6366

6467
private void updateAddressList(Collection<String> addresses) {
@@ -117,8 +120,26 @@ protected InetSocketAddress getLastObtainedAddress() {
117120
}
118121

119122
@Override
120-
protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
121-
return getNextSocketAddress();
123+
protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException {
124+
if (retryNumber > getAddressCount()) {
125+
throwFatalError("No more connection addresses are left.");
126+
}
127+
128+
int retriesLimit = getRetriesLimit();
129+
InetSocketAddress socketAddress = getNextSocketAddress();
130+
if (retriesLimit < 1) {
131+
throwFatalError("Retries count should be at least 1 or more");
132+
}
133+
134+
IOException connectionError = null;
135+
for (int i = 0; i < retriesLimit; i++) {
136+
try {
137+
return openChannel(socketAddress);
138+
} catch (IOException e) {
139+
connectionError = e;
140+
}
141+
}
142+
throw connectionError;
122143
}
123144

124145
/**
@@ -163,4 +184,8 @@ public void refreshAddresses(Collection<String> addresses) {
163184
updateAddressList(addresses);
164185
}
165186

187+
private void throwFatalError(String message) {
188+
throw new CommunicationException(message);
189+
}
190+
166191
}

0 commit comments

Comments
 (0)