PYTHON-5053 - AsyncMongoClient.close() should await all background tasks #2127
```diff
@@ -189,6 +189,10 @@ def gc_safe_close(self) -> None:
         self._rtt_monitor.gc_safe_close()
         self.cancel_check()
 
+    async def join(self, timeout: Optional[int] = None) -> None:
+        await self._executor.join(timeout)
+        await self._rtt_monitor.join()
+
```

Review comment on the added `join()`: This should use `gather` too, right?
```diff
     async def close(self) -> None:
         self.gc_safe_close()
         await self._rtt_monitor.close()
```
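For illustration only, here is a minimal sketch of what the reviewer's question points at: awaiting the executor and the RTT monitor concurrently with `asyncio.gather` instead of one after the other. The attribute names come from the diff above; everything else is an assumption, not the merged change.

```python
import asyncio
from typing import Optional


class Monitor:
    # Only the shutdown-related pieces are sketched; the real class has
    # many more members.
    def __init__(self, executor, rtt_monitor) -> None:
        self._executor = executor
        self._rtt_monitor = rtt_monitor

    async def join(self, timeout: Optional[int] = None) -> None:
        # Await both background tasks concurrently, so the total wait is
        # bounded by the slower of the two rather than their sum.
        await asyncio.gather(
            self._executor.join(timeout),
            self._rtt_monitor.join(),
        )
```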
```diff
@@ -520,6 +520,8 @@ async def _process_change(
             and self._description.topology_type not in SRV_POLLING_TOPOLOGIES
         ):
             await self._srv_monitor.close()
+            if not _IS_SYNC:
+                await self._srv_monitor.join()
```
Comment: I think we should have a separate code path for join() all the way down. That way we first signal everything to shut down, then we wait for everything to exit. Joining each task individually will slow down the close().

Comment: I meant to put this comment on Topology.close().

Comment: I think this can deadlock. It's not safe to call join() while holding the topology lock, because the task we're attempting to join() may be blocked on acquiring the same lock.

Reply: You're suggesting that we make …

Comment: No, I'm suggesting two changes: 1) join the tasks concurrently, and 2) only join after releasing the lock. Something like:

```python
tasks = []
with self.lock:
    for s in servers:
        await s.close()
        tasks.append(s)
...
# Only join after releasing the lock
await asyncio.gather(*(t.join() for t in tasks))
```

The current approach is slow because of 1) and risky because of 2). The slowness is because joining each task inline after close() essentially serializes the shutdown process. So if you have 50 servers and it takes 10 ms to join each task, it will take 500 ms altogether. With my suggestion it will only take 10 ms total, since all the tasks can exit concurrently.

Reply: Ah, something like this inside …? And then we call …

Comment: task.cancel() might prevent the deadlock scenario in the async path.
```diff
 
         # Clear the pool from a failed heartbeat.
         if reset_pool:
```
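To make the suggested pattern concrete, here is a self-contained sketch of the two-phase shutdown discussed above: signal every task to stop while holding the lock, then join them all concurrently after releasing it. `Server`, `shutdown()`, and the lock are hypothetical stand-ins for this illustration, not pymongo's API.

```python
import asyncio


class Server:
    """Hypothetical background worker; stands in for pymongo's real servers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._stopped = asyncio.Event()
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        # Pretend to do periodic work until asked to stop.
        await self._stopped.wait()

    async def close(self) -> None:
        # Phase 1: signal shutdown only; do not wait for the task here.
        self._stopped.set()

    async def join(self) -> None:
        # Phase 2: wait for the background task to actually exit.
        await self._task


async def shutdown(servers: list[Server], lock: asyncio.Lock) -> None:
    tasks = []
    async with lock:
        # Signal every server to shut down while holding the lock.
        for s in servers:
            await s.close()
            tasks.append(s)
    # Join only after releasing the lock, and join concurrently: 50 servers
    # take roughly as long as the slowest one, and a task blocked on the
    # lock cannot deadlock the close.
    await asyncio.gather(*(t.join() for t in tasks))


async def main() -> None:
    lock = asyncio.Lock()
    servers = [Server(f"server-{i}") for i in range(3)]
    await shutdown(servers, lock)


if __name__ == "__main__":
    asyncio.run(main())
```

The task.cancel() idea mentioned in the thread could, presumably, be applied between the two phases if a task needs to be interrupted rather than asked to exit.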
```diff
@@ -705,6 +707,8 @@ async def close(self) -> None:
         # Stop SRV polling thread.
         if self._srv_monitor:
             await self._srv_monitor.close()
+            if not _IS_SYNC:
+                await self._srv_monitor.join()
 
         self._opened = False
         self._closed = True
```
Comment: Do we ever pass a timeout here?

Reply: Not currently, but we likely should once we're productionizing the async API for release. Currently there's too much variability between platforms and test suites to pick a good timeout number.

Comment: Let's remove the timeout parameter since it's not used.
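If a timeout is reintroduced later, as floated above for the production release, one plausible way to bound the wait is to wrap the join in `asyncio.wait_for`. This is an assumption about a possible future design, not part of this PR.

```python
import asyncio
from typing import Optional


async def join_with_timeout(join_coro, timeout: Optional[float] = None) -> None:
    # No timeout: wait indefinitely for the background task to exit.
    # With a timeout: give up after `timeout` seconds; wait_for raises
    # asyncio.TimeoutError, which close() could log or swallow.
    if timeout is None:
        await join_coro
    else:
        await asyncio.wait_for(join_coro, timeout)
```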