From 0653ac492f63c397cef461bc92f9d6aa43e94078 Mon Sep 17 00:00:00 2001 From: Bob Hyman Date: Sat, 20 Jun 2020 15:42:46 -0400 Subject: [PATCH 1/3] ThreadedHistory: invoke history load callback on event loop --- .gitignore | 3 + .../prompts/history/bug_thread_history.py | 100 ++++++++++++++++++ prompt_toolkit/buffer.py | 15 ++- prompt_toolkit/history.py | 77 ++++++++------ 4 files changed, 159 insertions(+), 36 deletions(-) create mode 100755 examples/prompts/history/bug_thread_history.py diff --git a/.gitignore b/.gitignore index 0f4ebc230..7a1916336 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ docs/_build # pycharm metadata .idea + +# VS Code +.vscode/ diff --git a/examples/prompts/history/bug_thread_history.py b/examples/prompts/history/bug_thread_history.py new file mode 100755 index 000000000..07fad8022 --- /dev/null +++ b/examples/prompts/history/bug_thread_history.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +""" +Demonstrate bug in threaded history, where asynchronous loading can corrupt Buffer context. + +Seems to happen with very large history being loaded and causing slowdowns. + +""" +import re +import time + +from prompt_toolkit import PromptSession +from prompt_toolkit.history import History, ThreadedHistory + + +class MegaHistory(History): + """ + Example class that loads lots of history + + Sample designed to exercise existing multitasking hazards, + be careful to not add any more in the sample. + """ + + def __init__(self, init_request: int = 1000, *args, **kwargs): + super(MegaHistory, self).__init__(*args, **kwargs) + self.requested_count = 0 # only modified by main (requesting) thread + self.requested_commands = 0 # only modified by main (requesting) thread + self.actual_count = 0 # only modified by background thread + + def load_history_strings(self): + while True: + while self.requested_count <= self.actual_count: + time.sleep(0.001) # don't busy loop + + print( + f"... 
starting to load {self.requested_count - self.actual_count:15,d} more items." + ) + while self.requested_count > self.actual_count: + yield f"History item {self.actual_count:15,d}, command number {self.requested_commands}" + self.actual_count += 1 + print("...done.") + + def store_string(self, string): + pass # Don't store strings. + + # called by main thread, watch out for multitasking hazards. + def add_request(self, requested: int = 0): + self.requested_count += requested + self.requested_commands += 1 + + def show_request(self): + print( + f"Have loaded {self.actual_count:15,d} of {self.requested_count:15,d} in {self.requested_commands} commands." + ) + + +HIST_CMD = re.compile(r"^hist (load (\d+)|show)$", re.IGNORECASE) + + +def main(): + print( + "Asynchronous loading of history. Designed to exercise multitasking hazard in Buffer.\n" + "When started, tries to load 100,000 lines into history with no delay.\n" + "Expect to trigger assertion in document.py line 98, though others may fire.\n" + "\n" + "Can request more lines by `hist load nnnnn`, and can see progress by `hist show`.\n" + "\n" + "Notice that the up-arrow will work for the completions that are currently loaded.\n" + "Even after the input is accepted, loading will continue in the background and\n" + "the additional history will be available when the next prompt is displayed.\n" + ) + mh = MegaHistory() + our_history = ThreadedHistory(mh) + + # The history needs to be passed to the `PromptSession`. It can't be passed + # to the `prompt` call because only one history can be used during a + # session. + session = PromptSession(history=our_history) + + mh.add_request(99999) + + while True: + text = session.prompt("Say something: ") + if text.startswith("hist"): + m = HIST_CMD.match(text) + if not m: + print("eh?") + else: + if m[1] == "show": + mh.show_request() + elif m[1].startswith("load"): + mh.add_request(int(m[2])) + else: + print("eh? 
hist load nnnnnn\nor hist show") + pass + else: + print("You said: %s" % text) + + +if __name__ == "__main__": + main() diff --git a/prompt_toolkit/buffer.py b/prompt_toolkit/buffer.py index 6fea94014..49b7b152d 100644 --- a/prompt_toolkit/buffer.py +++ b/prompt_toolkit/buffer.py @@ -9,6 +9,7 @@ import shutil import subprocess import tempfile +import threading from enum import Enum from functools import wraps from typing import ( @@ -305,13 +306,13 @@ def __init__( # Load the history. def new_history_item(item: str) -> None: - # XXX: Keep in mind that this function can be called in a different - # thread! # Insert the new string into `_working_lines`. + # XXX: This function contains a critical section, may only + # be invoked on the event loop thread if history is + # loading on another thread. + self._working_lines.insert(0, item) - self.__working_index += ( - 1 # Not entirely threadsafe, but probably good enough. - ) + self.__working_index += 1 self.history.load(new_history_item) @@ -413,6 +414,10 @@ def _set_cursor_position(self, value: int) -> bool: @property def text(self) -> str: + if self.working_index >= len(self._working_lines): + print( + f"Buffer: working_index {self.working_index} out of sync with working_lines {len(self._working_lines)}" + ) return self._working_lines[self.working_index] @text.setter diff --git a/prompt_toolkit/history.py b/prompt_toolkit/history.py index 72acec9dc..cf5abd6a1 100644 --- a/prompt_toolkit/history.py +++ b/prompt_toolkit/history.py @@ -6,8 +6,10 @@ when a history entry is loaded. This loading can be done asynchronously and making the history swappable would probably break this. """ +import asyncio import datetime import os +import time from abc import ABCMeta, abstractmethod from threading import Thread from typing import Callable, Iterable, List, Optional @@ -37,29 +39,12 @@ def __init__(self) -> None: # Methods expected by `Buffer`. 
# - def load(self, item_loaded_callback: Callable[[str], None]) -> None: + def load(self, item_loaded_callback: Callable[[str], None],) -> None: """ Load the history and call the callback for every entry in the history. + This one assumes the callback is only called from same thread as `Buffer` is using. - XXX: The callback can be called from another thread, which happens in - case of `ThreadedHistory`. - - We can't assume that an asyncio event loop is running, and - schedule the insertion into the `Buffer` using the event loop. - - The reason is that the creation of the :class:`.History` object as - well as the start of the loading happens *before* - `Application.run()` is called, and it can continue even after - `Application.run()` terminates. (Which is useful to have a - complete history during the next prompt.) - - Calling `get_event_loop()` right here is also not guaranteed to - return the same event loop which is used in `Application.run`, - because a new event loop can be created during the `run`. This is - useful in Python REPLs, where we want to use one event loop for - the prompt, and have another one active during the `eval` of the - commands. (Otherwise, the user can schedule a while/true loop and - freeze the UI.) + See `ThreadedHistory` for another way. """ if self._loaded: for item in self._loaded_strings[::-1]: @@ -123,26 +108,59 @@ def __init__(self, history: History) -> None: super().__init__() def load(self, item_loaded_callback: Callable[[str], None]) -> None: + + """Collect the history strings on a background thread, + but run the callback in the event loop. + + Caller of ThreadedHistory must ensure that the Application ends up running on the same + event loop as we (probably) create here. + """ + self._item_loaded_callbacks.append(item_loaded_callback) + def call_all_callbacks(item: str) -> None: + for cb in self._item_loaded_callbacks: + cb(item) + + if self._loaded: # ugly reference to base class internal... 
+ for item in self._loaded_strings[::-1]: + call_all_callbacks(item) + return + # Start the load thread, if we don't have a thread yet. if not self._load_thread: - def call_all_callbacks(item: str) -> None: - for cb in self._item_loaded_callbacks: - cb(item) + event_loop = asyncio.get_event_loop() self._load_thread = Thread( - target=self.history.load, args=(call_all_callbacks,) + target=self.bg_loader, args=(call_all_callbacks, event_loop) ) self._load_thread.daemon = True self._load_thread.start() - def get_strings(self) -> List[str]: - return self.history.get_strings() + def bg_loader( + self, + item_loaded_callback: Callable[[str], None], + event_loop: asyncio.BaseEventLoop, + ) -> None: + """ + Load the history and schedule the callback for every entry in the history. + TODO: extend the callback so it can take a batch of lines in one event_loop dispatch. + """ - def append_string(self, string: str) -> None: - self.history.append_string(string) + try: + for item in self.load_history_strings(): + self._loaded_strings.insert( + 0, item + ) # slowest way to add an element to a list known to man. + event_loop.call_soon_threadsafe( + item_loaded_callback, item + ) # expensive way to dispatch single line. + finally: + self._loaded = True + + def __repr__(self) -> str: + return "ThreadedHistory(%r)" % (self.history,) # All of the following are proxied to `self.history`. @@ -152,9 +170,6 @@ def load_history_strings(self) -> Iterable[str]: def store_string(self, string: str) -> None: self.history.store_string(string) - def __repr__(self) -> str: - return "ThreadedHistory(%r)" % (self.history,) - class InMemoryHistory(History): """ From 7a8956306c3e750e4b601a561a14611627bc5600 Mon Sep 17 00:00:00 2001 From: Bob Hyman Date: Mon, 21 Sep 2020 23:50:29 -0400 Subject: [PATCH 2/3] API for ThreadedHistory to use a specified event loop. 
--- prompt_toolkit/history.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/prompt_toolkit/history.py b/prompt_toolkit/history.py index cf5abd6a1..ff813441e 100644 --- a/prompt_toolkit/history.py +++ b/prompt_toolkit/history.py @@ -13,6 +13,7 @@ from abc import ABCMeta, abstractmethod from threading import Thread from typing import Callable, Iterable, List, Optional +import warnings __all__ = [ "History", @@ -101,19 +102,33 @@ class ThreadedHistory(History): wait for everything to be loaded. """ - def __init__(self, history: History) -> None: + def __init__( + self, history: History, event_loop: asyncio.BaseEventLoop = None + ) -> None: + """Create instance of ThreadedHistory + + Args: + history (History): Instance of History intended to run on a background thread. + event_loop (asyncio.BaseEventLoop, optional): The event loop on which prompt toolkit is running. + (Deprecated) Defaults to ``asyncio.get_event_loop(), which may *create* the event loop. Caller should provide an explicit value. + """ self.history = history self._load_thread: Optional[Thread] = None self._item_loaded_callbacks: List[Callable[[str], None]] = [] + if event_loop is None: + warnings.warn( + "Event_loop argument should be explicitly provided by caller so history callback " + "uses the same loop as rest of prompt-toolkit. Will use default event loop for now.", + DeprecationWarning, + ) + event_loop = asyncio.get_event_loop() + self.event_loop = event_loop super().__init__() def load(self, item_loaded_callback: Callable[[str], None]) -> None: """Collect the history strings on a background thread, - but run the callback in the event loop. - - Caller of ThreadedHistory must ensure that the Application ends up running on the same - event loop as we (probably) create here. + but run the callback which provides them to a buffer in the event loop. 
""" self._item_loaded_callbacks.append(item_loaded_callback) @@ -130,19 +145,13 @@ def call_all_callbacks(item: str) -> None: # Start the load thread, if we don't have a thread yet. if not self._load_thread: - event_loop = asyncio.get_event_loop() - self._load_thread = Thread( - target=self.bg_loader, args=(call_all_callbacks, event_loop) + target=self.bg_loader, args=(call_all_callbacks,) ) self._load_thread.daemon = True self._load_thread.start() - def bg_loader( - self, - item_loaded_callback: Callable[[str], None], - event_loop: asyncio.BaseEventLoop, - ) -> None: + def bg_loader(self, item_loaded_callback: Callable[[str], None],) -> None: """ Load the history and schedule the callback for every entry in the history. TODO: extend the callback so it can take a batch of lines in one event_loop dispatch. @@ -153,7 +162,7 @@ def bg_loader( self._loaded_strings.insert( 0, item ) # slowest way to add an element to a list known to man. - event_loop.call_soon_threadsafe( + self.event_loop.call_soon_threadsafe( item_loaded_callback, item ) # expensive way to dispatch single line. finally: From 546b5787ebb2317a6866c5cdd4af59f99b72ea70 Mon Sep 17 00:00:00 2001 From: Bob Hyman Date: Tue, 22 Sep 2020 00:31:41 -0400 Subject: [PATCH 3/3] Modify ThreadedHistory example(s) for new API --- examples/prompts/history/slow-history.py | 14 +++++++++++++- prompt_toolkit/history.py | 16 +++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/examples/prompts/history/slow-history.py b/examples/prompts/history/slow-history.py index 1ba3860d6..352b21d5a 100755 --- a/examples/prompts/history/slow-history.py +++ b/examples/prompts/history/slow-history.py @@ -5,6 +5,7 @@ By wrapping it in `ThreadedHistory`, the history will load in the background without blocking any user interaction. 
""" +import asyncio import time from prompt_toolkit import PromptSession @@ -32,11 +33,22 @@ def main(): "Even when the input is accepted, loading will continue in the " "background and when the next prompt is displayed.\n" ) - our_history = ThreadedHistory(SlowHistory()) + + my_loop = asyncio.get_event_loop() # creates loop if needed + + # Inform ThreadedHistory which event loop to use + # when passing lines of history to the prompt. + + our_history = ThreadedHistory(SlowHistory(), my_loop) # The history needs to be passed to the `PromptSession`. It can't be passed # to the `prompt` call because only one history can be used during a # session. + # Note that PromptSession runs on the thread's current event loop because + # it was created above and is therefore in synch with ThreadedHistory. + # PromptSession would create an event loop if it didn't find one + # already running, but then ThreadedHistory would not work. + session = PromptSession(history=our_history) while True: diff --git a/prompt_toolkit/history.py b/prompt_toolkit/history.py index ff813441e..1af87c293 100644 --- a/prompt_toolkit/history.py +++ b/prompt_toolkit/history.py @@ -10,10 +10,10 @@ import datetime import os import time +import warnings from abc import ABCMeta, abstractmethod from threading import Thread from typing import Callable, Iterable, List, Optional -import warnings __all__ = [ "History", @@ -40,7 +40,10 @@ def __init__(self) -> None: # Methods expected by `Buffer`. # - def load(self, item_loaded_callback: Callable[[str], None],) -> None: + def load( + self, + item_loaded_callback: Callable[[str], None], + ) -> None: """ Load the history and call the callback for every entry in the history. This one assumes the callback is only called from same thread as `Buffer` is using. 
@@ -103,13 +106,13 @@ class ThreadedHistory(History): """ def __init__( - self, history: History, event_loop: asyncio.BaseEventLoop = None + self, history: History, event_loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: """Create instance of ThreadedHistory Args: history (History): Instance of History intended to run on a background thread. - event_loop (asyncio.BaseEventLoop, optional): The event loop on which prompt toolkit is running. + event_loop (asyncio.AbstractEventLoop, optional): The event loop on which prompt toolkit is running. (Deprecated) Defaults to ``asyncio.get_event_loop(), which may *create* the event loop. Caller should provide an explicit value. """ self.history = history @@ -151,7 +154,10 @@ def call_all_callbacks(item: str) -> None: self._load_thread.daemon = True self._load_thread.start() - def bg_loader(self, item_loaded_callback: Callable[[str], None],) -> None: + def bg_loader( + self, + item_loaded_callback: Callable[[str], None], + ) -> None: """ Load the history and schedule the callback for every entry in the history. TODO: extend the callback so it can take a batch of lines in one event_loop dispatch.