diff --git a/src/main/java/com/arangodb/ArangoDB.java b/src/main/java/com/arangodb/ArangoDB.java index 403ba3a26..55859c819 100644 --- a/src/main/java/com/arangodb/ArangoDB.java +++ b/src/main/java/com/arangodb/ArangoDB.java @@ -20,14 +20,25 @@ package com.arangodb; -import com.arangodb.entity.*; +import com.arangodb.entity.ArangoDBEngine; +import com.arangodb.entity.ArangoDBVersion; +import com.arangodb.entity.LoadBalancingStrategy; +import com.arangodb.entity.LogEntity; +import com.arangodb.entity.LogLevelEntity; +import com.arangodb.entity.Permissions; +import com.arangodb.entity.ServerRole; +import com.arangodb.entity.UserEntity; import com.arangodb.internal.ArangoContext; import com.arangodb.internal.ArangoDBImpl; import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.InternalArangoDBBuilder; import com.arangodb.internal.http.HttpCommunication; import com.arangodb.internal.http.HttpConnectionFactory; -import com.arangodb.internal.net.*; +import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.Host; +import com.arangodb.internal.net.HostHandle; +import com.arangodb.internal.net.HostHandler; +import com.arangodb.internal.net.HostResolver; import com.arangodb.internal.util.ArangoDeserializerImpl; import com.arangodb.internal.util.ArangoSerializationFactory; import com.arangodb.internal.util.ArangoSerializerImpl; @@ -42,7 +53,18 @@ import com.arangodb.util.ArangoDeserializer; import com.arangodb.util.ArangoSerialization; import com.arangodb.util.ArangoSerializer; -import com.arangodb.velocypack.*; +import com.arangodb.velocypack.VPack; +import com.arangodb.velocypack.VPackAnnotationFieldFilter; +import com.arangodb.velocypack.VPackAnnotationFieldNaming; +import com.arangodb.velocypack.VPackDeserializer; +import com.arangodb.velocypack.VPackInstanceCreator; +import com.arangodb.velocypack.VPackJsonDeserializer; +import com.arangodb.velocypack.VPackJsonSerializer; +import com.arangodb.velocypack.VPackModule; +import com.arangodb.velocypack.VPackParser; +import com.arangodb.velocypack.VPackParserModule; +import com.arangodb.velocypack.VPackSerializer; +import com.arangodb.velocypack.ValueType; import com.arangodb.velocystream.Request; import com.arangodb.velocystream.Response; import org.apache.http.client.HttpRequestRetryHandler; @@ -248,6 +270,19 @@ public Builder connectionTtl(final Long connectionTtl) { return this; } + /** + * Set the keep-alive interval for VST connections. If set, every VST connection will perform a no-op request every + * {@code keepAliveInterval} seconds, to avoid to be closed due to inactivity by the server (or by the external + * environment, eg. firewall, intermediate routers, operating system). + * + * @param keepAliveInterval interval in seconds + * @return {@link ArangoDB.Builder} + */ + public Builder keepAliveInterval(final Integer keepAliveInterval) { + setKeepAliveInterval(keepAliveInterval); + return this; + } + /** * Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single * server with active failover. @@ -600,7 +635,7 @@ public synchronized ArangoDB build() { final int max = maxConnections != null ? Math.max(1, maxConnections) : protocolMaxConnections; final ConnectionFactory connectionFactory = (protocol == null || Protocol.VST == protocol) - ? new VstConnectionFactorySync(host, timeout, connectionTtl, useSsl, sslContext) + ? new VstConnectionFactorySync(host, timeout, connectionTtl, keepAliveInterval, useSsl, sslContext) : new HttpConnectionFactory(timeout, user, password, useSsl, sslContext, hostnameVerifier, custom, protocol, connectionTtl, httpCookieSpec, httpRequestRetryHandler); diff --git a/src/main/java/com/arangodb/async/ArangoDBAsync.java b/src/main/java/com/arangodb/async/ArangoDBAsync.java index 36f8e74d1..b86c66ec3 100644 --- a/src/main/java/com/arangodb/async/ArangoDBAsync.java +++ b/src/main/java/com/arangodb/async/ArangoDBAsync.java @@ -26,7 +26,13 @@ import com.arangodb.async.internal.ArangoDBAsyncImpl; import com.arangodb.async.internal.velocystream.VstCommunicationAsync; import com.arangodb.async.internal.velocystream.VstConnectionFactoryAsync; -import com.arangodb.entity.*; +import com.arangodb.entity.ArangoDBVersion; +import com.arangodb.entity.LoadBalancingStrategy; +import com.arangodb.entity.LogEntity; +import com.arangodb.entity.LogLevelEntity; +import com.arangodb.entity.Permissions; +import com.arangodb.entity.ServerRole; +import com.arangodb.entity.UserEntity; import com.arangodb.internal.ArangoContext; import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.InternalArangoDBBuilder; @@ -45,7 +51,18 @@ import com.arangodb.util.ArangoDeserializer; import com.arangodb.util.ArangoSerialization; import com.arangodb.util.ArangoSerializer; -import com.arangodb.velocypack.*; +import com.arangodb.velocypack.VPack; +import com.arangodb.velocypack.VPackAnnotationFieldFilter; +import com.arangodb.velocypack.VPackAnnotationFieldNaming; +import com.arangodb.velocypack.VPackDeserializer; +import com.arangodb.velocypack.VPackInstanceCreator; +import com.arangodb.velocypack.VPackJsonDeserializer; +import com.arangodb.velocypack.VPackJsonSerializer; +import com.arangodb.velocypack.VPackModule; +import com.arangodb.velocypack.VPackParser; +import com.arangodb.velocypack.VPackParserModule; +import com.arangodb.velocypack.VPackSerializer; +import com.arangodb.velocypack.ValueType; import com.arangodb.velocystream.Request; import com.arangodb.velocystream.Response; @@ -408,6 +425,19 @@ public Builder connectionTtl(final Long connectionTtl) { return this; } + /** + * Set the keep-alive interval for VST connections. If set, every VST connection will perform a no-op request every + * {@code keepAliveInterval} seconds, to avoid to be closed due to inactivity by the server (or by the external + * environment, eg. firewall, intermediate routers, operating system). + * + * @param keepAliveInterval interval in seconds + * @return {@link ArangoDBAsync.Builder} + */ + public Builder keepAliveInterval(final Integer keepAliveInterval) { + setKeepAliveInterval(keepAliveInterval); + return this; + } + /** * Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single * server with active failover. @@ -746,7 +776,7 @@ public synchronized ArangoDBAsync build() { final int max = maxConnections != null ? Math.max(1, maxConnections) : ArangoDefaults.MAX_CONNECTIONS_VST_DEFAULT; final ConnectionFactory connectionFactory = new VstConnectionFactoryAsync(host, timeout, connectionTtl, - useSsl, sslContext); + keepAliveInterval, useSsl, sslContext); final HostResolver hostResolver = createHostResolver(createHostList(max, connectionFactory), max, connectionFactory); final HostHandler hostHandler = createHostHandler(hostResolver); diff --git a/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionAsync.java b/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionAsync.java index 1dbcc313b..49e02bd1a 100644 --- a/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionAsync.java +++ b/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionAsync.java @@ -34,13 +34,14 @@ /** * @author Mark Vollmary */ -public class VstConnectionAsync extends VstConnection { +public class VstConnectionAsync extends VstConnection> { - private VstConnectionAsync(final HostDescription host, final Integer timeout, final Long ttl, final Boolean useSsl, - final SSLContext sslContext, final MessageStore messageStore) { - super(host, timeout, ttl, useSsl, sslContext, messageStore); + private VstConnectionAsync(final HostDescription host, final Integer timeout, final Long ttl, final Integer keepAliveInterval, + final Boolean useSsl, final SSLContext sslContext, final MessageStore messageStore) { + super(host, timeout, ttl, keepAliveInterval, useSsl, sslContext, messageStore); } + @Override public synchronized CompletableFuture write(final Message message, final Collection chunks) { final CompletableFuture future = new CompletableFuture<>(); final FutureTask task = new FutureTask<>(() -> { @@ -56,12 +57,18 @@ public synchronized CompletableFuture write(final Message message, fina return future; } + @Override + protected void doKeepAlive() { + sendKeepAlive().join(); + } + public static class Builder { private MessageStore messageStore; private HostDescription host; private Integer timeout; private Long ttl; + private Integer keepAliveInterval; private Boolean useSsl; private SSLContext sslContext; @@ -89,6 +96,11 @@ public Builder ttl(final Long ttl) { return this; } + public Builder keepAliveInterval(final Integer keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + public Builder useSsl(final Boolean useSsl) { this.useSsl = useSsl; return this; @@ -100,7 +112,7 @@ public Builder sslContext(final SSLContext sslContext) { } public VstConnectionAsync build() { - return new VstConnectionAsync(host, timeout, ttl, useSsl, sslContext, messageStore); + return new VstConnectionAsync(host, timeout, ttl, keepAliveInterval, useSsl, sslContext, messageStore); } } diff --git a/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionFactoryAsync.java b/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionFactoryAsync.java index 9a181c5a9..bd2380feb 100644 --- a/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionFactoryAsync.java +++ b/src/main/java/com/arangodb/async/internal/velocystream/VstConnectionFactoryAsync.java @@ -35,9 +35,10 @@ public class VstConnectionFactoryAsync implements ConnectionFactory { private final VstConnectionAsync.Builder builder; public VstConnectionFactoryAsync(final HostDescription host, final Integer timeout, final Long connectionTtl, - final Boolean useSsl, final SSLContext sslContext) { + final Integer keepAliveInterval, final Boolean useSsl, final SSLContext sslContext) { super(); - builder = new VstConnectionAsync.Builder().timeout(timeout).ttl(connectionTtl).useSsl(useSsl) + builder = new VstConnectionAsync.Builder().timeout(timeout).ttl(connectionTtl) + .keepAliveInterval(keepAliveInterval).useSsl(useSsl) .sslContext(sslContext); } diff --git a/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java b/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java index 41aaae257..8a06f6564 100644 --- a/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java +++ b/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java @@ -23,7 +23,18 @@ import com.arangodb.ArangoDB; import com.arangodb.ArangoDBException; import com.arangodb.entity.LoadBalancingStrategy; -import com.arangodb.internal.net.*; +import com.arangodb.internal.net.Connection; +import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.DirtyReadHostHandler; +import com.arangodb.internal.net.ExtendedHostResolver; +import com.arangodb.internal.net.FallbackHostHandler; +import com.arangodb.internal.net.Host; +import com.arangodb.internal.net.HostDescription; +import com.arangodb.internal.net.HostHandler; +import com.arangodb.internal.net.HostResolver; +import com.arangodb.internal.net.RandomHostHandler; +import com.arangodb.internal.net.RoundRobinHostHandler; +import com.arangodb.internal.net.SimpleHostResolver; import com.arangodb.internal.util.HostUtils; import com.arangodb.internal.velocypack.VPackDriverModule; import com.arangodb.util.ArangoDeserializer; @@ -63,6 +74,7 @@ public abstract class InternalArangoDBBuilder { private static final String PROPERTY_KEY_V_STREAM_CHUNK_CONTENT_SIZE = "arangodb.chunksize"; private static final String PROPERTY_KEY_MAX_CONNECTIONS = "arangodb.connections.max"; private static final String PROPERTY_KEY_CONNECTION_TTL = "arangodb.connections.ttl"; + private static final String PROPERTY_KEEP_ALIVE_INTERVAL = "arangodb.connections.keepAlive.interval"; private static final String PROPERTY_KEY_ACQUIRE_HOST_LIST = "arangodb.acquireHostList"; private static final String PROPERTY_KEY_ACQUIRE_HOST_LIST_INTERVAL = "arangodb.acquireHostList.interval"; private static final String PROPERTY_KEY_LOAD_BALANCING_STRATEGY = "arangodb.loadBalancingStrategy"; @@ -81,6 +93,7 @@ public abstract class InternalArangoDBBuilder { protected Integer chunksize; protected Integer maxConnections; protected Long connectionTtl; + protected Integer keepAliveInterval; protected final VPack.Builder vpackBuilder; protected final VPackParser.Builder vpackParserBuilder; protected ArangoSerializer serializer; @@ -135,6 +148,7 @@ protected void loadProperties(final Properties properties) { chunksize = loadChunkSize(properties, chunksize); maxConnections = loadMaxConnections(properties, maxConnections); connectionTtl = loadConnectionTtl(properties, connectionTtl); + keepAliveInterval = loadKeepAliveInterval(properties, keepAliveInterval); acquireHostList = loadAcquireHostList(properties, acquireHostList); acquireHostListInterval = loadAcquireHostListInterval(properties, acquireHostListInterval); loadBalancingStrategy = loadLoadBalancingStrategy(properties, loadBalancingStrategy); @@ -184,6 +198,10 @@ protected void setConnectionTtl(final Long connectionTtl) { this.connectionTtl = connectionTtl; } + protected void setKeepAliveInterval(final Integer keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + protected void setAcquireHostList(final Boolean acquireHostList) { this.acquireHostList = acquireHostList; } @@ -315,6 +333,12 @@ private static Long loadConnectionTtl(final Properties properties, final Long cu return ttl != null ? Long.parseLong(ttl) : null; } + private static Integer loadKeepAliveInterval(final Properties properties, final Integer currentValue) { + final String keepAliveInterval = getProperty(properties, PROPERTY_KEEP_ALIVE_INTERVAL, currentValue, + null); + return keepAliveInterval != null ? Integer.parseInt(keepAliveInterval) : null; + } + private static Boolean loadAcquireHostList(final Properties properties, final Boolean currentValue) { return Boolean.parseBoolean(getProperty(properties, PROPERTY_KEY_ACQUIRE_HOST_LIST, currentValue, ArangoDefaults.DEFAULT_ACQUIRE_HOST_LIST)); diff --git a/src/main/java/com/arangodb/internal/velocystream/VstConnectionFactorySync.java b/src/main/java/com/arangodb/internal/velocystream/VstConnectionFactorySync.java index 2f6a279da..df88bcc25 100644 --- a/src/main/java/com/arangodb/internal/velocystream/VstConnectionFactorySync.java +++ b/src/main/java/com/arangodb/internal/velocystream/VstConnectionFactorySync.java @@ -36,9 +36,10 @@ public class VstConnectionFactorySync implements ConnectionFactory { private final VstConnectionSync.Builder builder; public VstConnectionFactorySync(final HostDescription host, final Integer timeout, final Long connectionTtl, - final Boolean useSsl, final SSLContext sslContext) { + final Integer keepAliveInterval, final Boolean useSsl, final SSLContext sslContext) { super(); - builder = new VstConnectionSync.Builder().timeout(timeout).ttl(connectionTtl).useSsl(useSsl) + builder = new VstConnectionSync.Builder().timeout(timeout).ttl(connectionTtl) + .keepAliveInterval(keepAliveInterval).useSsl(useSsl) .sslContext(sslContext); } diff --git a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java index 6e9ed478c..712dbf5f8 100644 --- a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java +++ b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java @@ -24,7 +24,9 @@ import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.HostDescription; +import com.arangodb.velocypack.VPackBuilder; import com.arangodb.velocypack.VPackSlice; +import com.arangodb.velocypack.ValueType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,25 +43,35 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * @author Mark Vollmary */ -public abstract class VstConnection implements Connection { +public abstract class VstConnection implements Connection { private static final Logger LOGGER = LoggerFactory.getLogger(VstConnection.class); private static final byte[] PROTOCOL_HEADER = "VST/1.0\r\n\r\n".getBytes(); private ExecutorService executor; + + private ScheduledExecutorService keepAliveScheduler; + private final AtomicLong keepAliveId = new AtomicLong(); + protected final MessageStore messageStore; protected final Integer timeout; private final Long ttl; + + private final Integer keepAliveInterval; private final Boolean useSsl; private final SSLContext sslContext; @@ -73,12 +85,32 @@ public abstract class VstConnection implements Connection { private final String connectionName; - protected VstConnection(final HostDescription host, final Integer timeout, final Long ttl, final Boolean useSsl, - final SSLContext sslContext, final MessageStore messageStore) { + private final VPackSlice keepAliveRequest = new VPackBuilder() + .add(ValueType.ARRAY) + .add(1) + .add(1) + .add("_system") + .add(1) + .add("/_admin/server/availability") + .add(ValueType.OBJECT) + .close() + .add(ValueType.OBJECT) + .close() + .close() + .slice(); + + protected VstConnection(final HostDescription host, + final Integer timeout, + final Long ttl, + final Integer keepAliveInterval, + final Boolean useSsl, + final SSLContext sslContext, + final MessageStore messageStore) { super(); this.host = host; this.timeout = timeout; this.ttl = ttl; + this.keepAliveInterval = keepAliveInterval; this.useSsl = useSsl; this.sslContext = sslContext; this.messageStore = messageStore; @@ -87,6 +119,31 @@ protected VstConnection(final HostDescription host, final Integer timeout, final LOGGER.debug("Connection " + connectionName + " created"); } + protected T sendKeepAlive() { + long id = keepAliveId.decrementAndGet(); + Message message = new Message(id, keepAliveRequest, null); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(String.format("Send keepalive probe (id=%s, head=%s, body=%s)", message.getId(), message.getHead(), + message.getBody() != null ? message.getBody() : "{}")); + } + return write(message, Collections.singleton(new Chunk( + id, 0, 1, -1, + 0, keepAliveRequest.getByteSize() + ))); + } + + public abstract T write(final Message message, final Collection chunks); + + protected abstract void doKeepAlive(); + + private void keepAlive() { + try { + doKeepAlive(); + } catch (Exception e) { + LOGGER.error("Got exception while performing keepAlive request:", e); + } + } + public boolean isOpen() { return socket != null && socket.isConnected() && !socket.isClosed(); } @@ -162,10 +219,19 @@ public synchronized void open() throws IOException { return null; }); + + if (keepAliveInterval != null) { + keepAliveScheduler = Executors.newScheduledThreadPool(1); + keepAliveScheduler.scheduleAtFixedRate(this::keepAlive, 0, keepAliveInterval, TimeUnit.SECONDS); + } + } @Override public synchronized void close() { + if (keepAliveScheduler != null) { + keepAliveScheduler.shutdownNow(); + } messageStore.clear(); if (executor != null && !executor.isShutdown()) { executor.shutdown(); diff --git a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnectionSync.java b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnectionSync.java index 54d2ffbfe..1716b4cf5 100644 --- a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnectionSync.java +++ b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnectionSync.java @@ -31,7 +31,7 @@ /** * @author Mark Vollmary */ -public class VstConnectionSync extends VstConnection { +public class VstConnectionSync extends VstConnection { public static class Builder { @@ -39,6 +39,7 @@ public static class Builder { private MessageStore messageStore; private Integer timeout; private Long ttl; + private Integer keepAliveInterval; private Boolean useSsl; private SSLContext sslContext; @@ -72,16 +73,23 @@ public Builder ttl(final Long ttl) { return this; } + public Builder keepAliveInterval(final Integer keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + public VstConnectionSync build() { - return new VstConnectionSync(host, timeout, ttl, useSsl, sslContext, messageStore); + return new VstConnectionSync(host, timeout, ttl, keepAliveInterval, + useSsl, sslContext, messageStore); } } - private VstConnectionSync(final HostDescription host, final Integer timeout, final Long ttl, final Boolean useSsl, - final SSLContext sslContext, final MessageStore messageStore) { - super(host, timeout, ttl, useSsl, sslContext, messageStore); + private VstConnectionSync(final HostDescription host, final Integer timeout, final Long ttl, final Integer keepAliveInterval, + final Boolean useSsl, final SSLContext sslContext, final MessageStore messageStore) { + super(host, timeout, ttl, keepAliveInterval, useSsl, sslContext, messageStore); } + @Override public Message write(final Message message, final Collection chunks) throws ArangoDBException { final FutureTask task = new FutureTask<>(() -> messageStore.get(message.getId())); messageStore.storeMessage(message.getId(), task); @@ -93,4 +101,9 @@ public Message write(final Message message, final Collection chunks) thro } } + @Override + protected void doKeepAlive() { + sendKeepAlive(); + } + }