42
42
from neo4j .meta import version
43
43
from neo4j .packstream import Packer , Unpacker
44
44
from neo4j .util import import_best as _import_best
45
+ from time import clock
45
46
46
47
ChunkedInputBuffer = _import_best ("neo4j.bolt._io" , "neo4j.bolt.io" ).ChunkedInputBuffer
47
48
ChunkedOutputBuffer = _import_best ("neo4j.bolt._io" , "neo4j.bolt.io" ).ChunkedOutputBuffer
48
49
49
50
51
+ INFINITE_CONNECTION_LIFETIME = - 1
52
+ DEFAULT_MAX_CONNECTION_LIFETIME = INFINITE_CONNECTION_LIFETIME
50
53
DEFAULT_CONNECTION_TIMEOUT = 5.0
51
54
DEFAULT_PORT = 7687
52
55
DEFAULT_USER_AGENT = "neo4j-python/%s" % version
@@ -178,6 +181,8 @@ def __init__(self, address, sock, error_handler, **config):
178
181
self .packer = Packer (self .output_buffer )
179
182
self .unpacker = Unpacker ()
180
183
self .responses = deque ()
184
+ self ._max_connection_lifetime = config .get ("max_connection_lifetime" , DEFAULT_MAX_CONNECTION_LIFETIME )
185
+ self ._creation_timestamp = clock ()
181
186
182
187
# Determine the user agent and ensure it is a Unicode value
183
188
user_agent = config .get ("user_agent" , DEFAULT_USER_AGENT )
@@ -201,6 +206,7 @@ def __init__(self, address, sock, error_handler, **config):
201
206
# Pick up the server certificate, if any
202
207
self .der_encoded_server_certificate = config .get ("der_encoded_server_certificate" )
203
208
209
+ def Init (self ):
204
210
response = InitResponse (self )
205
211
self .append (INIT , (self .user_agent , self .auth_dict ), response = response )
206
212
self .sync ()
@@ -360,6 +366,9 @@ def _unpack(self):
360
366
more = False
361
367
return details , summary_signature , summary_metadata
362
368
369
+ def timedout (self ):
370
+ return 0 <= self ._max_connection_lifetime <= clock () - self ._creation_timestamp
371
+
363
372
def sync (self ):
364
373
""" Send and fetch all outstanding messages.
365
374
@@ -425,7 +434,7 @@ def acquire_direct(self, address):
425
434
except KeyError :
426
435
connections = self .connections [address ] = deque ()
427
436
for connection in list (connections ):
428
- if connection .closed () or connection .defunct ():
437
+ if connection .closed () or connection .defunct () or connection . timedout () :
429
438
connections .remove (connection )
430
439
continue
431
440
if not connection .in_use :
@@ -600,8 +609,10 @@ def connect(address, ssl_context=None, error_handler=None, **config):
600
609
s .shutdown (SHUT_RDWR )
601
610
s .close ()
602
611
elif agreed_version == 1 :
603
- return Connection (address , s , der_encoded_server_certificate = der_encoded_server_certificate ,
612
+ connection = Connection (address , s , der_encoded_server_certificate = der_encoded_server_certificate ,
604
613
error_handler = error_handler , ** config )
614
+ connection .Init ()
615
+ return connection
605
616
elif agreed_version == 0x48545450 :
606
617
log_error ("S: [CLOSE]" )
607
618
s .close ()
0 commit comments