Skip to content

Commit 402b6a3

Browse files
Added PlainTextOutput: an output that doesn't write ANSI escape sequences to the file.
1 parent 57b42c4 commit 402b6a3

File tree

3 files changed

+224
-76
lines changed

3 files changed

+224
-76
lines changed

prompt_toolkit/output/flush_stdout.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import errno
2+
import os
3+
import sys
4+
from contextlib import contextmanager
5+
from typing import IO, Iterator, TextIO, cast
6+
7+
__all__ = ["flush_stdout"]
8+
9+
10+
def flush_stdout(stdout: TextIO, data: str, write_binary: bool) -> None:
11+
try:
12+
# Ensure that `stdout` is made blocking when writing into it.
13+
# Otherwise, when uvloop is activated (which makes stdout
14+
# non-blocking), and we write big amounts of text, then we get a
15+
# `BlockingIOError` here.
16+
with _blocking_io(stdout):
17+
# (We try to encode ourself, because that way we can replace
18+
# characters that don't exist in the character set, avoiding
19+
# UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.)
20+
# My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968'
21+
# for sys.stdout.encoding in xterm.
22+
out: IO[bytes]
23+
if write_binary:
24+
if hasattr(stdout, "buffer"):
25+
out = stdout.buffer
26+
else:
27+
# IO[bytes] was given to begin with.
28+
# (Used in the unit tests, for instance.)
29+
out = cast(IO[bytes], stdout)
30+
out.write(data.encode(stdout.encoding or "utf-8", "replace"))
31+
else:
32+
stdout.write(data)
33+
34+
stdout.flush()
35+
except IOError as e:
36+
if e.args and e.args[0] == errno.EINTR:
37+
# Interrupted system call. Can happen in case of a window
38+
# resize signal. (Just ignore. The resize handler will render
39+
# again anyway.)
40+
pass
41+
elif e.args and e.args[0] == 0:
42+
# This can happen when there is a lot of output and the user
43+
# sends a KeyboardInterrupt by pressing Control-C. E.g. in
44+
# a Python REPL when we execute "while True: print('test')".
45+
# (The `ptpython` REPL uses this `Output` class instead of
46+
# `stdout` directly -- in order to be network transparent.)
47+
# So, just ignore.
48+
pass
49+
else:
50+
raise
51+
52+
53+
@contextmanager
54+
def _blocking_io(io: IO[str]) -> Iterator[None]:
55+
"""
56+
Ensure that the FD for `io` is set to blocking in here.
57+
"""
58+
if sys.platform == "win32":
59+
# On Windows, the `os` module doesn't have a `get/set_blocking`
60+
# function.
61+
yield
62+
return
63+
64+
try:
65+
fd = io.fileno()
66+
blocking = os.get_blocking(fd)
67+
except: # noqa
68+
# Failed somewhere.
69+
# `get_blocking` can raise `OSError`.
70+
# The io object can raise `AttributeError` when no `fileno()` method is
71+
# present if we're not a real file object.
72+
blocking = True # Assume we're good, and don't do anything.
73+
74+
try:
75+
# Make blocking if we weren't blocking yet.
76+
if not blocking:
77+
os.set_blocking(fd, True)
78+
79+
yield
80+
81+
finally:
82+
# Restore original blocking mode.
83+
if not blocking:
84+
os.set_blocking(fd, blocking)

prompt_toolkit/output/plain_text.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
from typing import List, TextIO
2+
3+
from prompt_toolkit.data_structures import Size
4+
from prompt_toolkit.styles import Attrs
5+
6+
from .base import Output
7+
from .color_depth import ColorDepth
8+
from .flush_stdout import flush_stdout
9+
10+
__all__ = ["PlainTextOutput"]
11+
12+
13+
class PlainTextOutput(Output):
14+
"""
15+
Output that won't include any ANSI escape sequences.
16+
17+
Useful when stdout is not a terminal. Maybe stdout is redirected to a file.
18+
In this case, if `print_formatted_text` is used, for instance, we don't
19+
want to include formatting.
20+
21+
(The code is mostly identical to `Vt100_Output`, but without the
22+
formatting.)
23+
"""
24+
25+
def __init__(self, stdout: TextIO, write_binary: bool = True) -> None:
26+
assert all(hasattr(stdout, a) for a in ("write", "flush"))
27+
28+
if write_binary:
29+
assert hasattr(stdout, "encoding")
30+
31+
self.stdout: TextIO = stdout
32+
self.write_binary = write_binary
33+
self._buffer: List[str] = []
34+
35+
def fileno(self) -> int:
36+
"There is no sensible default for fileno()."
37+
return self.stdout.fileno()
38+
39+
def encoding(self) -> str:
40+
return "utf-8"
41+
42+
def write(self, data: str) -> None:
43+
self._buffer.append(data)
44+
45+
def write_raw(self, data: str) -> None:
46+
self._buffer.append(data)
47+
48+
def set_title(self, title: str) -> None:
49+
pass
50+
51+
def clear_title(self) -> None:
52+
pass
53+
54+
def flush(self) -> None:
55+
if not self._buffer:
56+
return
57+
58+
data = "".join(self._buffer)
59+
self._buffer = []
60+
flush_stdout(self.stdout, data, write_binary=self.write_binary)
61+
62+
def erase_screen(self) -> None:
63+
pass
64+
65+
def enter_alternate_screen(self) -> None:
66+
pass
67+
68+
def quit_alternate_screen(self) -> None:
69+
pass
70+
71+
def enable_mouse_support(self) -> None:
72+
pass
73+
74+
def disable_mouse_support(self) -> None:
75+
pass
76+
77+
def erase_end_of_line(self) -> None:
78+
pass
79+
80+
def erase_down(self) -> None:
81+
pass
82+
83+
def reset_attributes(self) -> None:
84+
pass
85+
86+
def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None:
87+
pass
88+
89+
def disable_autowrap(self) -> None:
90+
pass
91+
92+
def enable_autowrap(self) -> None:
93+
pass
94+
95+
def cursor_goto(self, row: int = 0, column: int = 0) -> None:
96+
pass
97+
98+
def cursor_up(self, amount: int) -> None:
99+
pass
100+
101+
def cursor_down(self, amount: int) -> None:
102+
self._buffer.append("\n")
103+
104+
def cursor_forward(self, amount: int) -> None:
105+
self._buffer.append(" " * amount)
106+
107+
def cursor_backward(self, amount: int) -> None:
108+
pass
109+
110+
def hide_cursor(self) -> None:
111+
pass
112+
113+
def show_cursor(self) -> None:
114+
pass
115+
116+
def ask_for_cpr(self) -> None:
117+
pass
118+
119+
def bell(self) -> None:
120+
pass
121+
122+
def enable_bracketed_paste(self) -> None:
123+
pass
124+
125+
def disable_bracketed_paste(self) -> None:
126+
pass
127+
128+
def scroll_buffer_to_prompt(self) -> None:
129+
pass
130+
131+
def get_size(self) -> Size:
132+
return Size(rows=40, columns=80)
133+
134+
def get_rows_below_cursor_position(self) -> int:
135+
return 8
136+
137+
def get_default_color_depth(self) -> ColorDepth:
138+
return ColorDepth.DEPTH_1_BIT

prompt_toolkit/output/vt100.py

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@
77
http://pygments.org/
88
"""
99
import array
10-
import errno
1110
import io
1211
import os
1312
import sys
14-
from contextlib import contextmanager
1513
from typing import (
1614
IO,
1715
Callable,
@@ -34,6 +32,7 @@
3432
from prompt_toolkit.utils import is_dumb_terminal
3533

3634
from .color_depth import ColorDepth
35+
from .flush_stdout import flush_stdout
3736

3837
__all__ = [
3938
"Vt100_Output",
@@ -673,46 +672,7 @@ def flush(self) -> None:
673672
data = "".join(self._buffer)
674673
self._buffer = []
675674

676-
try:
677-
# Ensure that `self.stdout` is made blocking when writing into it.
678-
# Otherwise, when uvloop is activated (which makes stdout
679-
# non-blocking), and we write big amounts of text, then we get a
680-
# `BlockingIOError` here.
681-
with blocking_io(self.stdout):
682-
# (We try to encode ourself, because that way we can replace
683-
# characters that don't exist in the character set, avoiding
684-
# UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.)
685-
# My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968'
686-
# for sys.stdout.encoding in xterm.
687-
out: IO[bytes]
688-
if self.write_binary:
689-
if hasattr(self.stdout, "buffer"):
690-
out = self.stdout.buffer
691-
else:
692-
# IO[bytes] was given to begin with.
693-
# (Used in the unit tests, for instance.)
694-
out = cast(IO[bytes], self.stdout)
695-
out.write(data.encode(self.stdout.encoding or "utf-8", "replace"))
696-
else:
697-
self.stdout.write(data)
698-
699-
self.stdout.flush()
700-
except IOError as e:
701-
if e.args and e.args[0] == errno.EINTR:
702-
# Interrupted system call. Can happen in case of a window
703-
# resize signal. (Just ignore. The resize handler will render
704-
# again anyway.)
705-
pass
706-
elif e.args and e.args[0] == 0:
707-
# This can happen when there is a lot of output and the user
708-
# sends a KeyboardInterrupt by pressing Control-C. E.g. in
709-
# a Python REPL when we execute "while True: print('test')".
710-
# (The `ptpython` REPL uses this `Output` class instead of
711-
# `stdout` directly -- in order to be network transparent.)
712-
# So, just ignore.
713-
pass
714-
else:
715-
raise
675+
flush_stdout(self.stdout, data, write_binary=self.write_binary)
716676

717677
def ask_for_cpr(self) -> None:
718678
"""
@@ -764,37 +724,3 @@ def get_default_color_depth(self) -> ColorDepth:
764724
return ColorDepth.DEPTH_4_BIT
765725

766726
return ColorDepth.DEFAULT
767-
768-
769-
@contextmanager
770-
def blocking_io(io: IO[str]) -> Iterator[None]:
771-
"""
772-
Ensure that the FD for `io` is set to blocking in here.
773-
"""
774-
if sys.platform == "win32":
775-
# On Windows, the `os` module doesn't have a `get/set_blocking`
776-
# function.
777-
yield
778-
return
779-
780-
try:
781-
fd = io.fileno()
782-
blocking = os.get_blocking(fd)
783-
except: # noqa
784-
# Failed somewhere.
785-
# `get_blocking` can raise `OSError`.
786-
# The io object can raise `AttributeError` when no `fileno()` method is
787-
# present if we're not a real file object.
788-
blocking = True # Assume we're good, and don't do anything.
789-
790-
try:
791-
# Make blocking if we weren't blocking yet.
792-
if not blocking:
793-
os.set_blocking(fd, True)
794-
795-
yield
796-
797-
finally:
798-
# Restore original blocking mode.
799-
if not blocking:
800-
os.set_blocking(fd, blocking)

0 commit comments

Comments
 (0)