From 4e33e3f4f495a92f1cef8bf0f9a1ae6cd3b32683 Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Thu, 6 Mar 2025 15:42:23 -0500 Subject: [PATCH 1/6] Add `close()` method to `asyncio.StreamReader` When creating a sub-process using `asyncio.create_subprocess_exec()`, it returns a `Process` instance that has a `stdout` property. This property is intended to be an asyncio version of the `stdout` property of the `Popen` instance from the `subprocess` module. An important aspect of `Popen.stdout` property is that you can close it. This is a signal to the sub-process that is generating output that it should cleanly terminate. This is a common pattern in processes used in shell pipelines. Indeed, the object located at `Popen.stdout` has a `close()` method. This pattern is demonstrated below: ```python import subprocess proc = subprocess.Popen(["yes"], stdout=subprocess.PIPE) # start subprocess data = proc.stdout.read(4096) # get data proc.stdout.close() # signal to process to cleanly shutdown proc.wait() # wait for shutdown ``` Unfortunately this pattern cannot be reproduced easily with the `stdout` property of the `Process` instance returned from `asyncio.create_subprocess_exec()` because `stdout` is an instance of `StreamReader` which does not have the `close()` method. This change adds a `close()` method to the `StreamReader` class so that asyncio version of the `subprocess` module may support this pattern of managing sub-processes. This change is consistent with the asyncio ecosystem as the companion `StreamWriter` class already has a `close()` method, along with other methods that expose its inner "transport" object. It's also trivial to implement, since it's essentially a wrapper method around the inner transport object's `close()` method. --- Doc/library/asyncio-stream.rst | 4 +++ Lib/asyncio/streams.py | 4 +++ Lib/test/test_asyncio/test_subprocess.py | 29 +++++++++++++++++++ Misc/ACKS | 1 + ...-03-06-15-56-12.gh-issue-130925.USr9bm.rst | 3 ++ 5 files changed, 41 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index c56166cabb9093..3ca8335c864033 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -290,6 +290,10 @@ StreamReader Return ``True`` if the buffer is empty and :meth:`feed_eof` was called. + .. method:: close() + + Invoke ``close()`` on the underlying transport (if one exists). + StreamWriter ============ diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 64aac4cc50d15a..c23c59c66ce428 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -465,6 +465,10 @@ def set_exception(self, exc): if not waiter.cancelled(): waiter.set_exception(exc) + def close(self): + if self._transport is not None: + self._transport.close() + def _wakeup_waiter(self): """Wakeup read*() functions waiting for data or EOF.""" waiter = self._waiter diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 341e3e979e002b..72005c92e31f9a 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -35,6 +35,17 @@ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +# Program generating infinite data +PROGRAM_YES = [ + sys.executable, '-c', """\ +import sys +while True: + try: + sys.stdout.buffer.write(b"y\\n") + except BrokenPipeError: + break +"""] + def tearDownModule(): asyncio._set_event_loop_policy(None) @@ -879,6 +890,24 @@ async def main(): self.loop.run_until_complete(main()) + def test_subprocess_break_pipe(self): + # See https://github.com/python/cpython/issues/130925 + async def main(): + proc = await asyncio.create_subprocess_exec(*PROGRAM_YES, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL) + try: + # just make sure the program has executed correctly + data = await proc.stdout.readline() + self.assertEqual(data, b"y\n") + finally: + # we are testing that the following method exists and + # has the intended effect of signaling the sub-process to terminate + proc.stdout.close() + await proc.wait() + + self.loop.run_until_complete(main()) + if sys.platform != 'win32': # Unix diff --git a/Misc/ACKS b/Misc/ACKS index d110002ffeb86c..e8d3e93bd2ea4a 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -836,6 +836,7 @@ Roberto Hueso Gomez Jim Hugunin Greg Humphreys Chris Hunt +Rian Hunter Eric Huss Nehal Hussain Taihyun Hwang diff --git a/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst b/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst new file mode 100644 index 00000000000000..bbaa272d8d42a0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst @@ -0,0 +1,3 @@ +:func:`asyncio.StreamReader.close` now exists so that it's possible to +signal to sub-processes executed via :func:`asyncio.create_subprocess_exec` +that they may cease generating output and exit cleanly. From 9bbd936efa8f63891a2a918c0c516f1f9fb0866a Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Thu, 6 Mar 2025 16:04:00 -0500 Subject: [PATCH 2/6] No need to send stderr to NULL in test, and in fact this may hide errors --- Lib/test/test_asyncio/test_subprocess.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 72005c92e31f9a..33dd22c372adf2 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -894,8 +894,7 @@ def test_subprocess_break_pipe(self): # See https://github.com/python/cpython/issues/130925 async def main(): proc = await asyncio.create_subprocess_exec(*PROGRAM_YES, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL) + stdout=asyncio.subprocess.PIPE) try: # just make sure the program has executed correctly data = await proc.stdout.readline() From 708a88a2d4466a9015f5423f5c9a5dbb84d42966 Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Fri, 7 Mar 2025 13:15:32 -0500 Subject: [PATCH 3/6] Clarify close() documentation --- Doc/library/asyncio-stream.rst | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 3ca8335c864033..f1a673993c78d9 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -292,7 +292,20 @@ StreamReader .. method:: close() - Invoke ``close()`` on the underlying transport (if one exists). + Invoke ``close()`` on the underlying asyncio transport (if one exists). + + Note: It is not necessary for code that is given an already + instantiated :class:`StreamReader` instance to call `close()` + for the sake of cleaning up resources when it is done using + it. Cleanup of the underlying transport is the + reponsibility of the code that provided the + :class:`StreamReader` instance. This method exists purely to + allow client code of APIs that hide the underlying transport to + eagerly close the transport as a way to signal to the producer + of the stream that the read side is shut down. In particular, + when interacting with the standard out pipe of a sub-process. + In other words, it is an error to not to call close() on + :class:`StreamReader` instance you've been given. StreamWriter From 92fb461befa737f3a9a14b01060827e5656cd816 Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Fri, 7 Mar 2025 13:23:31 -0500 Subject: [PATCH 4/6] Fix backticks --- Doc/library/asyncio-stream.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index f1a673993c78d9..38dedc178a70e1 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -295,7 +295,7 @@ StreamReader Invoke ``close()`` on the underlying asyncio transport (if one exists). Note: It is not necessary for code that is given an already - instantiated :class:`StreamReader` instance to call `close()` + instantiated :class:`StreamReader` instance to call ``close()`` for the sake of cleaning up resources when it is done using it. Cleanup of the underlying transport is the reponsibility of the code that provided the From c673bdcfafda72849c68e19478e1344d018b799a Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Fri, 7 Mar 2025 14:35:21 -0500 Subject: [PATCH 5/6] Fix wording --- Doc/library/asyncio-stream.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 38dedc178a70e1..1f0613c0db2d16 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -302,9 +302,9 @@ StreamReader :class:`StreamReader` instance. This method exists purely to allow client code of APIs that hide the underlying transport to eagerly close the transport as a way to signal to the producer - of the stream that the read side is shut down. In particular, + of the stream that the read side is shut down. For example, when interacting with the standard out pipe of a sub-process. - In other words, it is an error to not to call close() on + In other words, it is not an error to not to call close() on :class:`StreamReader` instance you've been given. From 36fe4f45810905f9675d4372bffed95551c337c1 Mon Sep 17 00:00:00 2001 From: Rian Hunter Date: Fri, 7 Mar 2025 14:44:49 -0500 Subject: [PATCH 6/6] Add versionadded annotation to documentation --- Doc/library/asyncio-stream.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 1f0613c0db2d16..ff7e5ba26fd3c2 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -307,6 +307,7 @@ StreamReader In other words, it is not an error to not to call close() on :class:`StreamReader` instance you've been given. + .. versionadded:: next StreamWriter ============