Revised patch: collect `ThreadedHistory` entries on a background thread and
deliver them to `Buffer` through the event loop, plus an example script that
exercises loading a very large history.

Review notes:
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy of this patch. If the context does not match the target tree, try
    `git apply --ignore-whitespace`.
  * The `index` lines carry the blob ids of the original patch; the post-image
    ids no longer correspond to the revised content.

diff --git a/.gitignore b/.gitignore
index 0f4ebc230..7a1916336 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,3 +44,6 @@ docs/_build
 
 # pycharm metadata
 .idea
+
+# VS Code
+.vscode/
diff --git a/examples/prompts/history/bug_thread_history.py b/examples/prompts/history/bug_thread_history.py
new file mode 100755
index 000000000..70685e4b2
--- /dev/null
+++ b/examples/prompts/history/bug_thread_history.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+"""
+Demonstrate bug in threaded history, where asynchronous loading can corrupt Buffer context.
+
+Seems to happen with very large history being loaded and causing slowdowns.
+
+"""
+import re
+import time
+
+from prompt_toolkit import PromptSession
+from prompt_toolkit.history import History, ThreadedHistory
+
+
+class MegaHistory(History):
+    """
+    Example class that loads lots of history
+
+    Sample designed to exercise existing multitasking hazards, don't add any more.
+    """
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.requested_count = 0  # only modified by main (requesting) thread
+        self.requested_commands = 0  # only modified by main (requesting) thread
+        self.actual_count = 0  # only modified by background thread
+
+    def load_history_strings(self):
+        while True:
+            while self.requested_count <= self.actual_count:
+                time.sleep(0.001)  # don't busy loop
+
+            print(
+                f"... starting to load {self.requested_count - self.actual_count:15,d} more items."
+            )
+            while self.requested_count > self.actual_count:
+                yield f"History item {self.actual_count:15,d}, command number {self.requested_commands}"
+                self.actual_count += 1
+            print("...done.")
+
+    def store_string(self, string):
+        pass  # Don't store strings.
+
+    # called by main thread, watch out for multitasking hazards.
+    def add_request(self, requested: int = 0):
+        self.requested_count += requested
+        self.requested_commands += 1
+
+    def show_request(self):
+        print(
+            f"Have loaded {self.actual_count:15,d} of {self.requested_count:15,d} in {self.requested_commands} commands."
+        )
+
+
+HIST_CMD = re.compile(r"^hist (load (\d+)|show)$", re.IGNORECASE)
+
+
+def main():
+    print(
+        "Asynchronous loading of history. Designed to exercise multitasking hazard in Buffer.\n"
+        "When started, tries to load 100,000 lines into history with no delay.\n"
+        "Expect to trigger assertion in document.py line 98, though others may fire.\n"
+        "\n"
+        "Can request more lines by `hist load nnnnn`, and can see progress by `hist show`.\n"
+        "\n"
+    )
+    mh = MegaHistory()
+    our_history = ThreadedHistory(mh)
+
+    # The history needs to be passed to the `PromptSession`. It can't be passed
+    # to the `prompt` call because only one history can be used during a
+    # session.
+    session = PromptSession(history=our_history)
+
+    mh.add_request(100000)
+
+    while True:
+        text = session.prompt("Say something: ")
+        if text.lower().startswith("hist"):
+            m = HIST_CMD.match(text)
+            if not m:
+                print("eh? hist load nnnnnn\nor hist show")
+            elif m[2] is not None:
+                # Group 2 (the count) only matches for `hist load nnnnn`.
+                mh.add_request(int(m[2]))
+            else:
+                mh.show_request()
+        else:
+            print("You said: %s" % text)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/prompt_toolkit/buffer.py b/prompt_toolkit/buffer.py
index 6fea94014..49b7b152d 100644
--- a/prompt_toolkit/buffer.py
+++ b/prompt_toolkit/buffer.py
@@ -305,13 +305,12 @@ def __init__(
 
         # Load the history.
         def new_history_item(item: str) -> None:
-            # XXX: Keep in mind that this function can be called in a different
-            #      thread!
+            # XXX: This function contains a critical section. When history is
+            #      loading on another thread, it may only be invoked on the
+            #      event loop thread.
             # Insert the new string into `_working_lines`.
             self._working_lines.insert(0, item)
-            self.__working_index += (
-                1  # Not entirely threadsafe, but probably good enough.
-            )
+            self.__working_index += 1
 
         self.history.load(new_history_item)
 
diff --git a/prompt_toolkit/history.py b/prompt_toolkit/history.py
index 72acec9dc..cf5abd6a1 100644
--- a/prompt_toolkit/history.py
+++ b/prompt_toolkit/history.py
@@ -6,6 +6,7 @@
 when a history entry is loaded. This loading can be done asynchronously
 and making the history swappable would probably break this.
 """
+import asyncio
 import datetime
 import os
 from abc import ABCMeta, abstractmethod
@@ -41,25 +42,9 @@ def load(self, item_loaded_callback: Callable[[str], None]) -> None:
         """
         Load the history and call the callback for every entry in the history.
 
-        XXX: The callback can be called from another thread, which happens in
-             case of `ThreadedHistory`.
-
-             We can't assume that an asyncio event loop is running, and
-             schedule the insertion into the `Buffer` using the event loop.
-
-             The reason is that the creation of the :class:`.History` object as
-             well as the start of the loading happens *before*
-             `Application.run()` is called, and it can continue even after
-             `Application.run()` terminates. (Which is useful to have a
-             complete history during the next prompt.)
-
-             Calling `get_event_loop()` right here is also not guaranteed to
-             return the same event loop which is used in `Application.run`,
-             because a new event loop can be created during the `run`. This is
-             useful in Python REPLs, where we want to use one event loop for
-             the prompt, and have another one active during the `eval` of the
-             commands. (Otherwise, the user can schedule a while/true loop and
-             freeze the UI.)
+        This implementation assumes the callback is only ever called from
+        the same thread that the `Buffer` is using. See `ThreadedHistory`
+        for a way to load the history on a background thread.
         """
         if self._loaded:
             for item in self._loaded_strings[::-1]:
@@ -123,26 +108,59 @@ def __init__(self, history: History) -> None:
         super().__init__()
 
     def load(self, item_loaded_callback: Callable[[str], None]) -> None:
+        """
+        Collect the history strings on a background thread, but run the
+        callbacks on the event loop.
+
+        The caller must ensure that the `Application` ends up running on the
+        event loop that is current when the load thread is started here.
+        """
         self._item_loaded_callbacks.append(item_loaded_callback)
 
+        if self._loaded:
+            # Loading already finished: replay the history to the new
+            # callback only. Replaying to every callback would duplicate items.
+            for item in self._loaded_strings[::-1]:
+                item_loaded_callback(item)
+            return
+
         # Start the load thread, if we don't have a thread yet.
         if not self._load_thread:
 
             def call_all_callbacks(item: str) -> None:
                 for cb in self._item_loaded_callbacks:
                     cb(item)
 
+            event_loop = asyncio.get_event_loop()
             self._load_thread = Thread(
-                target=self.history.load, args=(call_all_callbacks,)
+                target=self.bg_loader, args=(call_all_callbacks, event_loop)
             )
             self._load_thread.daemon = True
             self._load_thread.start()
 
-    def get_strings(self) -> List[str]:
-        return self.history.get_strings()
-
-    def append_string(self, string: str) -> None:
-        self.history.append_string(string)
+    def bg_loader(
+        self,
+        item_loaded_callback: Callable[[str], None],
+        event_loop: asyncio.AbstractEventLoop,
+    ) -> None:
+        """
+        Load the history and schedule the callback on `event_loop` for every
+        entry in the history.
+
+        TODO: extend the callback so it can take a batch of lines in one
+              event loop dispatch.
+        """
+        try:
+            for item in self.load_history_strings():
+                # `list.insert(0, ...)` shifts the whole list for every item.
+                self._loaded_strings.insert(0, item)
+                # One event loop dispatch per line is expensive (see TODO).
+                event_loop.call_soon_threadsafe(item_loaded_callback, item)
+        finally:
+            self._loaded = True
+
+    def __repr__(self) -> str:
+        return "ThreadedHistory(%r)" % (self.history,)
 
     # All of the following are proxied to `self.history`.
 
@@ -152,9 +170,6 @@ def load_history_strings(self) -> Iterable[str]:
     def store_string(self, string: str) -> None:
         self.history.store_string(string)
 
-    def __repr__(self) -> str:
-        return "ThreadedHistory(%r)" % (self.history,)
-
 
 class InMemoryHistory(History):
     """