Skip to content

ThreadedHistory: invoke history load callback on event loop #1170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ docs/_build

# pycharm metadata
.idea

# VS Code
.vscode/
100 changes: 100 additions & 0 deletions examples/prompts/history/bug_thread_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python
"""
Demonstrate bug in threaded history, where asynchronous loading can corrupt Buffer context.

Seems to happen with very large history being loaded and causing slowdowns.

"""
import re
import time

from prompt_toolkit import PromptSession
from prompt_toolkit.history import History, ThreadedHistory


class MegaHistory(History):
"""
Example class that loads lots of history

Sample designed to exercise existing multitasking hazards,
be careful to not add any more in the sample.
"""

def __init__(self, init_request: int = 1000, *args, **kwargs):
super(MegaHistory, self).__init__(*args, **kwargs)
self.requested_count = 0 # only modified by main (requesting) thread
self.requested_commands = 0 # only modified by main (requesting) thread
self.actual_count = 0 # only modified by background thread

def load_history_strings(self):
while True:
while self.requested_count <= self.actual_count:
time.sleep(0.001) # don't busy loop

print(
f"... starting to load {self.requested_count - self.actual_count:15,d} more items."
)
while self.requested_count > self.actual_count:
yield f"History item {self.actual_count:15,d}, command number {self.requested_commands}"
self.actual_count += 1
print("...done.")

def store_string(self, string):
pass # Don't store strings.

# called by main thread, watch out for multitasking hazards.
def add_request(self, requested: int = 0):
self.requested_count += requested
self.requested_commands += 1

def show_request(self):
print(
f"Have loaded {self.actual_count:15,d} of {self.requested_count:15,d} in {self.requested_commands} commands."
)


HIST_CMD = re.compile(r"^hist (load (\d+)|show)$", re.IGNORECASE)


def main():
print(
"Asynchronous loading of history. Designed to exercise multitasking hazard in Buffer.\n"
"When started, tries to load 100,000 lines into history with no delay.\n"
"Expect to trigger assertion in document.py line 98, though others may fire.\n"
"\n"
"Can request more lines by `hist load nnnnn`, and can see progress by `hist show`.\n"
"\n"
"Notice that the up-arrow will work for the completions that are currently loaded.\n"
"Even after the input is accepted, loading will continue in the background and\n"
"the additional history will be available when the next prompt is displayed.\n"
)
mh = MegaHistory()
our_history = ThreadedHistory(mh)

# The history needs to be passed to the `PromptSession`. It can't be passed
# to the `prompt` call because only one history can be used during a
# session.
session = PromptSession(history=our_history)

mh.add_request(99999)

while True:
text = session.prompt("Say something: ")
if text.startswith("hist"):
m = HIST_CMD.match(text)
if not m:
print("eh?")
else:
if m[1] == "show":
mh.show_request()
elif m[1].startswith("load"):
mh.add_request(int(m[2]))
else:
print("eh? hist load nnnnnn\nor hist show")
pass
else:
print("You said: %s" % text)


if __name__ == "__main__":
main()
14 changes: 13 additions & 1 deletion examples/prompts/history/slow-history.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
By wrapping it in `ThreadedHistory`, the history will load in the background
without blocking any user interaction.
"""
import asyncio
import time

from prompt_toolkit import PromptSession
Expand Down Expand Up @@ -32,11 +33,22 @@ def main():
"Even when the input is accepted, loading will continue in the "
"background and when the next prompt is displayed.\n"
)
our_history = ThreadedHistory(SlowHistory())

my_loop = asyncio.get_event_loop() # creates loop if needed

# Inform ThreadedHistory which event loop to use
# when passing lines of history to the prompt.

our_history = ThreadedHistory(SlowHistory(), my_loop)

# The history needs to be passed to the `PromptSession`. It can't be passed
# to the `prompt` call because only one history can be used during a
# session.
# Note that PromptSession runs on the thread's current event loop because
# it was created above and is therefore in synch with ThreadedHistory.
# PromptSession would create and event loop if it didn't find one
# already running, but then ThreadedHistory would not work.

session = PromptSession(history=our_history)

while True:
Expand Down
15 changes: 10 additions & 5 deletions prompt_toolkit/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import shutil
import subprocess
import tempfile
import threading
from enum import Enum
from functools import wraps
from typing import (
Expand Down Expand Up @@ -305,13 +306,13 @@ def __init__(

# Load the history.
def new_history_item(item: str) -> None:
# XXX: Keep in mind that this function can be called in a different
# thread!
# Insert the new string into `_working_lines`.
# XXX: This function contains a critical section, may only
# be invoked on the event loop thread if history is
# loading on another thread.

self._working_lines.insert(0, item)
self.__working_index += (
1 # Not entirely threadsafe, but probably good enough.
)
self.__working_index += 1

self.history.load(new_history_item)

Expand Down Expand Up @@ -413,6 +414,10 @@ def _set_cursor_position(self, value: int) -> bool:

@property
def text(self) -> str:
if self.working_index >= len(self._working_lines):
print(
f"Buffer: working_index {self.working_index} out of sync with working_lines {len(self._working_lines)}"
)
return self._working_lines[self.working_index]

@text.setter
Expand Down
96 changes: 63 additions & 33 deletions prompt_toolkit/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
when a history entry is loaded. This loading can be done asynchronously
and making the history swappable would probably break this.
"""
import asyncio
import datetime
import os
import time
import warnings
from abc import ABCMeta, abstractmethod
from threading import Thread
from typing import Callable, Iterable, List, Optional
Expand Down Expand Up @@ -37,29 +40,15 @@ def __init__(self) -> None:
# Methods expected by `Buffer`.
#

def load(self, item_loaded_callback: Callable[[str], None]) -> None:
def load(
self,
item_loaded_callback: Callable[[str], None],
) -> None:
"""
Load the history and call the callback for every entry in the history.
This one assumes the callback is only called from same thread as `Buffer` is using.

XXX: The callback can be called from another thread, which happens in
case of `ThreadedHistory`.

We can't assume that an asyncio event loop is running, and
schedule the insertion into the `Buffer` using the event loop.

The reason is that the creation of the :class:`.History` object as
well as the start of the loading happens *before*
`Application.run()` is called, and it can continue even after
`Application.run()` terminates. (Which is useful to have a
complete history during the next prompt.)

Calling `get_event_loop()` right here is also not guaranteed to
return the same event loop which is used in `Application.run`,
because a new event loop can be created during the `run`. This is
useful in Python REPLs, where we want to use one event loop for
the prompt, and have another one active during the `eval` of the
commands. (Otherwise, the user can schedule a while/true loop and
freeze the UI.)
See `ThreadedHistory` for another way.
"""
if self._loaded:
for item in self._loaded_strings[::-1]:
Expand Down Expand Up @@ -116,33 +105,77 @@ class ThreadedHistory(History):
wait for everything to be loaded.
"""

def __init__(self, history: History) -> None:
def __init__(
self, history: History, event_loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
"""Create instance of ThreadedHistory

Args:
history (History): Instance of History intended to run on a background thread.
event_loop (asyncio.AbstractEventLoop, optional): The event loop on which prompt toolkit is running.
(Deprecated) Defaults to ``asyncio.get_event_loop(), which may *create* the event loop. Caller should provide an explicit value.
"""
self.history = history
self._load_thread: Optional[Thread] = None
self._item_loaded_callbacks: List[Callable[[str], None]] = []
if event_loop is None:
warnings.warn(
"Event_loop argument should be explicitly provided by caller so history callback "
"uses the same loop as rest of prompt-toolkit. Will use default event loop for now.",
DeprecationWarning,
)
event_loop = asyncio.get_event_loop()
self.event_loop = event_loop
super().__init__()

def load(self, item_loaded_callback: Callable[[str], None]) -> None:

"""Collect the history strings on a background thread,
but run the callback which provides them to a buffer in the event loop.
"""

self._item_loaded_callbacks.append(item_loaded_callback)

def call_all_callbacks(item: str) -> None:
for cb in self._item_loaded_callbacks:
cb(item)

if self._loaded: # ugly reference to base class internal...
for item in self._loaded_strings[::-1]:
call_all_callbacks(item)
return

# Start the load thread, if we don't have a thread yet.
if not self._load_thread:

def call_all_callbacks(item: str) -> None:
for cb in self._item_loaded_callbacks:
cb(item)

self._load_thread = Thread(
target=self.history.load, args=(call_all_callbacks,)
target=self.bg_loader, args=(call_all_callbacks,)
)
self._load_thread.daemon = True
self._load_thread.start()

def get_strings(self) -> List[str]:
return self.history.get_strings()
def bg_loader(
self,
item_loaded_callback: Callable[[str], None],
) -> None:
"""
Load the history and schedule the callback for every entry in the history.
TODO: extend the callback so it can take a batch of lines in one event_loop dispatch.
"""

def append_string(self, string: str) -> None:
self.history.append_string(string)
try:
for item in self.load_history_strings():
self._loaded_strings.insert(
0, item
) # slowest way to add an element to a list known to man.
self.event_loop.call_soon_threadsafe(
item_loaded_callback, item
) # expensive way to dispatch single line.
finally:
self._loaded = True

def __repr__(self) -> str:
return "ThreadedHistory(%r)" % (self.history,)

# All of the following are proxied to `self.history`.

Expand All @@ -152,9 +185,6 @@ def load_history_strings(self) -> Iterable[str]:
def store_string(self, string: str) -> None:
self.history.store_string(string)

def __repr__(self) -> str:
return "ThreadedHistory(%r)" % (self.history,)


class InMemoryHistory(History):
"""
Expand Down