-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
GH-109564: add asyncio.Server state machine #131009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
0fec860
f3b96bf
8e409b7
a92158a
44d24fb
07129e5
48a3c0d
5832655
7f3481b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import collections | ||
import collections.abc | ||
import concurrent.futures | ||
import enum | ||
import errno | ||
import heapq | ||
import itertools | ||
|
@@ -272,6 +273,23 @@ async def restore(self): | |
self._proto.resume_writing() | ||
|
||
|
||
class _ServerState(enum.Enum): | ||
"""This tracks the state of Server. | ||
|
||
-[in]->INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN | ||
|
||
- in: Server.__init__() | ||
- ss: Server._start_serving() | ||
- cl: Server.close() | ||
- wk: Server._wakeup() *only called if number of clients == 0 | ||
""" | ||
|
||
INITIALIZED = "initialized" | ||
SERVING = "serving" | ||
CLOSED = "closed" | ||
SHUTDOWN = "shutdown" | ||
|
||
|
||
class Server(events.AbstractServer): | ||
|
||
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | ||
|
@@ -287,32 +305,47 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | |
self._ssl_context = ssl_context | ||
self._ssl_handshake_timeout = ssl_handshake_timeout | ||
self._ssl_shutdown_timeout = ssl_shutdown_timeout | ||
self._serving = False | ||
self._state = _ServerState.INITIALIZED | ||
self._serving_forever_fut = None | ||
|
||
def __repr__(self): | ||
return f'<{self.__class__.__name__} sockets={self.sockets!r}>' | ||
|
||
def _attach(self, transport): | ||
assert self._sockets is not None | ||
if self._state != _ServerState.SERVING: | ||
raise RuntimeError("server is not serving, cannot attach transport") | ||
self._clients.add(transport) | ||
|
||
def _detach(self, transport): | ||
self._clients.discard(transport) | ||
if len(self._clients) == 0 and self._sockets is None: | ||
if self._state == _ServerState.CLOSED and len(self._clients) == 0: | ||
self._wakeup() | ||
|
||
def _wakeup(self): | ||
if self._state == _ServerState.CLOSED: | ||
self._state = _ServerState.SHUTDOWN | ||
elif self._state == _ServerState.SHUTDOWN: | ||
# gh109564: the wakeup method has two possible call-sites, | ||
# through an explicit call Server.close(), or indirectly through | ||
# Server._detach() by the last connected client. | ||
return | ||
else: | ||
raise RuntimeError(f"server {self!r} can only wakeup waiters after closing") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error message isn't making much sense to me. We'll never reach here "after closing," right? This can only be triggered when the state is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry this is bad wording, I was trying to say that the Server must be closed before it can be shutdown (i.e. we cannot shutdown from the Made a fix in: 7f3481b |
||
|
||
waiters = self._waiters | ||
self._waiters = None | ||
for waiter in waiters: | ||
if not waiter.done(): | ||
waiter.set_result(None) | ||
|
||
def _start_serving(self): | ||
if self._serving: | ||
if self._state == _ServerState.INITIALIZED: | ||
self._state = _ServerState.SERVING | ||
elif self._state == _ServerState.SERVING: | ||
return | ||
self._serving = True | ||
else: | ||
raise RuntimeError(f'server {self!r} is closed') | ||
ordinary-jamie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for sock in self._sockets: | ||
sock.listen(self._backlog) | ||
self._loop._start_serving( | ||
|
@@ -324,7 +357,7 @@ def get_loop(self): | |
return self._loop | ||
|
||
def is_serving(self): | ||
return self._serving | ||
return self._state == _ServerState.SERVING | ||
|
||
@property | ||
def sockets(self): | ||
|
@@ -333,6 +366,11 @@ def sockets(self): | |
return tuple(trsock.TransportSocket(s) for s in self._sockets) | ||
|
||
def close(self): | ||
if self._state == _ServerState.CLOSED or self._state == _ServerState.SHUTDOWN: | ||
ordinary-jamie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
else: | ||
self._state = _ServerState.CLOSED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if something magically goes wrong after this has been set? Is the server still alive while thinking it's closed? It might be worth adding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah great point! Although I'm not sure how that would look from a user's point of view. Should we give them a warning to let them know they must try to close the server again? Alternatively, should we expose something like a I pushed an initial fix with a simple |
||
|
||
sockets = self._sockets | ||
if sockets is None: | ||
return | ||
|
@@ -341,8 +379,6 @@ def close(self): | |
for sock in sockets: | ||
self._loop._stop_serving(sock) | ||
|
||
self._serving = False | ||
|
||
if (self._serving_forever_fut is not None and | ||
not self._serving_forever_fut.done()): | ||
self._serving_forever_fut.cancel() | ||
|
@@ -369,8 +405,6 @@ async def serve_forever(self): | |
if self._serving_forever_fut is not None: | ||
raise RuntimeError( | ||
f'server {self!r} is already being awaited on serve_forever()') | ||
if self._sockets is None: | ||
raise RuntimeError(f'server {self!r} is closed') | ||
|
||
self._start_serving() | ||
self._serving_forever_fut = self._loop.create_future() | ||
|
@@ -407,7 +441,7 @@ async def wait_closed(self): | |
# from two places: self.close() and self._detach(), but only | ||
# when both conditions have become true. To signal that this | ||
# has happened, self._wakeup() sets self._waiters to None. | ||
if self._waiters is None: | ||
if self._state == _ServerState.SHUTDOWN: | ||
return | ||
waiter = self._loop.create_future() | ||
self._waiters.append(waiter) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,7 +63,11 @@ def __init__(self, loop, sock, protocol, waiter=None, | |
self._called_connection_lost = False | ||
self._eof_written = False | ||
if self._server is not None: | ||
self._server._attach(self) | ||
if self._server.is_serving(): | ||
self._server._attach(self) | ||
else: | ||
self.abort() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why abort the whole transport? This is seemingly a change in behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yup sorry, my misunderstanding! Addressed in 5832655 I commented here my original thinking:
But I re-read the docs for https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.close
|
||
return | ||
self._loop.call_soon(self._protocol.connection_made, self) | ||
if waiter is not None: | ||
# only wake up the waiter when connection_made() has been called | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -795,7 +795,12 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): | |
self._paused = False # Set when pause_reading() called | ||
|
||
if self._server is not None: | ||
self._server._attach(self) | ||
if self._server.is_serving(): | ||
self._server._attach(self) | ||
else: | ||
self.abort() | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a functional change over Used A return statement was added to the |
||
|
||
loop._transports[self._sock_fd] = self | ||
|
||
def __repr__(self): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the name
SHUTDOWN
here, but needed something more "definitive" thanclosed
.If we do use
SHUTDOWN
should I also renameServer._wakeup
->Server._shutdown
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense.
I'm more worried about
INITIALIZED
here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something likeNOT_SERVING
orNOT_YET_STARTED
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 07129e5