diff --git a/ChangeLog.md b/ChangeLog.md index b331518c9..ab58a1951 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a ### Fixed +- add arangodb.httpCookieSpec - host handling (issue #241) - logging extended hostresolver diff --git a/pom.xml b/pom.xml index 28f0deba3..584d55d94 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 1.4.1 - 4.5.7 + 4.5.8 1.1.3 @@ -278,7 +278,7 @@ commons-codec commons-codec - 1.11 + 1.12 commons-logging diff --git a/src/main/java/com/arangodb/ArangoDB.java b/src/main/java/com/arangodb/ArangoDB.java index 9d7e038f3..ffad3b011 100644 --- a/src/main/java/com/arangodb/ArangoDB.java +++ b/src/main/java/com/arangodb/ArangoDB.java @@ -611,11 +611,12 @@ public synchronized ArangoDB build() { final ConnectionFactory connectionFactory = (protocol == null || Protocol.VST == protocol) ? new VstConnectionFactorySync(host, timeout, connectionTtl, useSsl, sslContext) : new HttpConnectionFactory(timeout, user, password, useSsl, sslContext, custom, protocol, - connectionTtl); + connectionTtl, httpCookieSpec); final Collection hostList = createHostList(max, connectionFactory); final HostResolver hostResolver = createHostResolver(hostList, max, connectionFactory); final HostHandler hostHandler = createHostHandler(hostResolver); + return new ArangoDBImpl( new VstCommunicationSync.Builder(hostHandler).timeout(timeout).user(user).password(password) .useSsl(useSsl).sslContext(sslContext).chunksize(chunksize).maxConnections(maxConnections) diff --git a/src/main/java/com/arangodb/internal/ArangoDBImpl.java b/src/main/java/com/arangodb/internal/ArangoDBImpl.java index cf10e9e75..fa88d6e0d 100644 --- a/src/main/java/com/arangodb/internal/ArangoDBImpl.java +++ b/src/main/java/com/arangodb/internal/ArangoDBImpl.java @@ -21,10 +21,10 @@ package com.arangodb.internal; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.arangodb.ArangoDB; import com.arangodb.ArangoDBException; @@ -42,7 +42,6 @@ import com.arangodb.internal.net.CommunicationProtocol; import com.arangodb.internal.net.HostHandle; import com.arangodb.internal.net.HostResolver; -import com.arangodb.internal.net.HostResolver.EndpointResolver; import com.arangodb.internal.util.ArangoSerializationFactory; import com.arangodb.internal.util.ArangoSerializationFactory.Serializer; import com.arangodb.internal.velocystream.VstCommunicationSync; @@ -52,10 +51,8 @@ import com.arangodb.model.UserUpdateOptions; import com.arangodb.util.ArangoCursorInitializer; import com.arangodb.util.ArangoSerialization; -import com.arangodb.velocypack.VPackSlice; import com.arangodb.velocypack.exception.VPackException; import com.arangodb.velocystream.Request; -import com.arangodb.velocystream.RequestType; import com.arangodb.velocystream.Response; /** @@ -64,6 +61,8 @@ * */ public class ArangoDBImpl extends InternalArangoDB implements ArangoDB { + + private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBImpl.class); private ArangoCursorInitializer cursorInitializer; private CommunicationProtocol cp; @@ -71,51 +70,24 @@ public class ArangoDBImpl extends InternalArangoDB implement public ArangoDBImpl(final VstCommunicationSync.Builder vstBuilder, final HttpCommunication.Builder httpBuilder, final ArangoSerializationFactory util, final Protocol protocol, final HostResolver hostResolver, final ArangoContext context) { - super(new ArangoExecutorSync(createProtocol(vstBuilder, httpBuilder, util.get(Serializer.INTERNAL), protocol), - util, new DocumentCache()), util, context); - cp = createProtocol(new VstCommunicationSync.Builder(vstBuilder).maxConnections(1), - new HttpCommunication.Builder(httpBuilder), util.get(Serializer.INTERNAL), protocol); - hostResolver.init(new EndpointResolver() { - @Override - public Collection resolve(final boolean closeConnections) throws ArangoDBException { - Collection response; - try { - response = executor.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, PATH_ENDPOINTS), - new ResponseDeserializer>() { - @Override - public Collection deserialize(final Response response) throws VPackException { - final VPackSlice field = response.getBody().get("endpoints"); - Collection endpoints; - if (field.isNone()) { - endpoints = Collections. emptyList(); - } else { - final Collection> tmp = util().deserialize(field, - Collection.class); - endpoints = new ArrayList(); - for (final Map map : tmp) { - for (final String value : map.values()) { - endpoints.add(value); - } - } - } - return endpoints; - } - }, null); - } catch (final ArangoDBException e) { - final Integer responseCode = e.getResponseCode(); - if (responseCode != null && responseCode == 403) { - response = Collections. emptyList(); - } else { - throw e; - } - } finally { - if (closeConnections) { - ArangoDBImpl.this.shutdown(); - } - } - return response; - } - }); + + super(new ArangoExecutorSync( + createProtocol(vstBuilder, httpBuilder, util.get(Serializer.INTERNAL), protocol), + util, + new DocumentCache()), + util, + context); + + cp = createProtocol( + new VstCommunicationSync.Builder(vstBuilder).maxConnections(1), + new HttpCommunication.Builder(httpBuilder), + util.get(Serializer.INTERNAL), + protocol); + + hostResolver.init(this.executor(), util()); + + LOGGER.info("ArangoDB Client is ready to use"); + } private static CommunicationProtocol createProtocol( @@ -123,6 +95,7 @@ private static CommunicationProtocol createProtocol( final HttpCommunication.Builder httpBuilder, final ArangoSerialization util, final Protocol protocol) { + return (protocol == null || Protocol.VST == protocol) ? createVST(vstBuilder, util) : createHTTP(httpBuilder, util); } diff --git a/src/main/java/com/arangodb/internal/ArangoDefaults.java b/src/main/java/com/arangodb/internal/ArangoDefaults.java index dbe0b7984..194028c80 100644 --- a/src/main/java/com/arangodb/internal/ArangoDefaults.java +++ b/src/main/java/com/arangodb/internal/ArangoDefaults.java @@ -49,6 +49,7 @@ private ArangoDefaults() { public static final int MAX_CONNECTIONS_HTTP_DEFAULT = 20; public static final Protocol DEFAULT_NETWORK_PROTOCOL = Protocol.VST; public static final boolean DEFAULT_ACQUIRE_HOST_LIST = false; + public static final int DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE; } diff --git a/src/main/java/com/arangodb/internal/InternalArangoDB.java b/src/main/java/com/arangodb/internal/InternalArangoDB.java index 49939a365..b04a5af7a 100644 --- a/src/main/java/com/arangodb/internal/InternalArangoDB.java +++ b/src/main/java/com/arangodb/internal/InternalArangoDB.java @@ -54,7 +54,7 @@ public abstract class InternalArangoDB extends ArangoE private static final String PATH_API_ADMIN_LOG = "/_admin/log"; private static final String PATH_API_ADMIN_LOG_LEVEL = "/_admin/log/level"; private static final String PATH_API_ROLE = "/_admin/server/role"; - protected static final String PATH_ENDPOINTS = "/_api/cluster/endpoints"; + private static final String PATH_ENDPOINTS = "/_api/cluster/endpoints"; private static final String PATH_API_USER = "/_api/user"; protected InternalArangoDB(final E executor, final ArangoSerializationFactory util, final ArangoContext context) { diff --git a/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java b/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java index 918a8dff4..45d1e0389 100644 --- a/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java +++ b/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java @@ -55,13 +55,14 @@ import com.arangodb.velocypack.VPack; import com.arangodb.velocypack.VPackParser; + /** * @author Mark Vollmary * */ public abstract class InternalArangoDBBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(InternalArangoDBBuilder.class); + private static final Logger LOG = LoggerFactory.getLogger(InternalArangoDBBuilder.class); private static final String PROPERTY_KEY_HOSTS = "arangodb.hosts"; private static final String PROPERTY_KEY_HOST = "arangodb.host"; @@ -70,10 +71,12 @@ public abstract class InternalArangoDBBuilder { private static final String PROPERTY_KEY_USER = "arangodb.user"; private static final String PROPERTY_KEY_PASSWORD = "arangodb.password"; private static final String PROPERTY_KEY_USE_SSL = "arangodb.usessl"; + private static final String PROPERTY_KEY_COOKIE_SPEC = "arangodb.httpCookieSpec"; private static final String PROPERTY_KEY_V_STREAM_CHUNK_CONTENT_SIZE = "arangodb.chunksize"; private static final String PROPERTY_KEY_MAX_CONNECTIONS = "arangodb.connections.max"; private static final String PROPERTY_KEY_CONNECTION_TTL = "arangodb.connections.ttl"; private static final String PROPERTY_KEY_ACQUIRE_HOST_LIST = "arangodb.acquireHostList"; + private static final String PROPERTY_KEY_ACQUIRE_HOST_LIST_INTERVAL = "arangodb.acquireHostList.interval"; private static final String PROPERTY_KEY_LOAD_BALANCING_STRATEGY = "arangodb.loadBalancingStrategy"; private static final String DEFAULT_PROPERTY_FILE = "/arangodb.properties"; @@ -83,6 +86,7 @@ public abstract class InternalArangoDBBuilder { protected String user; protected String password; protected Boolean useSsl; + protected String httpCookieSpec; protected SSLContext sslContext; protected Integer chunksize; protected Integer maxConnections; @@ -92,9 +96,12 @@ public abstract class InternalArangoDBBuilder { protected ArangoSerializer serializer; protected ArangoDeserializer deserializer; protected Boolean acquireHostList; + protected Integer acquireHostListInterval; protected LoadBalancingStrategy loadBalancingStrategy; protected ArangoSerialization customSerializer; + + public InternalArangoDBBuilder() { super(); vpackBuilder = new VPack.Builder(); @@ -129,10 +136,12 @@ protected void loadProperties(final Properties properties) { user = loadUser(properties, user); password = loadPassword(properties, password); useSsl = loadUseSsl(properties, useSsl); + httpCookieSpec = loadhttpCookieSpec(properties, useSsl); chunksize = loadChunkSize(properties, chunksize); maxConnections = loadMaxConnections(properties, maxConnections); connectionTtl = loadConnectionTtl(properties, connectionTtl); acquireHostList = loadAcquireHostList(properties, acquireHostList); + acquireHostListInterval = loadAcquireHostListInterval(properties, acquireHostListInterval); loadBalancingStrategy = loadLoadBalancingStrategy(properties, loadBalancingStrategy); } @@ -195,17 +204,19 @@ protected void setSerializer(final ArangoSerialization serializer) { protected HostResolver createHostResolver(final Collection hosts, final int maxConnections,final ConnectionFactory connectionFactory) { if(acquireHostList) { - LOGGER.debug("acquireHostList -> Use ExtendedHostResolver"); - return new ExtendedHostResolver(new ArrayList(hosts), maxConnections, connectionFactory); + LOG.debug("acquireHostList -> Use ExtendedHostResolver"); + return new ExtendedHostResolver(new ArrayList(hosts), maxConnections, connectionFactory, acquireHostListInterval); } else { - LOGGER.debug("Use SimpleHostResolver"); + LOG.debug("Use SimpleHostResolver"); return new SimpleHostResolver(new ArrayList(hosts)); } } protected HostHandler createHostHandler(final HostResolver hostResolver) { + final HostHandler hostHandler; + if (loadBalancingStrategy != null) { switch (loadBalancingStrategy) { case ONE_RANDOM: @@ -222,6 +233,9 @@ protected HostHandler createHostHandler(final HostResolver hostResolver) { } else { hostHandler = new FallbackHostHandler(hostResolver); } + + LOG.info("HostHandler is " + hostHandler.getClass().getSimpleName()); + return new DirtyReadHostHandler(hostHandler, new RoundRobinHostHandler(hostResolver)); } @@ -273,6 +287,10 @@ private static Boolean loadUseSsl(final Properties properties, final Boolean cur return Boolean.parseBoolean( getProperty(properties, PROPERTY_KEY_USE_SSL, currentValue, ArangoDefaults.DEFAULT_USE_SSL)); } + + private static String loadhttpCookieSpec(final Properties properties, final Boolean currentValue) { + return getProperty(properties, PROPERTY_KEY_COOKIE_SPEC, currentValue, ""); + } private static Integer loadChunkSize(final Properties properties, final Integer currentValue) { return Integer.parseInt(getProperty(properties, PROPERTY_KEY_V_STREAM_CHUNK_CONTENT_SIZE, currentValue, @@ -295,6 +313,11 @@ private static Boolean loadAcquireHostList(final Properties properties, final Bo ArangoDefaults.DEFAULT_ACQUIRE_HOST_LIST)); } + private static int loadAcquireHostListInterval(final Properties properties, final Integer currentValue) { + return Integer.parseInt(getProperty(properties, PROPERTY_KEY_ACQUIRE_HOST_LIST_INTERVAL, currentValue, + ArangoDefaults.DEFAULT_ACQUIRE_HOST_LIST_INTERVAL)); + } + private static LoadBalancingStrategy loadLoadBalancingStrategy( final Properties properties, final LoadBalancingStrategy currentValue) { diff --git a/src/main/java/com/arangodb/internal/http/HttpConnection.java b/src/main/java/com/arangodb/internal/http/HttpConnection.java index 1b4df055e..c34e49638 100644 --- a/src/main/java/com/arangodb/internal/http/HttpConnection.java +++ b/src/main/java/com/arangodb/internal/http/HttpConnection.java @@ -101,6 +101,7 @@ public static class Builder { private String password; private ArangoSerialization util; private Boolean useSsl; + private String httpCookieSpec; private Protocol contentType; private HostDescription host; private Long ttl; @@ -126,6 +127,11 @@ public Builder useSsl(final Boolean useSsl) { this.useSsl = useSsl; return this; } + + public Builder httpCookieSpec(String httpCookieSpec) { + this.httpCookieSpec = httpCookieSpec; + return this; + } public Builder contentType(final Protocol contentType) { this.contentType = contentType; @@ -153,7 +159,7 @@ public Builder timeout(final Integer timeout) { } public HttpConnection build() { - return new HttpConnection(host, timeout, user, password, useSsl, sslContext, util, contentType, ttl); + return new HttpConnection(host, timeout, user, password, useSsl, sslContext, util, contentType, ttl, httpCookieSpec); } } @@ -168,7 +174,7 @@ public HttpConnection build() { private HttpConnection(final HostDescription host, final Integer timeout, final String user, final String password, final Boolean useSsl, final SSLContext sslContext, final ArangoSerialization util, final Protocol contentType, - final Long ttl) { + final Long ttl, final String httpCookieSpec) { super(); this.host = host; this.user = user; @@ -196,6 +202,11 @@ private HttpConnection(final HostDescription host, final Integer timeout, final requestConfig.setConnectionRequestTimeout(timeout); requestConfig.setSocketTimeout(timeout); } + + if (httpCookieSpec != null && httpCookieSpec.length() > 1) { + requestConfig.setCookieSpec(httpCookieSpec); + } + final ConnectionKeepAliveStrategy keepAliveStrategy = new ConnectionKeepAliveStrategy() { @Override public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) { diff --git a/src/main/java/com/arangodb/internal/http/HttpConnectionFactory.java b/src/main/java/com/arangodb/internal/http/HttpConnectionFactory.java index 9418b24cf..71e09446d 100644 --- a/src/main/java/com/arangodb/internal/http/HttpConnectionFactory.java +++ b/src/main/java/com/arangodb/internal/http/HttpConnectionFactory.java @@ -38,10 +38,10 @@ public class HttpConnectionFactory implements ConnectionFactory { public HttpConnectionFactory(final Integer timeout, final String user, final String password, final Boolean useSsl, final SSLContext sslContext, final ArangoSerialization util, final Protocol protocol, - final Long connectionTtl) { + final Long connectionTtl, String httpCookieSpec) { super(); builder = new HttpConnection.Builder().timeout(timeout).user(user).password(password).useSsl(useSsl) - .sslContext(sslContext).serializationUtil(util).contentType(protocol).ttl(connectionTtl); + .sslContext(sslContext).serializationUtil(util).contentType(protocol).ttl(connectionTtl).httpCookieSpec(httpCookieSpec); } diff --git a/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index a0c631c5e..5aa7d5d45 100644 --- a/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -24,11 +24,19 @@ import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.arangodb.internal.velocystream.internal.VstConnection; +import com.arangodb.internal.velocystream.internal.VstConnectionSync; + /** * @author Mark Vollmary * */ public class ConnectionPoolImpl implements ConnectionPool { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolImpl.class); private final HostDescription host; private final int maxConnections; @@ -53,7 +61,9 @@ public Connection createConnection(final HostDescription host) { @Override public synchronized Connection connection() { + final Connection connection; + if (connections.size() < maxConnections) { connection = createConnection(host); connections.add(connection); @@ -62,6 +72,11 @@ public synchronized Connection connection() { final int index = (current++) % connections.size(); connection = connections.get(index); } + + if(connection instanceof VstConnectionSync) { + LOGGER.debug("Return Connection " + ((VstConnection)connection).getConnectionName()); + } + return connection; } @@ -73,4 +88,10 @@ public void close() throws IOException { connections.clear(); } + @Override + public String toString() { + return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections=" + + connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]"; + } + } diff --git a/src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java b/src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java index a2bddd108..1d624ab83 100644 --- a/src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java +++ b/src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java @@ -20,90 +20,157 @@ package com.arangodb.internal.net; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.arangodb.ArangoDBException; +import com.arangodb.internal.ArangoExecutor.ResponseDeserializer; +import com.arangodb.internal.ArangoExecutorSync; +import com.arangodb.internal.ArangoRequestParam; import com.arangodb.internal.util.HostUtils; +import com.arangodb.util.ArangoSerialization; +import com.arangodb.velocypack.VPackSlice; +import com.arangodb.velocypack.exception.VPackException; +import com.arangodb.velocystream.Request; +import com.arangodb.velocystream.RequestType; +import com.arangodb.velocystream.Response; /** * @author Mark Vollmary * */ public class ExtendedHostResolver implements HostResolver { - + private static final Logger LOGGER = LoggerFactory.getLogger(ExtendedHostResolver.class); - private static final long MAX_CACHE_TIME = 60 * 60 * 1000; - - private EndpointResolver resolver; private HostSet hosts; - + private final Integer maxConnections; private final ConnectionFactory connectionFactory; - + private long lastUpdate; + private Integer acquireHostListInterval; + + private ArangoExecutorSync executor; + private ArangoSerialization arangoSerialization; + public ExtendedHostResolver(final List hosts, final Integer maxConnections, - final ConnectionFactory connectionFactory) { - super(); + final ConnectionFactory connectionFactory, Integer acquireHostListInterval) { + + this.acquireHostListInterval = acquireHostListInterval; this.hosts = new HostSet(hosts); this.maxConnections = maxConnections; this.connectionFactory = connectionFactory; + lastUpdate = 0; } @Override - public void init(final EndpointResolver resolver) { - this.resolver = resolver; + public void init(ArangoExecutorSync executor, ArangoSerialization arangoSerialization) { + this.executor = executor; + this.arangoSerialization = arangoSerialization; } @Override - public HostSet resolve(final boolean initial, final boolean closeConnections) { + public HostSet resolve(boolean initial, boolean closeConnections) { if (!initial && isExpired()) { - + lastUpdate = System.currentTimeMillis(); - - final Collection endpoints = resolver.resolve(closeConnections); - LOGGER.debug("Resolve " + endpoints.size() + " Endpoints"); + + final Collection endpoints = resolveFromServer(); + LOGGER.info("Resolve " + endpoints.size() + " Endpoints"); LOGGER.debug("Endpoints " + Arrays.deepToString(endpoints.toArray())); - + if (!endpoints.isEmpty()) { - hosts.clear(); + hosts.markAllForDeletion(); } - + for (final String endpoint : endpoints) { LOGGER.debug("Create HOST from " + endpoint); - + if (endpoint.matches(".*://.+:[0-9]+")) { - + final String[] s = endpoint.replaceAll(".*://", "").split(":"); if (s.length == 2) { final HostDescription description = new HostDescription(s[0], Integer.valueOf(s[1])); hosts.addHost(HostUtils.createHost(description, maxConnections, connectionFactory)); } else if (s.length == 4) { - // IPV6 Address - TODO: we need a proper function to resolve AND support IPV4 & IPV6 functions globally + // IPV6 Address - TODO: we need a proper function to resolve AND support IPV4 & IPV6 functions + // globally final HostDescription description = new HostDescription("127.0.0.1", Integer.valueOf(s[3])); hosts.addHost(HostUtils.createHost(description, maxConnections, connectionFactory)); } else { LOGGER.warn("Skip Endpoint (Missing Port)" + endpoint); } - + } else { LOGGER.warn("Skip Endpoint (Format)" + endpoint); } } + + try { + hosts.clearAllMarkedForDeletion(); + } catch (IOException e) { + LOGGER.error("Cant close all Hosts with MarkedForDeletion", e); + } + } - + return hosts; } + private Collection resolveFromServer() throws ArangoDBException { + + Collection response; + + try { + + response = executor.execute( + new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"), + new ResponseDeserializer>() { + @Override + public Collection deserialize(final Response response) throws VPackException { + final VPackSlice field = response.getBody().get("endpoints"); + Collection endpoints; + if (field.isNone()) { + endpoints = Collections. emptyList(); + } else { + final Collection> tmp = arangoSerialization.deserialize(field, Collection.class); + endpoints = new ArrayList(); + for (final Map map : tmp) { + for (final String value : map.values()) { + endpoints.add(value); + } + } + } + return endpoints; + } + }, null); + } catch (final ArangoDBException e) { + final Integer responseCode = e.getResponseCode(); + if (responseCode != null && responseCode == 403) { + response = Collections. emptyList(); + } else { + throw e; + } + } + + return response; + } + private boolean isExpired() { - return System.currentTimeMillis() > lastUpdate + MAX_CACHE_TIME; + return System.currentTimeMillis() > (lastUpdate + acquireHostListInterval); } + } diff --git a/src/main/java/com/arangodb/internal/net/Host.java b/src/main/java/com/arangodb/internal/net/Host.java index 7bbefa622..da8580e38 100644 --- a/src/main/java/com/arangodb/internal/net/Host.java +++ b/src/main/java/com/arangodb/internal/net/Host.java @@ -35,4 +35,8 @@ public interface Host { void closeOnError(); void close() throws IOException; + + void setMarkforDeletion(boolean markforDeletion); + + boolean isMarkforDeletion(); } diff --git a/src/main/java/com/arangodb/internal/net/HostImpl.java b/src/main/java/com/arangodb/internal/net/HostImpl.java index 81436850c..04a428537 100644 --- a/src/main/java/com/arangodb/internal/net/HostImpl.java +++ b/src/main/java/com/arangodb/internal/net/HostImpl.java @@ -32,6 +32,7 @@ public class HostImpl implements Host { private final ConnectionPool connectionPool; private final HostDescription description; + private boolean markforDeletion = false; public HostImpl(final ConnectionPool connectionPool, final HostDescription description) { super(); @@ -65,7 +66,43 @@ public void closeOnError() { @Override public String toString() { - return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + "]"; + return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion=" + + markforDeletion + "]"; } + + public boolean isMarkforDeletion() { + return markforDeletion; + } + + public void setMarkforDeletion(boolean markforDeletion) { + this.markforDeletion = markforDeletion; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((description == null) ? 0 : description.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostImpl other = (HostImpl) obj; + if (description == null) { + if (other.description != null) + return false; + } else if (!description.equals(other.description)) + return false; + return true; + } + + } diff --git a/src/main/java/com/arangodb/internal/net/HostResolver.java b/src/main/java/com/arangodb/internal/net/HostResolver.java index cae643c0d..8f1477101 100644 --- a/src/main/java/com/arangodb/internal/net/HostResolver.java +++ b/src/main/java/com/arangodb/internal/net/HostResolver.java @@ -20,9 +20,8 @@ package com.arangodb.internal.net; -import java.util.Collection; - -import com.arangodb.ArangoDBException; +import com.arangodb.internal.ArangoExecutorSync; +import com.arangodb.util.ArangoSerialization; /** * @author Mark Vollmary @@ -30,11 +29,7 @@ */ public interface HostResolver { - public interface EndpointResolver { - Collection resolve(boolean closeConnections) throws ArangoDBException; - } - - void init(final EndpointResolver resolver); + void init(ArangoExecutorSync executorSync, ArangoSerialization arangoSerialization); HostSet resolve(boolean initial, boolean closeConnections); diff --git a/src/main/java/com/arangodb/internal/net/HostSet.java b/src/main/java/com/arangodb/internal/net/HostSet.java index 41d919c1c..f3b727c4a 100644 --- a/src/main/java/com/arangodb/internal/net/HostSet.java +++ b/src/main/java/com/arangodb/internal/net/HostSet.java @@ -31,8 +31,16 @@ public List getHostsList() { } public void addHost(Host newHost) { + if(hosts.contains(newHost)) { LOGGER.debug("Host" + newHost + " allready in Set"); + + for (Host host : hosts) { + if(host.equals(newHost)) { + host.setMarkforDeletion(false); + } + } + } else { hosts.add(newHost); LOGGER.debug("Added Host " + newHost + " - now " + hosts.size() + " Hosts in List"); @@ -55,6 +63,33 @@ public void close() { } } + public void markAllForDeletion() { + + for (Host host : hosts) { + host.setMarkforDeletion(true); + } + + } + + public void clearAllMarkedForDeletion() throws IOException { + + LOGGER.debug("Clear all Hosts in Set with markForDeletion"); + + for (Host host : hosts) { + if(host.isMarkforDeletion()) { + try { + + LOGGER.debug("Try to close Host " + host); + host.close(); + + } catch (IOException e) { + LOGGER.warn("Error during closing the Host " + host, e); + } + } + } + + } + public void clear() { LOGGER.debug("Clear all Hosts in Set"); diff --git a/src/main/java/com/arangodb/internal/net/SimpleHostResolver.java b/src/main/java/com/arangodb/internal/net/SimpleHostResolver.java index 5bedd446b..dc47987f9 100644 --- a/src/main/java/com/arangodb/internal/net/SimpleHostResolver.java +++ b/src/main/java/com/arangodb/internal/net/SimpleHostResolver.java @@ -22,6 +22,9 @@ import java.util.List; +import com.arangodb.internal.ArangoExecutorSync; +import com.arangodb.util.ArangoSerialization; + /** * @author Mark Vollmary * @@ -29,14 +32,15 @@ public class SimpleHostResolver implements HostResolver { private final List hosts; - + public SimpleHostResolver(final List hosts) { super(); this.hosts = hosts; } @Override - public void init(final EndpointResolver resolver) { + public void init(ArangoExecutorSync executor, ArangoSerialization arangoSerialization) { + } @Override diff --git a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java index 51f66face..5fa8c1eae 100644 --- a/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java +++ b/src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java @@ -30,6 +30,7 @@ import java.nio.ByteOrder; import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -69,6 +70,10 @@ public abstract class VstConnection implements Connection { private InputStream inputStream; private final HostDescription host; + + private HashMap sendTimestamps = new HashMap(); + + private String connectionName; protected VstConnection(final HostDescription host, final Integer timeout, final Long ttl, final Boolean useSsl, final SSLContext sslContext, final MessageStore messageStore) { @@ -79,6 +84,9 @@ protected VstConnection(final HostDescription host, final Integer timeout, final this.useSsl = useSsl; this.sslContext = sslContext; this.messageStore = messageStore; + + connectionName = "conenction_" + System.currentTimeMillis() + "_" + Math.random(); + LOGGER.debug("Connection " + connectionName + " created"); } public boolean isOpen() { @@ -118,11 +126,12 @@ public synchronized void open() throws IOException { ((SSLSocket) socket).startHandshake(); } sendProtocolHeader(); + executor = Executors.newSingleThreadExecutor(); executor.submit(new Callable() { @Override public Void call() throws Exception { - LOGGER.debug("Start Callable for " + this.toString()); + LOGGER.debug("Start Callable for " + connectionName); final long openTime = new Date().getTime(); final Long ttlTime = ttl != null ? openTime + ttl : null; @@ -153,7 +162,7 @@ public Void call() throws Exception { } } - LOGGER.debug("Stop Callable for " + this.toString()); + LOGGER.debug("Stop Callable for " + connectionName); return null; } @@ -193,6 +202,7 @@ protected synchronized void writeIntern(final Message message, final Collection< if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("Send chunk %s:%s from message %s", chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId())); + sendTimestamps.put(chunk.getMessageId(), System.currentTimeMillis()); } writeChunkHead(chunk); final int contentOffset = chunk.getContentOffset(); @@ -210,6 +220,7 @@ protected synchronized void writeIntern(final Message message, final Collection< } outputStream.flush(); } catch (final IOException e) { + LOGGER.error("Error on Connection " + connectionName); throw new ArangoDBException(e); } } @@ -245,10 +256,13 @@ protected Chunk readChunk() throws IOException { contentLength = length - ArangoDefaults.CHUNK_MIN_HEADER_SIZE; } final Chunk chunk = new Chunk(messageId, chunkX, messageLength, 0, contentLength); + if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Received chunk %s:%s from message %s", chunk.getChunk(), - chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId())); + + LOGGER.debug(String.format("Received chunk %s:%s from message %s", chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId())); + LOGGER.debug("Responsetime for Message " + chunk.getMessageId() + " is " + (sendTimestamps.get(chunk.getMessageId())-System.currentTimeMillis())); } + return chunk; } @@ -268,5 +282,9 @@ protected void readBytesIntoBuffer(final byte[] buf, final int off, final int le } } } + + public String getConnectionName() { + return this.connectionName; + } } diff --git a/src/test/java/com/arangodb/internal/HostHandlerTest.java b/src/test/java/com/arangodb/internal/HostHandlerTest.java index eec8eec2a..5152fd5c5 100644 --- a/src/test/java/com/arangodb/internal/HostHandlerTest.java +++ b/src/test/java/com/arangodb/internal/HostHandlerTest.java @@ -37,6 +37,7 @@ import com.arangodb.internal.net.HostSet; import com.arangodb.internal.net.RandomHostHandler; import com.arangodb.internal.net.RoundRobinHostHandler; +import com.arangodb.util.ArangoSerialization; /** * @author Mark Vollmary @@ -59,7 +60,7 @@ public HostSet resolve(final boolean initial, final boolean closeConnections) { } @Override - public void init(final EndpointResolver resolver) { + public void init(ArangoExecutorSync executor, ArangoSerialization arangoSerialization) { } @@ -78,7 +79,7 @@ public HostSet resolve(final boolean initial, final boolean closeConnections) { } @Override - public void init(final EndpointResolver resolver) { + public void init(ArangoExecutorSync executor, ArangoSerialization arangoSerialization) { }