Skip to content

Commit f1614e1

Browse files
authored
Route message (#494)
* Updated meta version to 4.3 * Completed ROUTE message
1 parent 1b5f4c2 commit f1614e1

File tree

5 files changed

+60
-17
lines changed

5 files changed

+60
-17
lines changed

neo4j/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,9 @@ def _verify_routing_connectivity(self):
442442
routing_info = {}
443443
for ix in list(table.routers):
444444
try:
445-
routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0], timeout=self._default_workspace_config.connection_acquisition_timeout, database=self._default_workspace_config.database)
445+
routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0],
446+
database=self._default_workspace_config.database,
447+
timeout=self._default_workspace_config.connection_acquisition_timeout)
446448
except BoltHandshakeError as error:
447449
routing_info[ix] = None
448450

neo4j/io/__init__.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,14 @@ def __exit__(self, exc_type, exc_value, traceback):
294294
self.close()
295295

296296
def route(self, database):
297-
""" Fetch a routing table from the server.
297+
""" Fetch a routing table from the server for the given
298+
`database`. For Bolt 4.3 and above, this appends a ROUTE
299+
message; for earlier versions, a procedure call is made via
300+
the regular Cypher execution mechanism. In all cases, this is
301+
sent to the network, and a response is fetched.
302+
303+
:param database: database for which to fetch a routing table
304+
:return: dictionary of raw routing data
298305
"""
299306

300307
def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
@@ -666,27 +673,41 @@ def create_routing_table(self, database):
666673
if database not in self.routing_tables:
667674
self.routing_tables[database] = RoutingTable(database=database, routers=self.get_default_database_initial_router_addresses())
668675

669-
def fetch_routing_info(self, *, address, timeout, database):
676+
def fetch_routing_info(self, address, database, timeout):
670677
""" Fetch raw routing info from a given router address.
671678
672679
:param address: router address
673-
:param timeout: seconds
674-
:param database: the data base name to get routing table for
675-
:param address: the address by which the client initially contacted the server as a hint for inclusion in the returned routing table.
680+
:param database: the database name to get routing table for
681+
:param timeout: connection acquisition timeout in seconds
676682
677-
:return: list of routing records or
678-
None if no connection could be established
679-
:raise ServiceUnavailable: if the server does not support routing or
680-
if routing support is broken
683+
:return: list of routing records, or None if no connection
684+
could be established or if no readers or writers are present
685+
:raise ServiceUnavailable: if the server does not support
686+
routing, or if routing support is broken or outdated
681687
"""
682688
try:
683689
with self._acquire(address, timeout) as cx:
684-
return cx.route(database or self.workspace_config.database)
690+
routing_table = cx.route(database or self.workspace_config.database)
685691
except BoltRoutingError as error:
692+
# Connection was successful, but routing support is
693+
# broken. This may indicate that the routing procedure
694+
# does not exist (for protocol versions prior to 4.3).
695+
# This error is converted into ServiceUnavailable,
696+
# therefore surfacing to the application as a signal that
697+
# routing is broken.
698+
log.debug("Routing is broken (%s)", error)
686699
raise ServiceUnavailable(*error.args)
687-
except ServiceUnavailable:
688-
self.deactivate(address=address)
689-
return None
700+
except ServiceUnavailable as error:
701+
# The routing table request suffered a connection
702+
# failure. This should return a null routing table,
703+
# signalling to the caller to retry the request
704+
# elsewhere.
705+
log.debug("Routing is unavailable (%s)", error)
706+
routing_table = None
707+
# If the routing table is empty, deactivate the address.
708+
if not routing_table:
709+
self.deactivate(address)
710+
return routing_table
690711

691712
def fetch_routing_table(self, *, address, timeout, database):
692713
""" Fetch a routing table from a given router address.
@@ -702,7 +723,7 @@ def fetch_routing_table(self, *, address, timeout, database):
702723
:raise neo4j.exceptions.ServiceUnavailable: if no writers are available
703724
:raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable
704725
"""
705-
new_routing_info = self.fetch_routing_info(address=address, timeout=timeout, database=database)
726+
new_routing_info = self.fetch_routing_info(address, database, timeout)
706727
if new_routing_info is None:
707728
return None
708729
elif not new_routing_info:

neo4j/io/_bolt4.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,3 +507,21 @@ class Bolt4x3(Bolt4x2):
507507
"""
508508

509509
PROTOCOL_VERSION = Version(4, 3)
510+
511+
def route(self, database):
512+
513+
def fail(md):
514+
from neo4j._exceptions import BoltRoutingError
515+
if md.get("code") == "Neo.ClientError.Procedure.ProcedureNotFound":
516+
raise BoltRoutingError("Server does not support routing", self.unresolved_address)
517+
else:
518+
raise BoltRoutingError("Routing support broken on server", self.unresolved_address)
519+
520+
routing_context = self.routing_context or {}
521+
log.debug("[#%04X] C: ROUTE %r %r", self.local_port, routing_context, database)
522+
metadata = {}
523+
self._append(b"\x66", (routing_context, database),
524+
response=Response(self, on_success=metadata.update, on_failure=fail))
525+
self.send_all()
526+
self.fetch_all()
527+
return [metadata.get("rt")]

neo4j/meta.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
# Can be automatically overridden in builds
2323
package = "neo4j"
24-
version = "4.2.dev0"
24+
version = "4.3.dev0"
2525

2626

2727
def get_user_agent():

testkitbackend/backend.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ def _process(self, request):
7676
self.errors[key] = e
7777
self.send_response("DriverError", {"id": key})
7878
except Exception as e:
79-
self.send_response("BackendError", {"msg": str(e)})
79+
from traceback import print_exception
80+
print_exception(type(e), e, e.__traceback__)
81+
self.send_response("BackendError", {"msg": "%s: %s" % (e.__class__.__name__, e)})
8082

8183
def send_response(self, name, data):
8284
""" Sends a response to backend.

0 commit comments

Comments
 (0)