Skip to content

Commit 8ddf69b

Browse files
committed
only remove not longer used hosts (and connection pools)
1 parent 8ee3414 commit 8ddf69b

File tree

8 files changed

+148
-12
lines changed

8 files changed

+148
-12
lines changed

src/main/java/com/arangodb/internal/ArangoDBImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import java.io.IOException;
2424
import java.util.Collection;
2525

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
2629
import com.arangodb.ArangoDB;
2730
import com.arangodb.ArangoDBException;
2831
import com.arangodb.ArangoDatabase;
@@ -58,6 +61,8 @@
5861
*
5962
*/
6063
public class ArangoDBImpl extends InternalArangoDB<ArangoExecutorSync> implements ArangoDB {
64+
65+
private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBImpl.class);
6166

6267
private ArangoCursorInitializer cursorInitializer;
6368
private CommunicationProtocol cp;
@@ -81,6 +86,8 @@ public ArangoDBImpl(final VstCommunicationSync.Builder vstBuilder, final HttpCom
8186

8287
hostResolver.init(this.executor(), util());
8388

89+
LOGGER.info("ArangoDB Client is ready to use");
90+
8491
}
8592

8693
private static CommunicationProtocol createProtocol(

src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@
5555
import com.arangodb.velocypack.VPack;
5656
import com.arangodb.velocypack.VPackParser;
5757

58+
5859
/**
5960
* @author Mark Vollmary
6061
*
6162
*/
6263
public abstract class InternalArangoDBBuilder {
6364

64-
private static final Logger LOGGER = LoggerFactory.getLogger(InternalArangoDBBuilder.class);
65+
private static final Logger LOG = LoggerFactory.getLogger(InternalArangoDBBuilder.class);
6566

6667
private static final String PROPERTY_KEY_HOSTS = "arangodb.hosts";
6768
private static final String PROPERTY_KEY_HOST = "arangodb.host";
@@ -200,17 +201,19 @@ protected void setSerializer(final ArangoSerialization serializer) {
200201
protected HostResolver createHostResolver(final Collection<Host> hosts, final int maxConnections,final ConnectionFactory connectionFactory) {
201202

202203
if(acquireHostList) {
203-
LOGGER.debug("acquireHostList -> Use ExtendedHostResolver");
204+
LOG.debug("acquireHostList -> Use ExtendedHostResolver");
204205
return new ExtendedHostResolver(new ArrayList<Host>(hosts), maxConnections, connectionFactory, acquireHostListInterval);
205206
} else {
206-
LOGGER.debug("Use SimpleHostResolver");
207+
LOG.debug("Use SimpleHostResolver");
207208
return new SimpleHostResolver(new ArrayList<Host>(hosts));
208209
}
209210

210211
}
211212

212213
protected HostHandler createHostHandler(final HostResolver hostResolver) {
214+
213215
final HostHandler hostHandler;
216+
214217
if (loadBalancingStrategy != null) {
215218
switch (loadBalancingStrategy) {
216219
case ONE_RANDOM:
@@ -227,6 +230,9 @@ protected HostHandler createHostHandler(final HostResolver hostResolver) {
227230
} else {
228231
hostHandler = new FallbackHostHandler(hostResolver);
229232
}
233+
234+
LOG.info("HostHandler is " + hostHandler.getClass().getSimpleName());
235+
230236
return new DirtyReadHostHandler(hostHandler, new RoundRobinHostHandler(hostResolver));
231237
}
232238

src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,19 @@
2424
import java.util.ArrayList;
2525
import java.util.List;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import com.arangodb.internal.velocystream.internal.VstConnection;
31+
import com.arangodb.internal.velocystream.internal.VstConnectionSync;
32+
2733
/**
2834
* @author Mark Vollmary
2935
*
3036
*/
3137
public class ConnectionPoolImpl implements ConnectionPool {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolImpl.class);
3240

3341
private final HostDescription host;
3442
private final int maxConnections;
@@ -53,7 +61,9 @@ public Connection createConnection(final HostDescription host) {
5361

5462
@Override
5563
public synchronized Connection connection() {
64+
5665
final Connection connection;
66+
5767
if (connections.size() < maxConnections) {
5868
connection = createConnection(host);
5969
connections.add(connection);
@@ -62,6 +72,11 @@ public synchronized Connection connection() {
6272
final int index = (current++) % connections.size();
6373
connection = connections.get(index);
6474
}
75+
76+
if(connection instanceof VstConnectionSync) {
77+
LOGGER.debug("Return Connection " + ((VstConnection)connection).getConnectionName());
78+
}
79+
6580
return connection;
6681
}
6782

@@ -73,4 +88,10 @@ public void close() throws IOException {
7388
connections.clear();
7489
}
7590

91+
@Override
92+
public String toString() {
93+
return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections="
94+
+ connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]";
95+
}
96+
7697
}

src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.arangodb.internal.net;
2222

23+
import java.io.IOException;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Collection;
@@ -81,18 +82,18 @@ public void init(ArangoExecutorSync executor, ArangoSerialization arangoSerializ
8182

8283
@Override
8384

84-
public HostSet resolve(final boolean initial, final boolean closeConnections) {
85-
85+
public HostSet resolve(boolean initial, boolean closeConnections) {
86+
8687
if (!initial && isExpired()) {
8788

8889
lastUpdate = System.currentTimeMillis();
8990

9091
final Collection<String> endpoints = resolveFromServer();
91-
LOGGER.debug("Resolve " + endpoints.size() + " Endpoints");
92+
LOGGER.info("Resolve " + endpoints.size() + " Endpoints");
9293
LOGGER.debug("Endpoints " + Arrays.deepToString(endpoints.toArray()));
9394

9495
if (!endpoints.isEmpty()) {
95-
hosts.clear();
96+
hosts.markAllForDeletion();
9697
}
9798

9899
for (final String endpoint : endpoints) {
@@ -117,6 +118,13 @@ public HostSet resolve(final boolean initial, final boolean closeConnections) {
117118
LOGGER.warn("Skip Endpoint (Format)" + endpoint);
118119
}
119120
}
121+
122+
try {
123+
hosts.clearAllMarkedForDeletion();
124+
} catch (IOException e) {
125+
LOGGER.error("Cant close all Hosts with MarkedForDeletion", e);
126+
}
127+
120128
}
121129

122130
return hosts;

src/main/java/com/arangodb/internal/net/Host.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ public interface Host {
3535
void closeOnError();
3636

3737
void close() throws IOException;
38+
39+
void setMarkforDeletion(boolean markforDeletion);
40+
41+
boolean isMarkforDeletion();
3842
}

src/main/java/com/arangodb/internal/net/HostImpl.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class HostImpl implements Host {
3232

3333
private final ConnectionPool connectionPool;
3434
private final HostDescription description;
35+
private boolean markforDeletion = false;
3536

3637
public HostImpl(final ConnectionPool connectionPool, final HostDescription description) {
3738
super();
@@ -65,7 +66,43 @@ public void closeOnError() {
6566

6667
@Override
6768
public String toString() {
68-
return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + "]";
69+
return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion="
70+
+ markforDeletion + "]";
6971
}
72+
73+
public boolean isMarkforDeletion() {
74+
return markforDeletion;
75+
}
76+
77+
public void setMarkforDeletion(boolean markforDeletion) {
78+
this.markforDeletion = markforDeletion;
79+
}
80+
81+
@Override
82+
public int hashCode() {
83+
final int prime = 31;
84+
int result = 1;
85+
result = prime * result + ((description == null) ? 0 : description.hashCode());
86+
return result;
87+
}
88+
89+
@Override
90+
public boolean equals(Object obj) {
91+
if (this == obj)
92+
return true;
93+
if (obj == null)
94+
return false;
95+
if (getClass() != obj.getClass())
96+
return false;
97+
HostImpl other = (HostImpl) obj;
98+
if (description == null) {
99+
if (other.description != null)
100+
return false;
101+
} else if (!description.equals(other.description))
102+
return false;
103+
return true;
104+
}
105+
106+
70107

71108
}

src/main/java/com/arangodb/internal/net/HostSet.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,16 @@ public List<Host> getHostsList() {
3131
}
3232

3333
public void addHost(Host newHost) {
34+
3435
if(hosts.contains(newHost)) {
3536
LOGGER.debug("Host" + newHost + " allready in Set");
37+
38+
for (Host host : hosts) {
39+
if(host.equals(newHost)) {
40+
host.setMarkforDeletion(false);
41+
}
42+
}
43+
3644
} else {
3745
hosts.add(newHost);
3846
LOGGER.debug("Added Host " + newHost + " - now " + hosts.size() + " Hosts in List");
@@ -55,6 +63,33 @@ public void close() {
5563
}
5664
}
5765

66+
public void markAllForDeletion() {
67+
68+
for (Host host : hosts) {
69+
host.setMarkforDeletion(true);
70+
}
71+
72+
}
73+
74+
public void clearAllMarkedForDeletion() throws IOException {
75+
76+
LOGGER.debug("Clear all Hosts in Set with markForDeletion");
77+
78+
for (Host host : hosts) {
79+
if(host.isMarkforDeletion()) {
80+
try {
81+
82+
LOGGER.debug("Try to close Host " + host);
83+
host.close();
84+
85+
} catch (IOException e) {
86+
LOGGER.warn("Error during closing the Host " + host, e);
87+
}
88+
}
89+
}
90+
91+
}
92+
5893
public void clear() {
5994
LOGGER.debug("Clear all Hosts in Set");
6095

src/main/java/com/arangodb/internal/velocystream/internal/VstConnection.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.nio.ByteOrder;
3131
import java.util.Collection;
3232
import java.util.Date;
33+
import java.util.HashMap;
3334
import java.util.concurrent.Callable;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.Executors;
@@ -69,6 +70,10 @@ public abstract class VstConnection implements Connection {
6970
private InputStream inputStream;
7071

7172
private final HostDescription host;
73+
74+
private HashMap<Long, Long> sendTimestamps = new HashMap<Long, Long>();
75+
76+
private String connectionName;
7277

7378
protected VstConnection(final HostDescription host, final Integer timeout, final Long ttl, final Boolean useSsl,
7479
final SSLContext sslContext, final MessageStore messageStore) {
@@ -79,6 +84,9 @@ protected VstConnection(final HostDescription host, final Integer timeout, final
7984
this.useSsl = useSsl;
8085
this.sslContext = sslContext;
8186
this.messageStore = messageStore;
87+
88+
connectionName = "conenction_" + System.currentTimeMillis() + "_" + Math.random();
89+
LOGGER.debug("Connection " + connectionName + " created");
8290
}
8391

8492
public boolean isOpen() {
@@ -118,11 +126,12 @@ public synchronized void open() throws IOException {
118126
((SSLSocket) socket).startHandshake();
119127
}
120128
sendProtocolHeader();
129+
121130
executor = Executors.newSingleThreadExecutor();
122131
executor.submit(new Callable<Void>() {
123132
@Override
124133
public Void call() throws Exception {
125-
LOGGER.debug("Start Callable for " + this.toString());
134+
LOGGER.debug("Start Callable for " + connectionName);
126135

127136
final long openTime = new Date().getTime();
128137
final Long ttlTime = ttl != null ? openTime + ttl : null;
@@ -153,7 +162,7 @@ public Void call() throws Exception {
153162
}
154163
}
155164

156-
LOGGER.debug("Stop Callable for " + this.toString());
165+
LOGGER.debug("Stop Callable for " + connectionName);
157166

158167
return null;
159168
}
@@ -193,6 +202,7 @@ protected synchronized void writeIntern(final Message message, final Collection<
193202
if (LOGGER.isDebugEnabled()) {
194203
LOGGER.debug(String.format("Send chunk %s:%s from message %s", chunk.getChunk(),
195204
chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
205+
sendTimestamps.put(chunk.getMessageId(), System.currentTimeMillis());
196206
}
197207
writeChunkHead(chunk);
198208
final int contentOffset = chunk.getContentOffset();
@@ -210,6 +220,7 @@ protected synchronized void writeIntern(final Message message, final Collection<
210220
}
211221
outputStream.flush();
212222
} catch (final IOException e) {
223+
LOGGER.error("Error on Connection " + connectionName);
213224
throw new ArangoDBException(e);
214225
}
215226
}
@@ -245,10 +256,13 @@ protected Chunk readChunk() throws IOException {
245256
contentLength = length - ArangoDefaults.CHUNK_MIN_HEADER_SIZE;
246257
}
247258
final Chunk chunk = new Chunk(messageId, chunkX, messageLength, 0, contentLength);
259+
248260
if (LOGGER.isDebugEnabled()) {
249-
LOGGER.debug(String.format("Received chunk %s:%s from message %s", chunk.getChunk(),
250-
chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
261+
262+
LOGGER.debug(String.format("Received chunk %s:%s from message %s", chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
263+
LOGGER.debug("Responsetime for Message " + chunk.getMessageId() + " is " + (sendTimestamps.get(chunk.getMessageId())-System.currentTimeMillis()));
251264
}
265+
252266
return chunk;
253267
}
254268

@@ -268,5 +282,9 @@ protected void readBytesIntoBuffer(final byte[] buf, final int off, final int le
268282
}
269283
}
270284
}
285+
286+
public String getConnectionName() {
287+
return this.connectionName;
288+
}
271289

272290
}

0 commit comments

Comments
 (0)