From 603599728278585ebcfac81ec1c82b9fe59917fc Mon Sep 17 00:00:00 2001 From: Jonathan Slenders Date: Tue, 19 Jan 2021 18:22:12 +0100 Subject: [PATCH] Refactoring of history code. - Fix race condition in ThreadedHistory. Fixes issue #1158. - Refactored the history code so that asynchronous history loader implementations become possible. - Start loading the history when the `BufferControl` renders for the first time. This way we are sure that the history loading uses the same event loop as the one used by the application. (Important for tools like ptpython where the `Application` can run in a different thread.) - Added unit tests. - Make it possible to wrap `InMemoryHistory` in `ThreadedHistory` (not that useful, but good if all combinations work). - Use a deque for the working lines in buffer.py -> much better performance when reloading the history. Special thanks to "Bob Hyman " for the original implementations of an improved history and script for reproducing issue #1158. --- prompt_toolkit/buffer.py | 69 +++++++++--- prompt_toolkit/history.py | 168 ++++++++++++++++++++---------- prompt_toolkit/layout/controls.py | 8 ++ tests/test_history.py | 101 ++++++++++++++++++ 4 files changed, 276 insertions(+), 70 deletions(-) create mode 100644 tests/test_history.py diff --git a/prompt_toolkit/buffer.py b/prompt_toolkit/buffer.py index dbeeb7d1d..7399bf950 100644 --- a/prompt_toolkit/buffer.py +++ b/prompt_toolkit/buffer.py @@ -3,18 +3,21 @@ It holds the text, cursor position, history, etc... """ import asyncio +import logging import os import re import shlex import shutil import subprocess import tempfile +from collections import deque from enum import Enum from functools import wraps from typing import ( Any, Awaitable, Callable, + Deque, Iterable, List, Optional, @@ -54,6 +57,8 @@ "reshape_text", ] +logger = logging.getLogger(__name__) + class EditReadOnlyBuffer(Exception): " Attempt editing of read-only :class:`.Buffer`. 
" @@ -300,21 +305,12 @@ def __init__( self._async_completer = self._create_completer_coroutine() self._async_validator = self._create_auto_validate_coroutine() + # Asyncio task for populating the history. + self._load_history_task: Optional[asyncio.Future[None]] = None + # Reset other attributes. self.reset(document=document) - # Load the history. - def new_history_item(item: str) -> None: - # XXX: Keep in mind that this function can be called in a different - # thread! - # Insert the new string into `_working_lines`. - self._working_lines.insert(0, item) - self.__working_index += ( - 1 # Not entirely threadsafe, but probably good enough. - ) - - self.history.load(new_history_item) - def __repr__(self) -> str: if len(self.text) < 15: text = self.text @@ -373,14 +369,57 @@ def reset( self._undo_stack: List[Tuple[str, int]] = [] self._redo_stack: List[Tuple[str, int]] = [] + # Cancel history loader. If history loading was still ongoing. + # Cancel the `_load_history_task`, so that next repaint of the + # `BufferControl` we will repopulate it. + if self._load_history_task is not None: + self._load_history_task.cancel() + self._load_history_task = None + #: The working lines. Similar to history, except that this can be #: modified. The user can press arrow_up and edit previous entries. #: Ctrl-C should reset this, and copy the whole history back in here. #: Enter should process the current command and append to the real #: history. - self._working_lines = self.history.get_strings()[:] - self._working_lines.append(document.text) - self.__working_index = len(self._working_lines) - 1 + self._working_lines: Deque[str] = deque([document.text]) + self.__working_index = 0 + + def load_history_if_not_yet_loaded(self) -> None: + """ + Create task for populating the buffer history (if not yet done). + + Note:: + + This needs to be called from within the event loop of the + application, because history loading is async, and we need to be + sure the right event loop is active. 
Therefore, we call this method + in the `BufferControl.create_content`. + + There are situations where prompt_toolkit applications are created + in one thread, but will later run in a different thread (Ptpython + is one example. The REPL runs in a separate thread, in order to + prevent interfering with a potential different event loop in the + main thread. The REPL UI however is still created in the main + thread.) We could decide to not support creating prompt_toolkit + objects in one thread and running the application in a different + thread, but history loading is the only place where it matters, and + this solves it. + """ + if self._load_history_task is None: + + async def load_history() -> None: + try: + async for item in self.history.load(): + self._working_lines.appendleft(item) + self.__working_index += 1 + except asyncio.CancelledError: + pass + except BaseException: + # Log error if something goes wrong. (We don't have a + # caller to which we can propagate this exception.) + logger.exception("Loading history failed") + + self._load_history_task = asyncio.ensure_future(load_history()) # diff --git a/prompt_toolkit/history.py b/prompt_toolkit/history.py index e59d6a33e..6416c809a 100644 --- a/prompt_toolkit/history.py +++ b/prompt_toolkit/history.py @@ -7,11 +7,12 @@ loading can be done asynchronously and making the history swappable would probably break this. """ +import asyncio import datetime import os +import threading from abc import ABCMeta, abstractmethod -from threading import Thread -from typing import Callable, Iterable, List, Optional +from typing import AsyncGenerator, Iterable, List, Optional, Sequence __all__ = [ "History", @@ -32,57 +33,43 @@ class History(metaclass=ABCMeta): def __init__(self) -> None: # In memory storage for strings. self._loaded = False + + # History that's loaded already, in reverse order. Latest, most recent + # item first. self._loaded_strings: List[str] = [] # # Methods expected by `Buffer`. 
# - def load(self, item_loaded_callback: Callable[[str], None]) -> None: + async def load(self) -> AsyncGenerator[str, None]: """ - Load the history and call the callback for every entry in the history. - - XXX: The callback can be called from another thread, which happens in - case of `ThreadedHistory`. - - We can't assume that an asyncio event loop is running, and - schedule the insertion into the `Buffer` using the event loop. - - The reason is that the creation of the :class:`.History` object as - well as the start of the loading happens *before* - `Application.run()` is called, and it can continue even after - `Application.run()` terminates. (Which is useful to have a - complete history during the next prompt.) - - Calling `get_event_loop()` right here is also not guaranteed to - return the same event loop which is used in `Application.run`, - because a new event loop can be created during the `run`. This is - useful in Python REPLs, where we want to use one event loop for - the prompt, and have another one active during the `eval` of the - commands. (Otherwise, the user can schedule a while/true loop and - freeze the UI.) + Load the history and yield all the entries in reverse order (latest, + most recent history entry first). + + This method can be called multiple times from the `Buffer` to + repopulate the history when prompting for a new input. So we are + responsible here for both caching, and making sure that strings that + were appended to the history will be incorporated next time this + method is called. 
""" - if self._loaded: - for item in self._loaded_strings[::-1]: - item_loaded_callback(item) - return - - try: - for item in self.load_history_strings(): - self._loaded_strings.insert(0, item) - item_loaded_callback(item) - finally: + if not self._loaded: + self._loaded_strings = list(self.load_history_strings()) self._loaded = True + for item in self._loaded_strings: + yield item + def get_strings(self) -> List[str]: """ Get the strings from the history that are loaded so far. + (In order. Oldest item first.) """ - return self._loaded_strings + return self._loaded_strings[::-1] def append_string(self, string: str) -> None: " Add string to the history. " - self._loaded_strings.append(string) + self._loaded_strings.insert(0, string) self.store_string(string) # @@ -110,7 +97,8 @@ def store_string(self, string: str) -> None: class ThreadedHistory(History): """ - Wrapper that runs the `load_history_strings` generator in a thread. + Wrapper around `History` implementations that run the `load()` generator in + a thread. Use this to increase the start-up time of prompt_toolkit applications. History entries are available as soon as they are loaded. We don't have to @@ -118,32 +106,90 @@ class ThreadedHistory(History): """ def __init__(self, history: History) -> None: - self.history = history - self._load_thread: Optional[Thread] = None - self._item_loaded_callbacks: List[Callable[[str], None]] = [] super().__init__() - def load(self, item_loaded_callback: Callable[[str], None]) -> None: - self._item_loaded_callbacks.append(item_loaded_callback) + self.history = history - # Start the load thread, if we don't have a thread yet. - if not self._load_thread: + self._load_thread: Optional[threading.Thread] = None - def call_all_callbacks(item: str) -> None: - for cb in self._item_loaded_callbacks: - cb(item) + # Lock for accessing/manipulating `_loaded_strings` and `_loaded` + # together in a consistent state. 
+ self._lock = threading.Lock() - self._load_thread = Thread( - target=self.history.load, args=(call_all_callbacks,) + # Events created by each `load()` call. Used to wait for new history + # entries from the loader thread. + self._string_load_events: List[threading.Event] = [] + + async def load(self) -> AsyncGenerator[str, None]: + """ + Like `History.load()`, but call `self.load_history_strings()` in a + background thread. + """ + # Start the load thread, if this is called for the first time. + if not self._load_thread: + self._load_thread = threading.Thread( + target=self._in_load_thread, + daemon=True, ) - self._load_thread.daemon = True self._load_thread.start() - def get_strings(self) -> List[str]: - return self.history.get_strings() + # Consume the `_loaded_strings` list, using asyncio. + loop = asyncio.get_event_loop() + + # Create threading Event so that we can wait for new items. + event = threading.Event() + event.set() + self._string_load_events.append(event) + + items_yielded = 0 + + try: + while True: + # Wait for new items to be available. + await loop.run_in_executor(None, event.wait) + + # Read new items (in lock). + await loop.run_in_executor(None, self._lock.acquire) + try: + new_items = self._loaded_strings[items_yielded:] + done = self._loaded + event.clear() + finally: + self._lock.release() + + items_yielded += len(new_items) + + for item in new_items: + yield item + + if done: + break + finally: + self._string_load_events.remove(event) + + def _in_load_thread(self) -> None: + try: + # Start with an empty list. In case `append_string()` was called + # before `load()` happened. Then `.store_string()` will have + # written these entries back to disk and we will reload it. 
+ self._loaded_strings = [] + + for item in self.history.load_history_strings(): + with self._lock: + self._loaded_strings.append(item) + + for event in self._string_load_events: + event.set() + finally: + with self._lock: + self._loaded = True + for event in self._string_load_events: + event.set() def append_string(self, string: str) -> None: - self.history.append_string(string) + with self._lock: + self._loaded_strings.insert(0, string) + self.store_string(string) # All of the following are proxied to `self.history`. @@ -160,13 +206,25 @@ def __repr__(self) -> str: class InMemoryHistory(History): """ :class:`.History` class that keeps a list of all strings in memory. + + In order to prepopulate the history, it's possible to call either + `append_string` for all items or pass a list of strings to `__init__` here. """ + def __init__(self, history_strings: Optional[Sequence[str]] = None) -> None: + super().__init__() + # Emulating disk storage. + if history_strings is None: + self._storage = [] + else: + self._storage = list(history_strings) + def load_history_strings(self) -> Iterable[str]: - return [] + for item in self._storage[::-1]: + yield item def store_string(self, string: str) -> None: - pass + self._storage.append(string) class DummyHistory(History): diff --git a/prompt_toolkit/layout/controls.py b/prompt_toolkit/layout/controls.py index f2c4a213b..2f02eb62e 100644 --- a/prompt_toolkit/layout/controls.py +++ b/prompt_toolkit/layout/controls.py @@ -737,6 +737,14 @@ def create_content( """ buffer = self.buffer + # Trigger history loading of the buffer. We do this during the + # rendering of the UI here, because it needs to happen when an + # `Application` with its event loop is running. During the rendering of + # the buffer control is the earliest place we can achieve this, where + # we're sure the right event loop is active, and don't require user + # interaction (like in a key binding). 
+ buffer.load_history_if_not_yet_loaded() + # Get the document to be shown. If we are currently searching (the # search buffer has focus, and the preview_search filter is enabled), # then use the search document, which has possibly a different diff --git a/tests/test_history.py b/tests/test_history.py new file mode 100644 index 000000000..52f28b0ce --- /dev/null +++ b/tests/test_history.py @@ -0,0 +1,101 @@ +import asyncio + +from prompt_toolkit.history import FileHistory, InMemoryHistory, ThreadedHistory + + +def _call_history_load(history): + """ + Helper: Call the history "load" method and return the result as a list of strings. + """ + result = [] + + async def call_load(): + async for item in history.load(): + result.append(item) + + asyncio.get_event_loop().run_until_complete(call_load()) + return result + + +def test_in_memory_history(): + history = InMemoryHistory() + history.append_string("hello") + history.append_string("world") + + # Newest should yield first. + assert _call_history_load(history) == ["world", "hello"] + + # Test another call. + assert _call_history_load(history) == ["world", "hello"] + + history.append_string("test3") + assert _call_history_load(history) == ["test3", "world", "hello"] + + # Passing history as a parameter. + history2 = InMemoryHistory(["abc", "def"]) + assert _call_history_load(history2) == ["def", "abc"] + + +def test_file_history(tmpdir): + histfile = tmpdir.join("history") + + history = FileHistory(histfile) + + history.append_string("hello") + history.append_string("world") + + # Newest should yield first. + assert _call_history_load(history) == ["world", "hello"] + + # Test another call. + assert _call_history_load(history) == ["world", "hello"] + + history.append_string("test3") + assert _call_history_load(history) == ["test3", "world", "hello"] + + # Create another history instance pointing to the same file. 
+ history2 = FileHistory(histfile) + assert _call_history_load(history2) == ["test3", "world", "hello"] + + +def test_threaded_file_history(tmpdir): + histfile = tmpdir.join("history") + + history = ThreadedHistory(FileHistory(histfile)) + + history.append_string("hello") + history.append_string("world") + + # Newest should yield first. + assert _call_history_load(history) == ["world", "hello"] + + # Test another call. + assert _call_history_load(history) == ["world", "hello"] + + history.append_string("test3") + assert _call_history_load(history) == ["test3", "world", "hello"] + + # Create another history instance pointing to the same file. + history2 = ThreadedHistory(FileHistory(histfile)) + assert _call_history_load(history2) == ["test3", "world", "hello"] + + +def test_threaded_in_memory_history(): + # Threaded in memory history is not useful. But testing it anyway, just to + # see whether everything plays nicely together. + history = ThreadedHistory(InMemoryHistory()) + history.append_string("hello") + history.append_string("world") + + # Newest should yield first. + assert _call_history_load(history) == ["world", "hello"] + + # Test another call. + assert _call_history_load(history) == ["world", "hello"] + + history.append_string("test3") + assert _call_history_load(history) == ["test3", "world", "hello"] + + # Passing history as a parameter. + history2 = ThreadedHistory(InMemoryHistory(["abc", "def"])) + assert _call_history_load(history2) == ["def", "abc"]