Skip to content

Commit 8b77616

Browse files
authored
Merge pull request #155 from zhenlineo/1.2-roll-back-initial-addr
Roll back to initial server if running out of routers
2 parents 20cbad8 + e85ad48 commit 8b77616

File tree

3 files changed

+70
-37
lines changed

3 files changed

+70
-37
lines changed

neo4j/v1/routing.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,9 @@ class RoutingConnectionPool(ConnectionPool):
154154

155155
routing_info_procedure = "dbms.cluster.routing.getServers"
156156

157-
def __init__(self, connector, *routers):
157+
def __init__(self, connector, initial_address, *routers):
158158
super(RoutingConnectionPool, self).__init__(connector)
159+
self.initial_address = initial_address
159160
self.routing_table = RoutingTable(routers)
160161
self.refresh_lock = Lock()
161162

@@ -216,16 +217,32 @@ def fetch_routing_table(self, address):
216217
# At least one of each is fine, so return this table
217218
return new_routing_table
218219

220+
def update_routing_table_with_routers(self, routers):
221+
"""Try to update routing tables with the given routers
222+
:return: True if the routing table is successfully updated, otherwise False
223+
"""
224+
for router in routers:
225+
new_routing_table = self.fetch_routing_table(router)
226+
if new_routing_table is not None:
227+
self.routing_table.update(new_routing_table)
228+
return True
229+
return False
230+
219231
def update_routing_table(self):
220232
""" Update the routing table from the first router able to provide
221233
valid routing information.
222234
"""
223235
# copied because it can be modified
224236
copy_of_routers = list(self.routing_table.routers)
237+
if self.update_routing_table_with_routers(copy_of_routers):
238+
return
239+
240+
initial_routers = resolve(self.initial_address)
225241
for router in copy_of_routers:
226-
new_routing_table = self.fetch_routing_table(router)
227-
if new_routing_table is not None:
228-
self.routing_table.update(new_routing_table)
242+
if router in initial_routers:
243+
initial_routers.remove(router)
244+
if initial_routers:
245+
if self.update_routing_table_with_routers(initial_routers):
229246
return
230247

231248
# None of the routers have been successful, so just fail
@@ -304,7 +321,7 @@ def __init__(self, uri, **config):
304321
def connector(a):
305322
return connect(a, security_plan.ssl_context, **config)
306323

307-
pool = RoutingConnectionPool(connector, *resolve(initial_address))
324+
pool = RoutingConnectionPool(connector, initial_address, *resolve(initial_address))
308325
try:
309326
pool.update_routing_table()
310327
except:

test/stub/test_routing.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
"X": 1,
4949
}
5050

51+
UNREACHABLE_ADDRESS = ("127.0.0.1", 8080)
5152

5253
def connector(address):
5354
return connect(address, auth=basic_auth("neotest", "neotest"))
@@ -58,7 +59,7 @@ class RoutingConnectionPoolFetchRoutingInfoTestCase(StubTestCase):
5859
def test_should_get_info_from_router(self):
5960
with StubCluster({9001: "router.script"}):
6061
address = ("127.0.0.1", 9001)
61-
with RoutingConnectionPool(connector) as pool:
62+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
6263
result = pool.fetch_routing_info(address)
6364
assert len(result) == 1
6465
record = result[0]
@@ -72,58 +73,58 @@ def test_should_get_info_from_router(self):
7273

7374
def test_should_remove_router_if_cannot_connect(self):
7475
address = ("127.0.0.1", 9001)
75-
with RoutingConnectionPool(connector, address) as pool:
76+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
7677
assert address in pool.routing_table.routers
7778
_ = pool.fetch_routing_info(address)
7879
assert address not in pool.routing_table.routers
7980

8081
def test_should_remove_router_if_connection_drops(self):
8182
with StubCluster({9001: "rude_router.script"}):
8283
address = ("127.0.0.1", 9001)
83-
with RoutingConnectionPool(connector, address) as pool:
84+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
8485
assert address in pool.routing_table.routers
8586
_ = pool.fetch_routing_info(address)
8687
assert address not in pool.routing_table.routers
8788

8889
def test_should_not_fail_if_cannot_connect_but_router_already_removed(self):
8990
address = ("127.0.0.1", 9001)
90-
with RoutingConnectionPool(connector) as pool:
91+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
9192
assert address not in pool.routing_table.routers
9293
_ = pool.fetch_routing_info(address)
9394
assert address not in pool.routing_table.routers
9495

9596
def test_should_not_fail_if_connection_drops_but_router_already_removed(self):
9697
with StubCluster({9001: "rude_router.script"}):
9798
address = ("127.0.0.1", 9001)
98-
with RoutingConnectionPool(connector) as pool:
99+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
99100
assert address not in pool.routing_table.routers
100101
_ = pool.fetch_routing_info(address)
101102
assert address not in pool.routing_table.routers
102103

103104
def test_should_return_none_if_cannot_connect(self):
104105
address = ("127.0.0.1", 9001)
105-
with RoutingConnectionPool(connector, address) as pool:
106+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
106107
result = pool.fetch_routing_info(address)
107108
assert result is None
108109

109110
def test_should_return_none_if_connection_drops(self):
110111
with StubCluster({9001: "rude_router.script"}):
111112
address = ("127.0.0.1", 9001)
112-
with RoutingConnectionPool(connector, address) as pool:
113+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
113114
result = pool.fetch_routing_info(address)
114115
assert result is None
115116

116117
def test_should_fail_for_non_router(self):
117118
with StubCluster({9001: "non_router.script"}):
118119
address = ("127.0.0.1", 9001)
119-
with RoutingConnectionPool(connector, address) as pool:
120+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
120121
with self.assertRaises(ServiceUnavailable):
121122
_ = pool.fetch_routing_info(address)
122123

123124
def test_should_fail_if_database_error(self):
124125
with StubCluster({9001: "broken_router.script"}):
125126
address = ("127.0.0.1", 9001)
126-
with RoutingConnectionPool(connector, address) as pool:
127+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
127128
with self.assertRaises(ServiceUnavailable):
128129
_ = pool.fetch_routing_info(address)
129130

@@ -133,7 +134,7 @@ class RoutingConnectionPoolFetchRoutingTableTestCase(StubTestCase):
133134
def test_should_get_table_from_router(self):
134135
with StubCluster({9001: "router.script"}):
135136
address = ("127.0.0.1", 9001)
136-
with RoutingConnectionPool(connector) as pool:
137+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
137138
table = pool.fetch_routing_table(address)
138139
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
139140
("127.0.0.1", 9003)}
@@ -143,28 +144,28 @@ def test_should_get_table_from_router(self):
143144

144145
def test_null_info_should_return_null_table(self):
145146
address = ("127.0.0.1", 9001)
146-
with RoutingConnectionPool(connector) as pool:
147+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
147148
table = pool.fetch_routing_table(address)
148149
assert table is None
149150

150151
def test_no_routers_should_raise_protocol_error(self):
151152
with StubCluster({9001: "router_no_routers.script"}):
152153
address = ("127.0.0.1", 9001)
153-
with RoutingConnectionPool(connector) as pool:
154+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
154155
with self.assertRaises(ProtocolError):
155156
_ = pool.fetch_routing_table(address)
156157

157158
def test_no_readers_should_raise_protocol_error(self):
158159
with StubCluster({9001: "router_no_readers.script"}):
159160
address = ("127.0.0.1", 9001)
160-
with RoutingConnectionPool(connector) as pool:
161+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
161162
with self.assertRaises(ProtocolError):
162163
_ = pool.fetch_routing_table(address)
163164

164165
def test_no_writers_should_return_null_table(self):
165166
with StubCluster({9001: "router_no_writers.script"}):
166167
address = ("127.0.0.1", 9001)
167-
with RoutingConnectionPool(connector) as pool:
168+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
168169
table = pool.fetch_routing_table(address)
169170
assert table is None
170171

@@ -183,8 +184,21 @@ class RoutingConnectionPoolUpdateRoutingTableTestCase(StubTestCase):
183184
(None, None, ServiceUnavailable): ServiceUnavailable,
184185
}
185186

187+
def test_roll_back_to_initial_server_if_failed_update_with_existing_routers(self):
188+
with StubCluster({9001: "router.script"}):
189+
initial_address =("127.0.0.1", 9001) # roll back addresses
190+
routers = [("127.0.0.1", 9002), ("127.0.0.1", 9003)] # not reachable servers
191+
with RoutingConnectionPool(connector, initial_address, *routers) as pool:
192+
pool.update_routing_table()
193+
table = pool.routing_table
194+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
195+
("127.0.0.1", 9003)}
196+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
197+
assert table.writers == {("127.0.0.1", 9006)}
198+
assert table.ttl == 300
199+
186200
def test_update_with_no_routers_should_signal_service_unavailable(self):
187-
with RoutingConnectionPool(connector) as pool:
201+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS) as pool:
188202
with self.assertRaises(ServiceUnavailable):
189203
pool.update_routing_table()
190204

@@ -207,7 +221,7 @@ def _test_server_outcome(self, server_outcomes, overall_outcome):
207221
assert False, "Unexpected server outcome %r" % outcome
208222
routers.append(("127.0.0.1", port))
209223
with StubCluster(servers):
210-
with RoutingConnectionPool(connector, *routers) as pool:
224+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, *routers) as pool:
211225
if overall_outcome is RoutingTable:
212226
pool.update_routing_table()
213227
table = pool.routing_table
@@ -228,7 +242,7 @@ class RoutingConnectionPoolRefreshRoutingTableTestCase(StubTestCase):
228242
def test_should_update_if_stale(self):
229243
with StubCluster({9001: "router.script"}):
230244
address = ("127.0.0.1", 9001)
231-
with RoutingConnectionPool(connector, address) as pool:
245+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
232246
first_updated_time = pool.routing_table.last_updated_time
233247
pool.routing_table.ttl = 0
234248
pool.refresh_routing_table()
@@ -238,7 +252,7 @@ def test_should_update_if_stale(self):
238252
def test_should_not_update_if_fresh(self):
239253
with StubCluster({9001: "router.script"}):
240254
address = ("127.0.0.1", 9001)
241-
with RoutingConnectionPool(connector, address) as pool:
255+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
242256
pool.refresh_routing_table()
243257
first_updated_time = pool.routing_table.last_updated_time
244258
pool.refresh_routing_table()
@@ -250,7 +264,7 @@ def test_should_not_update_if_fresh(self):
250264
# address = ("127.0.0.1", 9001)
251265
# table = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD])
252266
#
253-
# with RoutingConnectionPool(connector, address) as pool:
267+
# with RoutingConnectionPool(connector, not_reachable_address, address) as pool:
254268
# semaphore = Semaphore()
255269
#
256270
# class Refresher(Thread):
@@ -297,7 +311,7 @@ def test_should_not_update_if_fresh(self):
297311
# address = ("127.0.0.1", 9001)
298312
# table = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD])
299313
#
300-
# with RoutingConnectionPool(connector, address) as pool:
314+
# with RoutingConnectionPool(connector, not_reachable_address, address) as pool:
301315
# semaphore = Semaphore()
302316
#
303317
# class Refresher(Thread):
@@ -345,15 +359,15 @@ class RoutingConnectionPoolAcquireForReadTestCase(StubTestCase):
345359
def test_should_refresh(self):
346360
with StubCluster({9001: "router.script", 9004: "empty.script"}):
347361
address = ("127.0.0.1", 9001)
348-
with RoutingConnectionPool(connector, address) as pool:
362+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
349363
assert not pool.routing_table.is_fresh()
350364
_ = pool.acquire(access_mode=READ_ACCESS)
351365
assert pool.routing_table.is_fresh()
352366

353367
def test_connected_to_reader(self):
354368
with StubCluster({9001: "router.script", 9004: "empty.script"}):
355369
address = ("127.0.0.1", 9001)
356-
with RoutingConnectionPool(connector, address) as pool:
370+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
357371
assert not pool.routing_table.is_fresh()
358372
connection = pool.acquire(access_mode=READ_ACCESS)
359373
assert connection.server.address in pool.routing_table.readers
@@ -363,7 +377,7 @@ def test_should_retry_if_first_reader_fails(self):
363377
9004: "fail_on_init.script",
364378
9005: "empty.script"}):
365379
address = ("127.0.0.1", 9001)
366-
with RoutingConnectionPool(connector, address) as pool:
380+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
367381
assert not pool.routing_table.is_fresh()
368382
_ = pool.acquire(access_mode=READ_ACCESS)
369383
assert ("127.0.0.1", 9004) not in pool.routing_table.readers
@@ -375,15 +389,15 @@ class RoutingConnectionPoolAcquireForWriteTestCase(StubTestCase):
375389
def test_should_refresh(self):
376390
with StubCluster({9001: "router.script", 9006: "empty.script"}):
377391
address = ("127.0.0.1", 9001)
378-
with RoutingConnectionPool(connector, address) as pool:
392+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
379393
assert not pool.routing_table.is_fresh()
380394
_ = pool.acquire(access_mode=WRITE_ACCESS)
381395
assert pool.routing_table.is_fresh()
382396

383397
def test_connected_to_writer(self):
384398
with StubCluster({9001: "router.script", 9006: "empty.script"}):
385399
address = ("127.0.0.1", 9001)
386-
with RoutingConnectionPool(connector, address) as pool:
400+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
387401
assert not pool.routing_table.is_fresh()
388402
connection = pool.acquire(access_mode=WRITE_ACCESS)
389403
assert connection.server.address in pool.routing_table.writers
@@ -393,7 +407,7 @@ def test_should_retry_if_first_writer_fails(self):
393407
9006: "fail_on_init.script",
394408
9007: "empty.script"}):
395409
address = ("127.0.0.1", 9001)
396-
with RoutingConnectionPool(connector, address) as pool:
410+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
397411
assert not pool.routing_table.is_fresh()
398412
_ = pool.acquire(access_mode=WRITE_ACCESS)
399413
assert ("127.0.0.1", 9006) not in pool.routing_table.writers
@@ -405,7 +419,7 @@ class RoutingConnectionPoolRemoveTestCase(StubTestCase):
405419
def test_should_remove_router_from_routing_table_if_present(self):
406420
with StubCluster({9001: "router.script"}):
407421
address = ("127.0.0.1", 9001)
408-
with RoutingConnectionPool(connector, address) as pool:
422+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
409423
pool.refresh_routing_table()
410424
target = ("127.0.0.1", 9001)
411425
assert target in pool.routing_table.routers
@@ -415,7 +429,7 @@ def test_should_remove_router_from_routing_table_if_present(self):
415429
def test_should_remove_reader_from_routing_table_if_present(self):
416430
with StubCluster({9001: "router.script"}):
417431
address = ("127.0.0.1", 9001)
418-
with RoutingConnectionPool(connector, address) as pool:
432+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
419433
pool.refresh_routing_table()
420434
target = ("127.0.0.1", 9004)
421435
assert target in pool.routing_table.readers
@@ -425,7 +439,7 @@ def test_should_remove_reader_from_routing_table_if_present(self):
425439
def test_should_remove_writer_from_routing_table_if_present(self):
426440
with StubCluster({9001: "router.script"}):
427441
address = ("127.0.0.1", 9001)
428-
with RoutingConnectionPool(connector, address) as pool:
442+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
429443
pool.refresh_routing_table()
430444
target = ("127.0.0.1", 9006)
431445
assert target in pool.routing_table.writers
@@ -435,7 +449,7 @@ def test_should_remove_writer_from_routing_table_if_present(self):
435449
def test_should_not_fail_if_absent(self):
436450
with StubCluster({9001: "router.script"}):
437451
address = ("127.0.0.1", 9001)
438-
with RoutingConnectionPool(connector, address) as pool:
452+
with RoutingConnectionPool(connector, UNREACHABLE_ADDRESS, address) as pool:
439453
pool.refresh_routing_table()
440454
target = ("127.0.0.1", 9007)
441455
pool.remove(target)

test/unit/test_routing.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,5 +227,7 @@ def test_update_should_replace_ttl(self):
227227
class RoutingConnectionPoolConstructionTestCase(TestCase):
228228

229229
def test_should_populate_initial_router(self):
230-
with RoutingConnectionPool(connector, ("127.0.0.1", 9001)) as pool:
231-
assert pool.routing_table.routers == {("127.0.0.1", 9001)}
230+
initial_router = ("127.0.0.1", 9001)
231+
router = ("127.0.0.1", 9002)
232+
with RoutingConnectionPool(connector, initial_router, router) as pool:
233+
assert pool.routing_table.routers == {("127.0.0.1", 9002)}

0 commit comments

Comments
 (0)