Skip to content

Commit ccfd4ab

Browse files
authored
Send bookmarks when fetching a routing table (#522)
1 parent 53cbd08 commit ccfd4ab

File tree

11 files changed

+115
-45
lines changed

11 files changed

+115
-45
lines changed

neo4j/__init__.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -442,18 +442,17 @@ def _verify_routing_connectivity(self):
442442
routing_info = {}
443443
for ix in list(table.routers):
444444
try:
445-
routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0],
446-
database=self._default_workspace_config.database,
447-
timeout=self._default_workspace_config.connection_acquisition_timeout)
445+
routing_info[ix] = self._pool.fetch_routing_info(
446+
address=table.routers[0],
447+
database=self._default_workspace_config.database,
448+
bookmarks=None,
449+
timeout=self._default_workspace_config
450+
.connection_acquisition_timeout
451+
)
448452
except BoltHandshakeError as error:
449453
routing_info[ix] = None
450454

451455
for key, val in routing_info.items():
452456
if val is not None:
453457
return routing_info
454458
raise ServiceUnavailable("Could not connect to any routing servers.")
455-
456-
def update_routing_table(self, database=None):
457-
if database is None:
458-
database = self._pool.workspace_config.database
459-
self._pool.update_routing_table(database=database)

neo4j/io/__init__.py

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -293,14 +293,16 @@ def __enter__(self):
293293
def __exit__(self, exc_type, exc_value, traceback):
294294
self.close()
295295

296-
def route(self, database):
296+
def route(self, database=None, bookmarks=None):
297297
""" Fetch a routing table from the server for the given
298298
`database`. For Bolt 4.3 and above, this appends a ROUTE
299299
message; for earlier versions, a procedure call is made via
300300
the regular Cypher execution mechanism. In all cases, this is
301301
sent to the network, and a response is fetched.
302302
303303
:param database: database for which to fetch a routing table
304+
:param bookmarks: iterable of bookmark values after which this
305+
transaction should begin
304306
:return: dictionary of raw routing data
305307
"""
306308

@@ -480,12 +482,14 @@ def time_remaining():
480482
raise ClientError("Failed to obtain a connection from pool "
481483
"within {!r}s".format(timeout))
482484

483-
def acquire(self, access_mode=None, timeout=None, database=None):
485+
def acquire(self, access_mode=None, timeout=None, database=None,
486+
bookmarks=None):
484487
""" Acquire a connection to a server that can satisfy a set of parameters.
485488
486489
:param access_mode:
487490
:param timeout:
488491
:param database:
492+
:param bookmarks:
489493
"""
490494

491495
def release(self, *connections):
@@ -587,7 +591,8 @@ def __init__(self, opener, pool_config, workspace_config, routing_context, addre
587591
def __repr__(self):
588592
return "<{} address={!r}>".format(self.__class__.__name__, self.address)
589593

590-
def acquire(self, access_mode=None, timeout=None, database=None):
594+
def acquire(self, access_mode=None, timeout=None, database=None,
595+
bookmarks=None):
591596
# The access_mode and database is not needed for a direct connection, its just there for consistency.
592597
return self._acquire(self.address, timeout)
593598

@@ -673,11 +678,13 @@ def create_routing_table(self, database):
673678
if database not in self.routing_tables:
674679
self.routing_tables[database] = RoutingTable(database=database, routers=self.get_default_database_initial_router_addresses())
675680

676-
def fetch_routing_info(self, address, database, timeout):
681+
def fetch_routing_info(self, address, database, bookmarks, timeout):
677682
""" Fetch raw routing info from a given router address.
678683
679684
:param address: router address
680685
:param database: the database name to get routing table for
686+
:param bookmarks: iterable of bookmark values after which the routing
687+
info should be fetched
681688
:param timeout: connection acquisition timeout in seconds
682689
683690
:return: list of routing records, or None if no connection
@@ -687,7 +694,9 @@ def fetch_routing_info(self, address, database, timeout):
687694
"""
688695
try:
689696
with self._acquire(address, timeout) as cx:
690-
routing_table = cx.route(database or self.workspace_config.database)
697+
routing_table = cx.route(
698+
database or self.workspace_config.database, bookmarks
699+
)
691700
except BoltRoutingError as error:
692701
# Connection was successful, but routing support is
693702
# broken. This may indicate that the routing procedure
@@ -709,21 +718,23 @@ def fetch_routing_info(self, address, database, timeout):
709718
self.deactivate(address)
710719
return routing_table
711720

712-
def fetch_routing_table(self, *, address, timeout, database):
721+
def fetch_routing_table(self, *, address, timeout, database, bookmarks):
713722
""" Fetch a routing table from a given router address.
714723
715724
:param address: router address
716725
:param timeout: seconds
717726
:param database: the database name
718727
:type: str
728+
:param bookmarks: bookmarks used when fetching routing table
719729
720730
:return: a new RoutingTable instance or None if the given router is
721731
currently unable to provide routing information
722732
723733
:raise neo4j.exceptions.ServiceUnavailable: if no writers are available
724734
:raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable
725735
"""
726-
new_routing_info = self.fetch_routing_info(address, database, timeout)
736+
new_routing_info = self.fetch_routing_info(address, database, bookmarks,
737+
timeout)
727738
if new_routing_info is None:
728739
return None
729740
elif not new_routing_info:
@@ -752,26 +763,31 @@ def fetch_routing_table(self, *, address, timeout, database):
752763
# At least one of each is fine, so return this table
753764
return new_routing_table
754765

755-
def update_routing_table_from(self, *routers, database=None):
766+
def update_routing_table_from(self, *routers, database=None,
767+
bookmarks=None):
756768
""" Try to update routing tables with the given routers.
757769
758770
:return: True if the routing table is successfully updated,
759771
otherwise False
760772
"""
761773
log.debug("Attempting to update routing table from {}".format(", ".join(map(repr, routers))))
762774
for router in routers:
763-
new_routing_table = self.fetch_routing_table(address=router, timeout=self.pool_config.connection_timeout, database=database)
775+
new_routing_table = self.fetch_routing_table(
776+
address=router, timeout=self.pool_config.connection_timeout,
777+
database=database, bookmarks=bookmarks
778+
)
764779
if new_routing_table is not None:
765780
self.routing_tables[database].update(new_routing_table)
766781
log.debug("[#0000] C: <UPDATE ROUTING TABLE> address={!r} ({!r})".format(router, self.routing_tables[database]))
767782
return True
768783
return False
769784

770-
def update_routing_table(self, *, database):
785+
def update_routing_table(self, *, database, bookmarks):
771786
""" Update the routing table from the first router able to provide
772787
valid routing information.
773788
774789
:param database: The database name
790+
:param bookmarks: bookmarks used when fetching routing table
775791
776792
:raise neo4j.exceptions.ServiceUnavailable:
777793
"""
@@ -782,15 +798,22 @@ def update_routing_table(self, *, database):
782798
if self.routing_tables[database].missing_fresh_writer():
783799
# TODO: Test this state
784800
has_tried_initial_routers = True
785-
if self.update_routing_table_from(self.first_initial_routing_address, database=database):
801+
if self.update_routing_table_from(
802+
self.first_initial_routing_address, database=database,
803+
bookmarks=bookmarks
804+
):
786805
# Why is only the first initial routing address used?
787806
return
788-
789-
if self.update_routing_table_from(*existing_routers, database=database):
807+
if self.update_routing_table_from(*existing_routers, database=database,
808+
bookmarks=bookmarks):
790809
return
791810

792-
if not has_tried_initial_routers and self.first_initial_routing_address not in existing_routers:
793-
if self.update_routing_table_from(self.first_initial_routing_address, database=database):
811+
if (not has_tried_initial_routers
812+
and self.first_initial_routing_address not in existing_routers):
813+
if self.update_routing_table_from(
814+
self.first_initial_routing_address, database=database,
815+
bookmarks=bookmarks
816+
):
794817
# Why is only the first initial routing address used?
795818
return
796819

@@ -804,7 +827,8 @@ def update_connection_pool(self, *, database):
804827
if address not in servers:
805828
super(Neo4jPool, self).deactivate(address)
806829

807-
def ensure_routing_table_is_fresh(self, *, access_mode, database):
830+
def ensure_routing_table_is_fresh(self, *, access_mode, database,
831+
bookmarks):
808832
""" Update the routing table if stale.
809833
810834
This method performs two freshness checks, before and after acquiring
@@ -823,7 +847,7 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database):
823847
return False
824848
with self.refresh_lock:
825849

826-
self.update_routing_table(database=database)
850+
self.update_routing_table(database=database, bookmarks=bookmarks)
827851
self.update_connection_pool(database=database)
828852

829853
for database in list(self.routing_tables.keys()):
@@ -835,12 +859,14 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database):
835859

836860
return True
837861

838-
def _select_address(self, *, access_mode, database):
862+
def _select_address(self, *, access_mode, database, bookmarks):
839863
from neo4j.api import READ_ACCESS
840864
""" Selects the address with the fewest in-use connections.
841865
"""
842866
self.create_routing_table(database)
843-
self.ensure_routing_table_is_fresh(access_mode=access_mode, database=database)
867+
self.ensure_routing_table_is_fresh(
868+
access_mode=access_mode, database=database, bookmarks=bookmarks
869+
)
844870
log.debug("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r", self.routing_tables)
845871
if access_mode == READ_ACCESS:
846872
addresses = self.routing_tables[database].readers
@@ -856,7 +882,8 @@ def _select_address(self, *, access_mode, database):
856882
raise WriteServiceUnavailable("No write service currently available")
857883
return choice(addresses_by_usage[min(addresses_by_usage)])
858884

859-
def acquire(self, access_mode=None, timeout=None, database=None):
885+
def acquire(self, access_mode=None, timeout=None, database=None,
886+
bookmarks=None):
860887
if access_mode not in (WRITE_ACCESS, READ_ACCESS):
861888
raise ClientError("Non valid 'access_mode'; {}".format(access_mode))
862889
if not timeout:
@@ -867,7 +894,10 @@ def acquire(self, access_mode=None, timeout=None, database=None):
867894
while True:
868895
try:
869896
# Get an address for a connection that have the fewest in-use connections.
870-
address = self._select_address(access_mode=access_mode, database=database)
897+
address = self._select_address(
898+
access_mode=access_mode, database=database,
899+
bookmarks=bookmarks
900+
)
871901
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
872902
except (ReadServiceUnavailable, WriteServiceUnavailable) as err:
873903
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err

neo4j/io/_bolt3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def hello(self):
159159
self.fetch_all()
160160
check_supported_server_product(self.server_info.agent)
161161

162-
def route(self, database):
162+
def route(self, database=None, bookmarks=None):
163163
if database is not None: # default database
164164
raise ConfigurationError("Database name parameter for selecting database is not "
165165
"supported in Bolt Protocol {!r}. Database name {!r}. "
@@ -176,6 +176,9 @@ def fail(md):
176176
else:
177177
raise BoltRoutingError("Routing support broken on server", self.unresolved_address)
178178

179+
# Ignoring database and bookmarks because there is no multi-db support.
180+
# The bookmarks are only relevant for making sure a previously created
181+
# db exists before querying a routing table for it.
179182
self.run(
180183
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
181184
{"context": self.routing_context},

neo4j/io/_bolt4.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def hello(self):
158158
self.fetch_all()
159159
check_supported_server_product(self.server_info.agent)
160160

161-
def route(self, database):
161+
def route(self, database=None, bookmarks=None):
162162
metadata = {}
163163
records = []
164164

@@ -177,6 +177,7 @@ def fail(md):
177177
"CALL dbms.routing.getRoutingTable($context)",
178178
{"context": self.routing_context},
179179
mode="r",
180+
bookmarks=bookmarks,
180181
db=SYSTEM_DATABASE,
181182
on_success=metadata.update, on_failure=fail
182183
)
@@ -185,6 +186,7 @@ def fail(md):
185186
"CALL dbms.routing.getRoutingTable($context, $database)",
186187
{"context": self.routing_context, "database": database},
187188
mode="r",
189+
bookmarks=bookmarks,
188190
db=SYSTEM_DATABASE,
189191
on_success=metadata.update, on_failure=fail
190192
)
@@ -511,7 +513,7 @@ class Bolt4x3(Bolt4x2):
511513

512514
PROTOCOL_VERSION = Version(4, 3)
513515

514-
def route(self, database):
516+
def route(self, database=None, bookmarks=None):
515517

516518
def fail(md):
517519
from neo4j._exceptions import BoltRoutingError
@@ -526,7 +528,11 @@ def fail(md):
526528
routing_context = self.routing_context or {}
527529
log.debug("[#%04X] C: ROUTE %r %r", self.local_port, routing_context, database)
528530
metadata = {}
529-
self._append(b"\x66", (routing_context, database),
531+
if bookmarks is None:
532+
bookmarks = []
533+
else:
534+
bookmarks = list(bookmarks)
535+
self._append(b"\x66", (routing_context, bookmarks, database),
530536
response=Response(self, on_success=metadata.update, on_failure=fail))
531537
self.send_all()
532538
self.fetch_all()

neo4j/work/simple.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ def _connect(self, access_mode, database):
113113
self._connection.send_all()
114114
self._connection.fetch_all()
115115
self._disconnect()
116-
self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=database)
116+
self._connection = self._pool.acquire(
117+
access_mode=access_mode,
118+
timeout=self._config.connection_acquisition_timeout,
119+
database=database,
120+
bookmarks=self._bookmarks
121+
)
117122

118123
def _disconnect(self):
119124
if self._connection:

tests/integration/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def bolt_driver(target, auth):
299299
def neo4j_driver(target, auth):
300300
try:
301301
driver = GraphDatabase.neo4j_driver(target, auth=auth)
302-
driver.update_routing_table()
302+
driver._pool.update_routing_table(database=None, bookmarks=None)
303303
except ServiceUnavailable as error:
304304
if isinstance(error.__cause__, BoltHandshakeError):
305305
pytest.skip(error.args[0])

tests/integration/test_autocommit.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ def test_can_run_simple_statement_with_params(session):
5555
assert count == 1
5656

5757

58-
@pytest.mark.skip(reason="BOOKMARK, AttributeError: 'Session' object has no attribute 'last_bookmark'")
5958
def test_autocommit_transactions_use_bookmarks(neo4j_driver):
6059
bookmarks = []
6160
# Generate an initial bookmark
@@ -66,7 +65,7 @@ def test_autocommit_transactions_use_bookmarks(neo4j_driver):
6665
bookmarks.append(bookmark)
6766
# Propagate into another session
6867
with neo4j_driver.session(bookmarks=bookmarks) as session:
69-
assert list(session.next_bookmarks()) == bookmarks
68+
assert list(session._bookmarks) == bookmarks
7069
session.run("CREATE ()").consume()
7170
bookmark = session.last_bookmark()
7271
assert bookmark is not None
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: BOLT 4
2+
!: AUTO HELLO
3+
!: AUTO GOODBYE
4+
!: AUTO RESET
5+
!: PORT 9001
6+
7+
C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:1"]}
8+
PULL {"n": -1}
9+
S: SUCCESS {"fields": ["ttl", "servers"]}
10+
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]]
11+
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: BOLT 4
2+
!: AUTO HELLO
3+
!: AUTO GOODBYE
4+
!: AUTO RESET
5+
!: PORT 9001
6+
7+
C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:0", "bookmark:1"]}
8+
PULL {"n": -1}
9+
S: SUCCESS {"fields": ["ttl", "servers"]}
10+
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]]
11+
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"}

tests/stub/test_bookmarking.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ def test_should_be_able_to_set_multiple_bookmarks(driver_info, test_script):
8383
"test_scripts",
8484
[
8585
("v3/router.script", "v3/bookmark_chain.script"),
86-
("v4x0/router.script", "v4x0/tx_bookmark_chain.script"),
86+
(
87+
"v4x0/router_with_two_bookmarks.script",
88+
"v4x0/tx_bookmark_chain.script"
89+
),
8790
]
8891
)
8992
def test_should_automatically_chain_bookmarks(driver_info, test_scripts):
@@ -104,7 +107,10 @@ def test_should_automatically_chain_bookmarks(driver_info, test_scripts):
104107
"test_scripts",
105108
[
106109
("v3/router.script", "v3/bookmark_chain_with_autocommit.script"),
107-
("v4x0/router.script", "v4x0/tx_bookmark_chain_with_autocommit.script"),
110+
(
111+
"v4x0/router_with_one_bookmark.script",
112+
"v4x0/tx_bookmark_chain_with_autocommit.script"
113+
),
108114
]
109115
)
110116
def test_autocommit_transaction_included_in_chain(driver_info, test_scripts):

0 commit comments

Comments
 (0)