PYTHON-5053 - AsyncMongoClient.close() should await all background tasks #2127

Merged: 8 commits, Feb 5, 2025
Changes from 2 commits
2 changes: 2 additions & 0 deletions pymongo/asynchronous/mongo_client.py
@@ -1559,6 +1559,8 @@ async def close(self) -> None:
# Stop the periodic task thread and then send pending killCursor
# requests before closing the topology.
self._kill_cursors_executor.close()
if not _IS_SYNC:
await self._kill_cursors_executor.join()
await self._process_kill_cursors()
await self._topology.close()
if self._encrypter:
4 changes: 4 additions & 0 deletions pymongo/asynchronous/monitor.py
@@ -189,6 +189,10 @@ def gc_safe_close(self) -> None:
self._rtt_monitor.gc_safe_close()
self.cancel_check()

async def join(self, timeout: Optional[int] = None) -> None:
Member
Do we ever pass a timeout here?

Contributor Author
Not currently, but we likely should once we productionize the async API for release. Right now there's too much variability between platforms and test suites to pick a good timeout number.

Member
Let's remove the timeout parameter since it's not used.

await self._executor.join(timeout)
await self._rtt_monitor.join()
Member
This should use gather too, right?
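
For illustration, a minimal sketch of a gathered join, assuming both underlying join() calls are awaitable coroutines (method shown out of context, not necessarily the final code):

```python
import asyncio
from typing import Optional

async def join(self, timeout: Optional[int] = None) -> None:
    # Wait on the executor task and the RTT monitor concurrently instead of serially.
    await asyncio.gather(self._executor.join(timeout), self._rtt_monitor.join())
```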


async def close(self) -> None:
self.gc_safe_close()
await self._rtt_monitor.close()
2 changes: 2 additions & 0 deletions pymongo/asynchronous/server.py
@@ -115,6 +115,8 @@ async def close(self) -> None:
)

await self._monitor.close()
if not _IS_SYNC:
await self._monitor.join()
await self._pool.close()

def request_check(self) -> None:
4 changes: 4 additions & 0 deletions pymongo/asynchronous/topology.py
@@ -520,6 +520,8 @@ async def _process_change(
and self._description.topology_type not in SRV_POLLING_TOPOLOGIES
):
await self._srv_monitor.close()
if not _IS_SYNC:
await self._srv_monitor.join()
Member
I think we should have a separate code path for join() all the way down. That way we first signal everything to shut down, then we wait for everything to exit. Joining each task individually will slow down the close().

Member
I meant to put this comment on Topology.close().

Member
I think this can deadlock. It's not safe to call join() while holding the topology lock because the task we're attempting to join() may be blocking on acquiring the same lock.
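
As a standalone toy example of that failure mode (not pymongo code): the closer holds a lock while joining a task that is itself blocked acquiring the same lock, so neither side can make progress unless the task is cancelled first.

```python
import asyncio

async def main() -> None:
    lock = asyncio.Lock()

    async def background() -> None:
        async with lock:          # the background task needs the same lock to finish
            pass

    task = asyncio.create_task(background())
    async with lock:              # the closer holds the lock...
        await asyncio.sleep(0)    # ...while the task starts and blocks on acquiring it
        # Joining here would hang forever:
        #   await task
        # Cancelling first (as periodic_executor.close() now does) breaks the cycle.
        task.cancel()
    try:
        await task                # returns promptly once the task is cancelled
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```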

Contributor Author
You're suggesting that we make executor.close() not call task.cancel(), and call it inside join() instead?

Member @ShaneHarvey, Feb 3, 2025
No I'm suggesting two changes.

  1. We call close like we do currently. Then after everything is closed, we call join on everything.
  2. We never call join() while holding a lock.
tasks = []
with self.lock:
    for s in servers:
        await s.close()
        tasks.append(s)
    ...
# Only join after releasing the lock
await asyncio.gather(*(t.join() for t in tasks))

The current approach is slow because it skips 1) and risky because it skips 2). The slowness comes from joining each task inline after close(), which essentially serializes the shutdown: if you have 50 servers and it takes 10ms to join each task, that's 500ms altogether. With my suggestion it only takes about 10ms total, since all the tasks can exit concurrently.
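
As a standalone toy measurement of that claim (stand-in sleeps, not pymongo code): serial joins cost roughly the sum of the individual shutdown times, while a gathered join costs roughly the longest single one.

```python
import asyncio
import time

async def fake_join(delay: float = 0.01) -> None:
    # Stand-in for waiting on one monitor task to exit after close().
    await asyncio.sleep(delay)

async def main() -> None:
    start = time.monotonic()
    for _ in range(50):
        await fake_join()                                     # serial: ~50 * 10ms
    print(f"serial:     {time.monotonic() - start:.3f}s")

    start = time.monotonic()
    await asyncio.gather(*(fake_join() for _ in range(50)))   # concurrent: ~10ms
    print(f"concurrent: {time.monotonic() - start:.3f}s")

asyncio.run(main())
```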

Contributor Author
Ah, something like this inside Topology?

async def join(self):
    # Join all monitors
    ...

And then we call topology.join() inside AsyncMongoClient.close()?
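
As a rough sketch of that shape (attribute names such as _lock, _servers, _monitor, and _srv_monitor are assumptions for illustration, not the PR's actual code), Topology.join() could collect the monitors while holding the lock and only await them after releasing it:

```python
import asyncio

async def join(self) -> None:
    # Collect monitor references under the lock, but do the waiting outside it.
    monitors = []
    async with self._lock:
        monitors.extend(server._monitor for server in self._servers.values())
        if self._srv_monitor:
            monitors.append(self._srv_monitor)
    await asyncio.gather(*(monitor.join() for monitor in monitors))
```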

Member
task.cancel() might prevent the deadlock scenario in the async path.


# Clear the pool from a failed heartbeat.
if reset_pool:
@@ -705,6 +707,8 @@ async def close(self) -> None:
# Stop SRV polling thread.
if self._srv_monitor:
await self._srv_monitor.close()
if not _IS_SYNC:
await self._srv_monitor.join()

self._opened = False
self._closed = True
2 changes: 2 additions & 0 deletions pymongo/periodic_executor.py
@@ -75,6 +75,8 @@ def close(self, dummy: Any = None) -> None:
callback; see monitor.py.
"""
self._stopped = True
if self._task is not None:
self._task.cancel()

async def join(self, timeout: Optional[int] = None) -> None:
if self._task is not None:
2 changes: 2 additions & 0 deletions pymongo/synchronous/mongo_client.py
@@ -1553,6 +1553,8 @@ def close(self) -> None:
# Stop the periodic task thread and then send pending killCursor
# requests before closing the topology.
self._kill_cursors_executor.close()
if not _IS_SYNC:
self._kill_cursors_executor.join()
self._process_kill_cursors()
self._topology.close()
if self._encrypter:
4 changes: 4 additions & 0 deletions pymongo/synchronous/monitor.py
@@ -189,6 +189,10 @@ def gc_safe_close(self) -> None:
self._rtt_monitor.gc_safe_close()
self.cancel_check()

def join(self, timeout: Optional[int] = None) -> None:
self._executor.join(timeout)
self._rtt_monitor.join()

def close(self) -> None:
self.gc_safe_close()
self._rtt_monitor.close()
2 changes: 2 additions & 0 deletions pymongo/synchronous/server.py
@@ -115,6 +115,8 @@ def close(self) -> None:
)

self._monitor.close()
if not _IS_SYNC:
self._monitor.join()
self._pool.close()

def request_check(self) -> None:
4 changes: 4 additions & 0 deletions pymongo/synchronous/topology.py
@@ -520,6 +520,8 @@ def _process_change(
and self._description.topology_type not in SRV_POLLING_TOPOLOGIES
):
self._srv_monitor.close()
if not _IS_SYNC:
self._srv_monitor.join()

# Clear the pool from a failed heartbeat.
if reset_pool:
@@ -703,6 +705,8 @@ def close(self) -> None:
# Stop SRV polling thread.
if self._srv_monitor:
self._srv_monitor.close()
if not _IS_SYNC:
self._srv_monitor.join()

self._opened = False
self._closed = True