Skip to content

GH-109564: add asyncio.Server state machine #131009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import collections.abc
import concurrent.futures
import enum
import errno
import heapq
import itertools
Expand Down Expand Up @@ -272,6 +273,23 @@ async def restore(self):
self._proto.resume_writing()


class _ServerState(enum.Enum):
    """Lifecycle states of a Server.

    -[in]-> INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN

    - in: Server.__init__()
    - ss: Server._start_serving()
    - cl: Server.close()
    - wk: Server._wakeup() *only called if number of clients == 0

    Server.close() moves every state other than CLOSED and SHUTDOWN to
    CLOSED, so a server that never started serving can also take the
    INITIALIZED -[cl]-> CLOSED edge directly.
    """

    # Set by Server.__init__(); the server has not started serving yet.
    INITIALIZED = "initialized"
    # Set by Server._start_serving(); the state Server.is_serving() and
    # Server._attach() check for.
    SERVING = "serving"
    # Set by Server.close(); Server._detach() calls _wakeup() in this
    # state once no clients remain.
    CLOSED = "closed"
    # Set by Server._wakeup(); Server.wait_closed() returns immediately.
    SHUTDOWN = "shutdown"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name SHUTDOWN here, but needed something more "definitive" than closed.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

I think that makes sense.

I'm more worried about INITIALIZED here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something like NOT_SERVING or NOT_YET_STARTED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 07129e5



class Server(events.AbstractServer):

def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
Expand All @@ -287,32 +305,47 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._ssl_shutdown_timeout = ssl_shutdown_timeout
self._serving = False
self._state = _ServerState.INITIALIZED
self._serving_forever_fut = None

def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self, transport):
assert self._sockets is not None
if self._state != _ServerState.SERVING:
raise RuntimeError("server is not serving, cannot attach transport")
self._clients.add(transport)

def _detach(self, transport):
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
if self._state == _ServerState.CLOSED and len(self._clients) == 0:
self._wakeup()

def _wakeup(self):
if self._state == _ServerState.CLOSED:
self._state = _ServerState.SHUTDOWN
elif self._state == _ServerState.SHUTDOWN:
# gh109564: the wakeup method has two possible call-sites,
# through an explicit call to Server.close(), or indirectly through
# Server._detach() by the last connected client.
return
else:
raise RuntimeError(f"server {self!r} can only wakeup waiters after closing")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message isn't making much sense to me. We'll never reach here "after closing," right? This can only be triggered when the state is SERVING or INITIALIZED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this was bad wording. I was trying to say that the Server must be closed before it can be shut down (i.e. we cannot shut down from the SERVING state, nor from the INITIALIZED/NOT_STARTED state).

Made a fix in: 7f3481b


waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(None)

def _start_serving(self):
if self._serving:
if self._state == _ServerState.INITIALIZED:
self._state = _ServerState.SERVING
elif self._state == _ServerState.SERVING:
return
self._serving = True
else:
raise RuntimeError(f'server {self!r} is closed')

for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
Expand All @@ -324,7 +357,7 @@ def get_loop(self):
return self._loop

def is_serving(self):
return self._serving
return self._state == _ServerState.SERVING

@property
def sockets(self):
Expand All @@ -333,6 +366,11 @@ def sockets(self):
return tuple(trsock.TransportSocket(s) for s in self._sockets)

def close(self):
if self._state == _ServerState.CLOSED or self._state == _ServerState.SHUTDOWN:
return
else:
self._state = _ServerState.CLOSED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if something magically goes wrong after this has been set? Is the server still alive while thinking it's closed? It might be worth adding a try/except to undo this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah great point! Although I'm not sure how that would look from a user's point of view. Should we give them a warning to let them know they must try to close the server again? Alternatively, should we expose something like a force kwarg/flag?

I pushed an initial fix with a simple try/except to recover the previous state on fail in 48a3c0d


sockets = self._sockets
if sockets is None:
return
Expand All @@ -341,8 +379,6 @@ def close(self):
for sock in sockets:
self._loop._stop_serving(sock)

self._serving = False

if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
Expand All @@ -369,8 +405,6 @@ async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
raise RuntimeError(f'server {self!r} is closed')

self._start_serving()
self._serving_forever_fut = self._loop.create_future()
Expand Down Expand Up @@ -407,7 +441,7 @@ async def wait_closed(self):
# from two places: self.close() and self._detach(), but only
# when both conditions have become true. To signal that this
# has happened, self._wakeup() sets self._waiters to None.
if self._waiters is None:
if self._state == _ServerState.SHUTDOWN:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
Expand Down
6 changes: 5 additions & 1 deletion Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
self._server._attach(self)
if self._server.is_serving():
self._server._attach(self)
else:
self.abort()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why abort the whole transport? This is seemingly a change in behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yup sorry, my misunderstanding! Addressed in 5832655

I commented here my original thinking:

This is a functional change over main. In my head, if a transport was constructed with server != None, then the transport should not be created/serviced if its corresponding server was shut down.

But I re-read the docs for Server.close and see I was thinking about it incorrectly:

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.close

The sockets that represent existing incoming client connections are left open.

return
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
Expand Down
7 changes: 6 additions & 1 deletion Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,12 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._paused = False # Set when pause_reading() called

if self._server is not None:
self._server._attach(self)
if self._server.is_serving():
self._server._attach(self)
else:
self.abort()
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a functional change over main. In my head, if a transport was constructed with server != None, then the transport should not be created/serviced if its corresponding server was shut down.

Used transport.abort over transport.close to force-clear the buffer, since no server exists to service the connection.

A return statement was added to the else block to prevent the transport getting tracked in loop._transports


loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down
29 changes: 29 additions & 0 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import threading
import unittest
from unittest.mock import Mock

from test.support import socket_helper
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -186,6 +187,8 @@ async def serve(rd, wr):
loop.call_soon(srv.close)
loop.call_soon(wr.close)
await srv.wait_closed()
self.assertTrue(task.done())
self.assertFalse(srv.is_serving())

async def test_close_clients(self):
async def serve(rd, wr):
Expand All @@ -212,6 +215,9 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
await srv.start_serving()

async def test_abort_clients(self):
async def serve(rd, wr):
fut.set_result((rd, wr))
Expand Down Expand Up @@ -266,6 +272,29 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

async def test_close_before_transport_attach(self):
    # Regression test for gh-109564: the server is closed after a client
    # has connected but before the connection's transport is attached.
    proto = Mock()
    loop = asyncio.get_running_loop()
    srv = await loop.create_server(lambda *_: proto, socket_helper.HOSTv4, 0)

    await srv.start_serving()
    addr = srv.sockets[0].getsockname()

    # Create a connection to the server, but close the server before the
    # socket transport for the connection is created and attached.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(addr)
        await asyncio.sleep(0)  # loop select reader
        await asyncio.sleep(0)  # accept conn 1
        srv.close()

        # Give the protocol an opportunity to handle this event.
        # gh109564: previously the transport would be left unclosed,
        # causing a loop exception due to a double call to
        # Server._wakeup.
        await asyncio.sleep(0)
        await asyncio.sleep(0)
        proto.connection_lost.assert_called()


# Test the various corner cases of Unix server socket removal
class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan.
Loading