48
48
timeout as SocketTimeout ,
49
49
)
50
50
from ssl import (
51
+ CertificateError ,
51
52
HAS_SNI ,
52
53
SSLError ,
53
54
)
59
60
from time import perf_counter
60
61
61
62
from neo4j ._exceptions import (
63
+ BoltError ,
62
64
BoltHandshakeError ,
63
65
BoltProtocolError ,
64
66
BoltRoutingError ,
77
79
from neo4j .exceptions import (
78
80
ClientError ,
79
81
ConfigurationError ,
82
+ DriverError ,
80
83
ReadServiceUnavailable ,
81
84
ServiceUnavailable ,
82
85
SessionExpired ,
@@ -708,16 +711,15 @@ def fetch_routing_table(self, *, address, timeout, database, bookmarks):
708
711
709
712
:return: a new RoutingTable instance or None if the given router is
710
713
currently unable to provide routing information
711
-
712
- :raise neo4j.exceptions.ServiceUnavailable: if no writers are available
713
- :raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable
714
714
"""
715
- new_routing_info = self .fetch_routing_info (address , database , bookmarks ,
716
- timeout )
717
- if new_routing_info is None :
715
+ try :
716
+ new_routing_info = self .fetch_routing_info (address , database ,
717
+ bookmarks , timeout )
718
+ except ServiceUnavailable :
719
+ new_routing_info = None
720
+ if not new_routing_info :
721
+ log .debug ("Failed to fetch routing info %s" , address )
718
722
return None
719
- elif not new_routing_info :
720
- raise BoltRoutingError ("Invalid routing table" , address )
721
723
else :
722
724
servers = new_routing_info [0 ]["servers" ]
723
725
ttl = new_routing_info [0 ]["ttl" ]
@@ -733,11 +735,13 @@ def fetch_routing_table(self, *, address, timeout, database, bookmarks):
733
735
734
736
# No routers
735
737
if num_routers == 0 :
736
- raise BoltRoutingError ("No routing servers returned from server" , address )
738
+ log .debug ("No routing servers returned from server %s" , address )
739
+ return None
737
740
738
741
# No readers
739
742
if num_readers == 0 :
740
- raise BoltRoutingError ("No read servers returned from server" , address )
743
+ log .debug ("No read servers returned from server %s" , address )
744
+ return None
741
745
742
746
# At least one of each is fine, so return this table
743
747
return new_routing_table
@@ -751,14 +755,20 @@ def update_routing_table_from(self, *routers, database=None,
751
755
"""
752
756
log .debug ("Attempting to update routing table from {}" .format (", " .join (map (repr , routers ))))
753
757
for router in routers :
754
- new_routing_table = self .fetch_routing_table (
755
- address = router , timeout = self .pool_config .connection_timeout ,
756
- database = database , bookmarks = bookmarks
757
- )
758
- if new_routing_table is not None :
759
- self .routing_tables [database ].update (new_routing_table )
760
- log .debug ("[#0000] C: <UPDATE ROUTING TABLE> address={!r} ({!r})" .format (router , self .routing_tables [database ]))
761
- return True
758
+ for address in router .resolve (resolver = self .pool_config .resolver ):
759
+ new_routing_table = self .fetch_routing_table (
760
+ address = address ,
761
+ timeout = self .pool_config .connection_timeout ,
762
+ database = database , bookmarks = bookmarks
763
+ )
764
+ if new_routing_table is not None :
765
+ self .routing_tables [database ].update (new_routing_table )
766
+ log .debug (
767
+ "[#0000] C: <UPDATE ROUTING TABLE> address=%r (%r)" ,
768
+ address , self .routing_tables [database ]
769
+ )
770
+ return True
771
+ self .deactivate (router )
762
772
return False
763
773
764
774
def update_routing_table (self , * , database , bookmarks ):
@@ -771,24 +781,26 @@ def update_routing_table(self, *, database, bookmarks):
771
781
:raise neo4j.exceptions.ServiceUnavailable:
772
782
"""
773
783
# copied because it can be modified
774
- existing_routers = list (self .routing_tables [database ].routers )
784
+ existing_routers = set (self .routing_tables [database ].routers )
785
+
786
+ prefer_initial_routing_address = \
787
+ self .routing_tables [database ].missing_fresh_writer ()
775
788
776
- has_tried_initial_routers = False
777
- if self .routing_tables [database ].missing_fresh_writer ():
789
+ if prefer_initial_routing_address :
778
790
# TODO: Test this state
779
- has_tried_initial_routers = True
780
791
if self .update_routing_table_from (
781
792
self .first_initial_routing_address , database = database ,
782
793
bookmarks = bookmarks
783
794
):
784
795
# Why is only the first initial routing address used?
785
796
return
786
- if self .update_routing_table_from (* existing_routers , database = database ,
787
- bookmarks = bookmarks ):
797
+ if self .update_routing_table_from (
798
+ * (existing_routers - {self .first_initial_routing_address }),
799
+ database = database , bookmarks = bookmarks
800
+ ):
788
801
return
789
802
790
- if (not has_tried_initial_routers
791
- and self .first_initial_routing_address not in existing_routers ):
803
+ if not prefer_initial_routing_address :
792
804
if self .update_routing_table_from (
793
805
self .first_initial_routing_address , database = database ,
794
806
bookmarks = bookmarks
@@ -956,21 +968,24 @@ def _secure(s, host, ssl_context):
956
968
local_port = s .getsockname ()[1 ]
957
969
# Secure the connection if an SSL context has been provided
958
970
if ssl_context :
971
+ last_error = None
959
972
log .debug ("[#%04X] C: <SECURE> %s" , local_port , host )
960
973
try :
961
974
sni_host = host if HAS_SNI and host else None
962
975
s = ssl_context .wrap_socket (s , server_hostname = sni_host )
963
- except (SSLError , OSError ) as cause :
964
- _close_socket (s )
965
- error = BoltSecurityError (message = "Failed to establish encrypted connection." , address = (host , local_port ))
966
- error .__cause__ = cause
967
- raise error
968
- else :
969
- # Check that the server provides a certificate
970
- der_encoded_server_certificate = s .getpeercert (binary_form = True )
971
- if der_encoded_server_certificate is None :
972
- s .close ()
973
- raise BoltProtocolError ("When using an encrypted socket, the server should always provide a certificate" , address = (host , local_port ))
976
+ except (OSError , SSLError , CertificateError ) as cause :
977
+ raise BoltSecurityError (
978
+ message = "Failed to establish encrypted connection." ,
979
+ address = (host , local_port )
980
+ ) from cause
981
+ # Check that the server provides a certificate
982
+ der_encoded_server_certificate = s .getpeercert (binary_form = True )
983
+ if der_encoded_server_certificate is None :
984
+ raise BoltProtocolError (
985
+ "When using an encrypted socket, the server should always "
986
+ "provide a certificate" , address = (host , local_port )
987
+ )
988
+ return s
974
989
return s
975
990
976
991
@@ -1041,27 +1056,38 @@ def connect(address, *, timeout, custom_resolver, ssl_context, keep_alive):
1041
1056
""" Connect and perform a handshake and return a valid Connection object,
1042
1057
assuming a protocol version can be agreed.
1043
1058
"""
1044
- last_error = None
1059
+ errors = []
1045
1060
# Establish a connection to the host and port specified
1046
1061
# Catches refused connections see:
1047
1062
# https://docs.python.org/2/library/errno.html
1048
- log .debug ("[#0000] C: <RESOLVE> %s" , address )
1049
1063
1050
- for resolved_address in Address (address ).resolve (resolver = custom_resolver ):
1064
+ resolved_addresses = Address (address ).resolve (resolver = custom_resolver )
1065
+ for resolved_address in resolved_addresses :
1051
1066
s = None
1052
1067
try :
1053
- host = address [0 ]
1054
1068
s = _connect (resolved_address , timeout , keep_alive )
1055
- s = _secure (s , host , ssl_context )
1056
- return _handshake (s , address )
1057
- except Exception as error :
1069
+ s = _secure (s , resolved_address . host_name , ssl_context )
1070
+ return _handshake (s , resolved_address )
1071
+ except ( BoltError , DriverError , OSError ) as error :
1058
1072
if s :
1059
1073
_close_socket (s )
1060
- last_error = error
1061
- if last_error is None :
1062
- raise ServiceUnavailable ("Failed to resolve addresses for %s" % address )
1074
+ errors .append (error )
1075
+ except Exception :
1076
+ if s :
1077
+ _close_socket (s )
1078
+ raise
1079
+ if not errors :
1080
+ raise ServiceUnavailable (
1081
+ "Couldn't connect to %s (resolved to %s)" % (
1082
+ str (address ), tuple (map (str , resolved_addresses )))
1083
+ )
1063
1084
else :
1064
- raise last_error
1085
+ raise ServiceUnavailable (
1086
+ "Couldn't connect to %s (resolved to %s):\n %s" % (
1087
+ str (address ), tuple (map (str , resolved_addresses )),
1088
+ "\n " .join (map (str , errors ))
1089
+ )
1090
+ ) from errors [0 ]
1065
1091
1066
1092
1067
1093
def check_supported_server_product (agent ):
0 commit comments