diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index c56166cabb9093..ff7e5ba26fd3c2 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -290,6 +290,24 @@ StreamReader Return ``True`` if the buffer is empty and :meth:`feed_eof` was called. + .. method:: close() + + Invoke ``close()`` on the underlying asyncio transport (if one exists). + + Note: It is not necessary for code that is given an already + instantiated :class:`StreamReader` instance to call ``close()`` + for the sake of cleaning up resources when it is done using + it. Cleanup of the underlying transport is the + reponsibility of the code that provided the + :class:`StreamReader` instance. This method exists purely to + allow client code of APIs that hide the underlying transport to + eagerly close the transport as a way to signal to the producer + of the stream that the read side is shut down. For example, + when interacting with the standard out pipe of a sub-process. + In other words, it is not an error to not to call close() on + :class:`StreamReader` instance you've been given. + + .. versionadded:: next StreamWriter ============ diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 64aac4cc50d15a..c23c59c66ce428 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -465,6 +465,10 @@ def set_exception(self, exc): if not waiter.cancelled(): waiter.set_exception(exc) + def close(self): + if self._transport is not None: + self._transport.close() + def _wakeup_waiter(self): """Wakeup read*() functions waiting for data or EOF.""" waiter = self._waiter diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 341e3e979e002b..33dd22c372adf2 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -35,6 +35,17 @@ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +# Program generating infinite data +PROGRAM_YES = [ + sys.executable, '-c', """\ +import sys +while True: + try: + sys.stdout.buffer.write(b"y\\n") + except BrokenPipeError: + break +"""] + def tearDownModule(): asyncio._set_event_loop_policy(None) @@ -879,6 +890,23 @@ async def main(): self.loop.run_until_complete(main()) + def test_subprocess_break_pipe(self): + # See https://github.com/python/cpython/issues/130925 + async def main(): + proc = await asyncio.create_subprocess_exec(*PROGRAM_YES, + stdout=asyncio.subprocess.PIPE) + try: + # just make sure the program has executed correctly + data = await proc.stdout.readline() + self.assertEqual(data, b"y\n") + finally: + # we are testing that the following method exists and + # has the intended effect of signaling the sub-process to terminate + proc.stdout.close() + await proc.wait() + + self.loop.run_until_complete(main()) + if sys.platform != 'win32': # Unix diff --git a/Misc/ACKS b/Misc/ACKS index d110002ffeb86c..e8d3e93bd2ea4a 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -836,6 +836,7 @@ Roberto Hueso Gomez Jim Hugunin Greg Humphreys Chris Hunt +Rian Hunter Eric Huss Nehal Hussain Taihyun Hwang diff --git a/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst b/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst new file mode 100644 index 00000000000000..bbaa272d8d42a0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-03-06-15-56-12.gh-issue-130925.USr9bm.rst @@ -0,0 +1,3 @@ +:func:`asyncio.StreamReader.close` now exists so that it's possible to +signal to sub-processes executed via :func:`asyncio.create_subprocess_exec` +that they may cease generating output and exit cleanly.