Skip to content

Use virtual terminal input on Windows when available. #1958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 145 additions & 8 deletions src/prompt_toolkit/input/win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import msvcrt
from ctypes import windll

from ctypes import Array, pointer
from ctypes import Array, byref, pointer
from ctypes.wintypes import DWORD, HANDLE
from typing import Callable, ContextManager, Iterable, Iterator, TextIO

Expand All @@ -35,6 +35,7 @@

from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
from .base import Input
from .vt100_parser import Vt100Parser

__all__ = [
"Win32Input",
Expand All @@ -52,6 +53,9 @@
MOUSE_MOVED = 0x0001
MOUSE_WHEELED = 0x0004

# See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200


class _Win32InputBase(Input):
"""
Expand All @@ -74,7 +78,14 @@ class Win32Input(_Win32InputBase):

def __init__(self, stdin: TextIO | None = None) -> None:
super().__init__()
self.console_input_reader = ConsoleInputReader()
self._use_virtual_terminal_input = _is_win_vt100_input_enabled()

self.console_input_reader: Vt100ConsoleInputReader | ConsoleInputReader

if self._use_virtual_terminal_input:
self.console_input_reader = Vt100ConsoleInputReader()
else:
self.console_input_reader = ConsoleInputReader()

def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
"""
Expand All @@ -101,7 +112,9 @@ def closed(self) -> bool:
return False

def raw_mode(self) -> ContextManager[None]:
return raw_mode()
return raw_mode(
use_win10_virtual_terminal_input=self._use_virtual_terminal_input
)

def cooked_mode(self) -> ContextManager[None]:
return cooked_mode()
Expand Down Expand Up @@ -555,6 +568,102 @@ def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> list[KeyPress]:
return [KeyPress(Keys.WindowsMouseEvent, data)]


class Vt100ConsoleInputReader:
"""
Similar to `ConsoleInputReader`, but for usage when
`ENABLE_VIRTUAL_TERMINAL_INPUT` is enabled. This assumes that Windows sends
us the right vt100 escape sequences and we parse those with our vt100
parser.

(Using this instead of `ConsoleInputReader` results in the "data" attribute
from the `KeyPress` instances to be more correct in edge cases, because
this responds to for instance the terminal being in application cursor keys
mode.)
"""

def __init__(self) -> None:
self._fdcon = None

self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
self._vt100_parser = Vt100Parser(
lambda key_press: self._buffer.append(key_press)
)

# When stdin is a tty, use that handle, otherwise, create a handle from
# CONIN$.
self.handle: HANDLE
if sys.stdin.isatty():
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
else:
self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))

def close(self) -> None:
"Close fdcon."
if self._fdcon is not None:
os.close(self._fdcon)

def read(self) -> Iterable[KeyPress]:
"""
Return a list of `KeyPress` instances. It won't return anything when
there was nothing to read. (This function doesn't block.)

http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
"""
max_count = 2048 # Max events to read at the same time.

read = DWORD(0)
arrtype = INPUT_RECORD * max_count
input_records = arrtype()

# Check whether there is some input to read. `ReadConsoleInputW` would
# block otherwise.
# (Actually, the event loop is responsible to make sure that this
# function is only called when there is something to read, but for some
# reason this happened in the asyncio_win32 loop, and it's better to be
# safe anyway.)
if not wait_for_handles([self.handle], timeout=0):
return []

# Get next batch of input event.
windll.kernel32.ReadConsoleInputW(
self.handle, pointer(input_records), max_count, pointer(read)
)

# First, get all the keys from the input buffer, in order to determine
# whether we should consider this a paste event or not.
for key_data in self._get_keys(read, input_records):
self._vt100_parser.feed(key_data)

# Return result.
result = self._buffer
self._buffer = []
return result

def _get_keys(
self, read: DWORD, input_records: Array[INPUT_RECORD]
) -> Iterator[str]:
"""
Generator that yields `KeyPress` objects from the input records.
"""
for i in range(read.value):
ir = input_records[i]

# Get the right EventType from the EVENT_RECORD.
# (For some reason the Windows console application 'cmder'
# [http://gooseberrycreative.com/cmder/] can return '0' for
# ir.EventType. -- Just ignore that.)
if ir.EventType in EventTypes:
ev = getattr(ir.Event, EventTypes[ir.EventType])

# Process if this is a key event. (We also have mouse, menu and
# focus events.)
if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
u_char = ev.uChar.UnicodeChar
if u_char != "\x00":
yield u_char


class _Win32Handles:
"""
Utility to keep track of which handles are connectod to which callbacks.
Expand Down Expand Up @@ -700,8 +809,11 @@ class raw_mode:
`raw_input` method of `.vt100_input`.
"""

def __init__(self, fileno: int | None = None) -> None:
def __init__(
self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
) -> None:
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input

def __enter__(self) -> None:
# Remember original mode.
Expand All @@ -717,12 +829,15 @@ def _patch(self) -> None:
ENABLE_LINE_INPUT = 0x0002
ENABLE_PROCESSED_INPUT = 0x0001

windll.kernel32.SetConsoleMode(
self.handle,
self.original_mode.value
& ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
new_mode = self.original_mode.value & ~(
ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
)

if self.use_win10_virtual_terminal_input:
new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT

windll.kernel32.SetConsoleMode(self.handle, new_mode)

def __exit__(self, *a: object) -> None:
# Restore original mode
windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
Expand All @@ -747,3 +862,25 @@ def _patch(self) -> None:
self.original_mode.value
| (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
)


def _is_win_vt100_input_enabled() -> bool:
"""
Returns True when we're running Windows and VT100 escape sequences are
supported.
"""
hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))

# Get original console mode.
original_mode = DWORD(0)
windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))

try:
# Try to enable VT100 sequences.
result: int = windll.kernel32.SetConsoleMode(
hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
)

return result == 1
finally:
windll.kernel32.SetConsoleMode(hconsole, original_mode)
Loading