Skip to content

PYTHON-2328 Reset the connection pool in Topology.on_change #470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions pymongo/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,20 @@ def _run(self):
# discover that we've been cancelled.
self._executor.skip_sleep()
return
self._topology.on_change(self._server_description)

# Update the Topology and clear the server pool on error.
self._topology.on_change(self._server_description,
reset_pool=self._server_description.error)

if (self._server_description.is_server_type_known and
self._server_description.topology_version):
self._start_rtt_monitor()
# Immediately check for the next streaming response.
self._executor.skip_sleep()

if self._server_description.error:
# Reset the server pool only after marking the server Unknown.
self._topology.reset_pool(self._server_description.address)
if prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
if self._server_description.error and prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
except ReferenceError:
# Topology was garbage-collected.
self.close()
Expand Down Expand Up @@ -377,6 +377,8 @@ def _run(self):
def _ping(self):
"""Run an "isMaster" command and return the RTT."""
with self._pool.get_socket({}) as sock_info:
if self._executor._stopped:
raise Exception('_RttMonitor closed')
start = _time()
sock_info.ismaster()
return _time() - start
Expand Down
22 changes: 11 additions & 11 deletions pymongo/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def select_server_by_address(self, address,
server_selection_timeout,
address)

def _process_change(self, server_description):
def _process_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription on an opened topology.

Hold the lock when calling this.
Expand Down Expand Up @@ -303,10 +303,16 @@ def _process_change(self, server_description):
SRV_POLLING_TOPOLOGIES):
self._srv_monitor.close()

# Clear the pool from a failed heartbeat.
if reset_pool:
server = self._servers.get(server_description.address)
if server:
server.pool.reset()

# Wake waiters in select_servers().
self._condition.notify_all()

def on_change(self, server_description):
def on_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription after an ismaster call completes."""
# We do no I/O holding the lock.
with self._lock:
Expand All @@ -320,7 +326,7 @@ def on_change(self, server_description):
# that didn't include this server.
if (self._opened and
self._description.has_server(server_description.address)):
self._process_change(server_description)
self._process_change(server_description, reset_pool)

def _process_srv_update(self, seedlist):
"""Process a new seedlist on an opened topology.
Expand Down Expand Up @@ -414,20 +420,14 @@ def request_check_all(self, wait_time=5):
self._request_check_all()
self._condition.wait(wait_time)

def reset_pool(self, address):
with self._lock:
server = self._servers.get(address)
if server:
server.pool.reset()

def handle_getlasterror(self, address, error_msg):
"""Clear our pool for a server, mark it Unknown, and check it soon."""
error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg})
with self._lock:
server = self._servers.get(address)
if server:
self._process_change(ServerDescription(address, error=error))
server.pool.reset()
self._process_change(
ServerDescription(address, error=error), True)
server.request_check()

def update_pool(self, all_credentials):
Expand Down