Skip to content

Commit fe133aa

Browse files
authored
bpo-32391: Implement StreamWriter.wait_closed() (#5281)
1 parent 04af5b1 commit fe133aa

File tree

4 files changed

+73
-10
lines changed

4 files changed

+73
-10
lines changed

Doc/library/asyncio-stream.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,21 @@ StreamWriter
201201

202202
Close the transport: see :meth:`BaseTransport.close`.
203203

204+
.. method:: is_closing()
205+
206+
Return ``True`` if the writer is closing or is closed.
207+
208+
.. versionadded:: 3.7
209+
210+
.. coroutinemethod:: wait_closed()
211+
212+
Wait until the writer is closed.
213+
214+
Should be called after :meth:`close` to wait until the underlying
215+
connection (and the associated transport/protocol pair) is closed.
216+
217+
.. versionadded:: 3.7
218+
204219
.. coroutinemethod:: drain()
205220

206221
Let the write buffer of the underlying transport a chance to be flushed.

Lib/asyncio/streams.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
224224
self._stream_writer = None
225225
self._client_connected_cb = client_connected_cb
226226
self._over_ssl = False
227+
self._closed = self._loop.create_future()
227228

228229
def connection_made(self, transport):
229230
self._stream_reader.set_transport(transport)
@@ -243,6 +244,11 @@ def connection_lost(self, exc):
243244
self._stream_reader.feed_eof()
244245
else:
245246
self._stream_reader.set_exception(exc)
247+
if not self._closed.done():
248+
if exc is None:
249+
self._closed.set_result(None)
250+
else:
251+
self._closed.set_exception(exc)
246252
super().connection_lost(exc)
247253
self._stream_reader = None
248254
self._stream_writer = None
@@ -259,6 +265,13 @@ def eof_received(self):
259265
return False
260266
return True
261267

268+
def __del__(self):
269+
# Prevent reports about unhandled exceptions.
270+
# Better than self._closed._log_traceback = False hack
271+
closed = self._closed
272+
if closed.done() and not closed.cancelled():
273+
closed.exception()
274+
262275

263276
class StreamWriter:
264277
"""Wraps a Transport.
@@ -303,6 +316,12 @@ def can_write_eof(self):
303316
def close(self):
304317
return self._transport.close()
305318

319+
def is_closing(self):
320+
return self._transport.is_closing()
321+
322+
async def wait_closed(self):
323+
await self._protocol._closed
324+
306325
def get_extra_info(self, name, default=None):
307326
return self._transport.get_extra_info(name, default)
308327

@@ -318,15 +337,14 @@ async def drain(self):
318337
exc = self._reader.exception()
319338
if exc is not None:
320339
raise exc
321-
if self._transport is not None:
322-
if self._transport.is_closing():
323-
# Yield to the event loop so connection_lost() may be
324-
# called. Without this, _drain_helper() would return
325-
# immediately, and code that calls
326-
# write(...); await drain()
327-
# in a loop would never call connection_lost(), so it
328-
# would not see an error when the socket is closed.
329-
await sleep(0, loop=self._loop)
340+
if self._transport.is_closing():
341+
# Yield to the event loop so connection_lost() may be
342+
# called. Without this, _drain_helper() would return
343+
# immediately, and code that calls
344+
# write(...); await drain()
345+
# in a loop would never call connection_lost(), so it
346+
# would not see an error when the socket is closed.
347+
await sleep(0, loop=self._loop)
330348
await self._protocol._drain_helper()
331349

332350

Lib/test/test_asyncio/test_streams.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from test.test_asyncio import utils as test_utils
2020

2121

22-
class StreamReaderTests(test_utils.TestCase):
22+
class StreamTests(test_utils.TestCase):
2323

2424
DATA = b'line1\nline2\nline3\n'
2525

@@ -860,6 +860,35 @@ def test_LimitOverrunError_pickleable(self):
860860
self.assertEqual(str(e), str(e2))
861861
self.assertEqual(e.consumed, e2.consumed)
862862

863+
def test_wait_closed_on_close(self):
864+
with test_utils.run_test_server() as httpd:
865+
rd, wr = self.loop.run_until_complete(
866+
asyncio.open_connection(*httpd.address, loop=self.loop))
867+
868+
wr.write(b'GET / HTTP/1.0\r\n\r\n')
869+
f = rd.readline()
870+
data = self.loop.run_until_complete(f)
871+
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
872+
f = rd.read()
873+
data = self.loop.run_until_complete(f)
874+
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
875+
self.assertFalse(wr.is_closing())
876+
wr.close()
877+
self.assertTrue(wr.is_closing())
878+
self.loop.run_until_complete(wr.wait_closed())
879+
880+
def test_wait_closed_on_close_with_unread_data(self):
881+
with test_utils.run_test_server() as httpd:
882+
rd, wr = self.loop.run_until_complete(
883+
asyncio.open_connection(*httpd.address, loop=self.loop))
884+
885+
wr.write(b'GET / HTTP/1.0\r\n\r\n')
886+
f = rd.readline()
887+
data = self.loop.run_until_complete(f)
888+
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
889+
wr.close()
890+
self.loop.run_until_complete(wr.wait_closed())
891+
863892

864893
if __name__ == '__main__':
865894
unittest.main()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement :meth:`asyncio.StreamWriter.wait_closed` and :meth:`asyncio.StreamWriter.is_closing` methods

0 commit comments

Comments
 (0)