Skip to content

gh-130925: Add close() method to asyncio.StreamReader #130929

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

rianhunter
Copy link
Contributor

@rianhunter rianhunter commented Mar 6, 2025

When creating a sub-process using asyncio.create_subprocess_exec(), it returns a Process instance that has a stdout property. This property is intended to be an asyncio version of the stdout property of the Popen instance from the subprocess module.

An important aspect of Popen.stdout property is that you can close it. This is a signal to the sub-process that is generating output that it should cleanly terminate. This is a common pattern in processes used in shell pipelines. Indeed, the object located at Popen.stdout has a close() method. This pattern is demonstrated below:

import subprocess
proc = subprocess.Popen(["yes"], stdout=subprocess.PIPE) # start subprocess
data = proc.stdout.read(4096) # get data
proc.stdout.close() # signal to process to cleanly shutdown
proc.wait() # wait for shutdown

Unfortunately this pattern cannot be reproduced easily with the stdout property of the Process instance returned from asyncio.create_subprocess_exec() because stdout is an instance of StreamReader which does not have the close() method.

This change adds a close() method to the StreamReader class so that asyncio version of the subprocess module may support this pattern of managing sub-processes. This change is consistent with the asyncio ecosystem as the companion StreamWriter class already has a close() method, along with other methods that expose its inner "transport" object. It's also trivial to implement, since it's essentially a wrapper method around the inner transport object's close() method.


📚 Documentation preview 📚: https://cpython-previews--130929.org.readthedocs.build/

When creating a sub-process using `asyncio.create_subprocess_exec()`,
it returns a `Process` instance that has a `stdout` property. This
property is intended to be an asyncio version of the `stdout` property
of the `Popen` instance from the `subprocess` module.

An important aspect of `Popen.stdout` property is that you can close
it. This is a signal to the sub-process that is generating output that
it should cleanly terminate. This is a common pattern in processes
used in shell pipelines. Indeed, the object located at `Popen.stdout`
has a `close()` method. This pattern is demonstrated below:

```python
import subprocess
proc = subprocess.Popen(["yes"], stdout=subprocess.PIPE) # start subprocess
data = proc.stdout.read(4096) # get data
proc.stdout.close() # signal to process to cleanly shutdown
proc.wait() # wait for shutdown
```

Unfortunately this pattern cannot be reproduced easily with the
`stdout` property of the `Process` instance returned from
`asyncio.create_subprocess_exec()` because `stdout` is an instance of
`StreamReader` which does not have the `close()` method.

This change adds a `close()` method to the `StreamReader` class so
that asyncio version of the `subprocess` module may support this
pattern of managing sub-processes. This change is consistent with the
asyncio ecosystem as the companion `StreamWriter` class already has a
`close()` method, along with other methods that expose its inner
"transport" object. It's also trivial to implement, since it's
essentially a wrapper method around the inner transport object's
`close()` method.
@@ -290,6 +290,10 @@ StreamReader
Return ``True`` if the buffer is empty and :meth:`feed_eof`
was called.

.. method:: close()

Invoke ``close()`` on the underlying transport (if one exists).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not clear what "what" means here: "close() method" or "underlying transport"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A transport is a pervasive concept throughout the asyncio package. StreamReader objects usually have an internal transport member. I can add a link to the transport documentation page here to make that more clear. Or what do you suggest?

@@ -35,6 +35,17 @@
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]

# Program generating infinite data
PROGRAM_YES = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to make this a global var? Maybe we can just define it in test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason other than consistency with PROGRAM_CAT and PROGRAM_BLOCKED. I can make it a local variable.

proc = await asyncio.create_subprocess_exec(*PROGRAM_YES,
stdout=asyncio.subprocess.PIPE)
try:
# just make sure the program has executed correctly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# just make sure the program has executed correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that comment so it's clear why those otherwise unnecessary lines of code are there. They aren't conceptually part of the unit test though. I can be more clear.

Comment on lines +903 to +904
# we are testing that the following method exists and
# has the intended effect of signaling the sub-process to terminate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# we are testing that the following method exists and
# has the intended effect of signaling the sub-process to terminate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before, the comment is there to make it explicit what the intent of the test is and the code included in the test.

Comment on lines +1 to +3
:func:`asyncio.StreamReader.close` now exists so that it's possible to
signal to sub-processes executed via :func:`asyncio.create_subprocess_exec`
that they may cease generating output and exit cleanly.
Copy link
Member

@sobolevn sobolevn Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:func:`asyncio.StreamReader.close` now exists so that it's possible to
signal to sub-processes executed via :func:`asyncio.create_subprocess_exec`
that they may cease generating output and exit cleanly.
Add :func:`asyncio.StreamReader.close` method. It can signal to sub-processes executed via :func:`asyncio.create_subprocess_exec`
that they may cease generating output and exit cleanly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the "so" was a typo? Otherwise I can change this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants