Skip to content

ThreadedHistory: invoke Buffer.new_history_item() on event loop from background thread #1159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ docs/_build

# pycharm metadata
.idea

# VS Code
.vscode/
96 changes: 96 additions & 0 deletions examples/prompts/history/bug_thread_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python
"""
Demonstrate a bug in threaded history, where asynchronous loading can corrupt the Buffer context.

This seems to happen when a very large history is being loaded, causing slowdowns.

"""
import re
import time

from prompt_toolkit import PromptSession
from prompt_toolkit.history import History, ThreadedHistory


class MegaHistory(History):
    """
    History backend that produces synthetic entries on demand.

    The requesting thread raises `requested_count` via `add_request()`;
    `load_history_strings()` keeps yielding entries until `actual_count`
    has caught up with it, then waits for the next request.

    This sample exists to exercise multitasking hazards that are already
    present; do not add new ones here.
    """

    def __init__(self, init_request: int = 1000, *args, **kwargs):
        # NOTE(review): `init_request` is accepted but never read in this
        # class; the initial load is requested through `add_request()`.
        super().__init__(*args, **kwargs)
        # Written by the main (requesting) thread only.
        self.requested_count = 0
        self.requested_commands = 0
        # Written by the loading (background) thread only.
        self.actual_count = 0

    def load_history_strings(self):
        """
        Yield synthetic history lines forever, in bursts.

        Each burst starts once more items have been requested than have
        been produced so far, and ends when the two counters are equal.
        """
        while True:
            # Wait for outstanding work; a short sleep avoids a busy loop.
            while not (self.actual_count < self.requested_count):
                time.sleep(0.001)

            print(
                f"... starting to load {self.requested_count - self.actual_count:15,d} more items."
            )
            while self.actual_count < self.requested_count:
                # The counter is advanced only after the consumer has taken
                # the item, i.e. when the generator is resumed.
                yield f"History item {self.actual_count:15,d}, command number {self.requested_commands}"
                self.actual_count += 1
            print("...done.")

    def store_string(self, string):
        """Discard `string`; this history never persists anything."""
        return None

    def add_request(self, requested: int = 0):
        """
        Ask for `requested` additional items and count one more command.

        Called by the main thread; watch out for multitasking hazards.
        """
        self.requested_count += requested
        self.requested_commands += 1

    def show_request(self):
        """Print how many items were loaded, requested, and in how many commands."""
        print(
            f"Have loaded {self.actual_count:15,d} of {self.requested_count:15,d} in {self.requested_commands} commands."
        )


# Matches the two demo commands, ignoring case: `hist show` and
# `hist load <digits>`. Group 1 is the sub-command text ("show" or
# "load <digits>"); group 2 is the digit string and is only set for `load`.
HIST_CMD = re.compile(r"^hist (load (\d+)|show)$", re.IGNORECASE)


def main():
    """
    Run an interactive prompt backed by a `ThreadedHistory(MegaHistory())`.

    An initial batch of 99999 history items is requested, then lines are
    read in a loop:

    * `hist load <n>` requests `n` more history items,
    * `hist show` prints the loading progress,
    * any other line starting with `hist` prints a usage hint,
    * everything else is echoed back.

    Commands are recognised case-insensitively, matching `HIST_CMD`.
    The loop ends quietly on Ctrl-D (EOFError) or Ctrl-C
    (KeyboardInterrupt); any other exception propagates.
    """
    print(
        "Asynchronous loading of history. Designed to exercise multitasking hazard in Buffer.\n"
        "When started, tries to load 100,000 lines into history with no delay.\n"
        "Expect to trigger assertion in document.py line 98, though others may fire.\n"
        "\n"
        "Can request more lines by `hist load nnnnn`, and can see progress by `hist show`.\n"
        "\n"
    )
    mh = MegaHistory()
    our_history = ThreadedHistory(mh)

    # The history needs to be passed to the `PromptSession`. It can't be passed
    # to the `prompt` call because only one history can be used during a
    # session.
    session = PromptSession(history=our_history)

    mh.add_request(99999)

    while True:
        try:
            text = session.prompt("Say something: ")
        except (EOFError, KeyboardInterrupt):
            # Leave the demo without a traceback when the user quits.
            break

        # HIST_CMD ignores case, so the prefix check must do so as well.
        if not text.lower().startswith("hist"):
            print("You said: %s" % text)
            continue

        m = HIST_CMD.match(text)
        if m is None:
            print("eh? hist load nnnnnn\nor hist show")
        elif m[2] is not None:
            # The digit group only participates in the `load` alternative,
            # so this test is independent of how the user cased the command.
            mh.add_request(int(m[2]))
        else:
            mh.show_request()


if __name__ == "__main__":
main()
15 changes: 10 additions & 5 deletions prompt_toolkit/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import shutil
import subprocess
import tempfile
import threading
from enum import Enum
from functools import wraps
from typing import (
Expand Down Expand Up @@ -305,13 +306,13 @@ def __init__(

# Load the history.
def new_history_item(item: str) -> None:
# XXX: Keep in mind that this function can be called in a different
# thread!
# Insert the new string into `_working_lines`.
# XXX: This function contains a critical section. When history is
# loading on another thread, it must still only be invoked on the
# event loop thread.

self._working_lines.insert(0, item)
self.__working_index += (
1 # Not entirely threadsafe, but probably good enough.
)
self.__working_index += 1

self.history.load(new_history_item)

Expand Down Expand Up @@ -413,6 +414,10 @@ def _set_cursor_position(self, value: int) -> bool:

@property
def text(self) -> str:
if self.working_index >= len(self._working_lines):
print(
f"Buffer: working_index {self.working_index} out of sync with working_lines {len(self._working_lines)}"
)
return self._working_lines[self.working_index]

@text.setter
Expand Down
77 changes: 46 additions & 31 deletions prompt_toolkit/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
when a history entry is loaded. This loading can be done asynchronously
and making the history swappable would probably break this.
"""
import asyncio
import datetime
import os
import time
from abc import ABCMeta, abstractmethod
from threading import Thread
from typing import Callable, Iterable, List, Optional
Expand Down Expand Up @@ -37,29 +39,12 @@ def __init__(self) -> None:
# Methods expected by `Buffer`.
#

def load(self, item_loaded_callback: Callable[[str], None]) -> None:
def load(self, item_loaded_callback: Callable[[str], None],) -> None:
"""
Load the history and call the callback for every entry in the history.
This implementation assumes the callback is only called from the same thread that `Buffer` is using.

XXX: The callback can be called from another thread, which happens in
case of `ThreadedHistory`.

We can't assume that an asyncio event loop is running, and
schedule the insertion into the `Buffer` using the event loop.

The reason is that the creation of the :class:`.History` object as
well as the start of the loading happens *before*
`Application.run()` is called, and it can continue even after
`Application.run()` terminates. (Which is useful to have a
complete history during the next prompt.)

Calling `get_event_loop()` right here is also not guaranteed to
return the same event loop which is used in `Application.run`,
because a new event loop can be created during the `run`. This is
useful in Python REPLs, where we want to use one event loop for
the prompt, and have another one active during the `eval` of the
commands. (Otherwise, the user can schedule a while/true loop and
freeze the UI.)
See `ThreadedHistory` for another way.
"""
if self._loaded:
for item in self._loaded_strings[::-1]:
Expand Down Expand Up @@ -123,26 +108,59 @@ def __init__(self, history: History) -> None:
super().__init__()

def load(self, item_loaded_callback: Callable[[str], None]) -> None:

"""Collect the history strings on a background thread,
but run the callback in the event loop.

The caller of ThreadedHistory must ensure that the Application ends up running on the
same event loop that is obtained (and possibly created) here.
"""

self._item_loaded_callbacks.append(item_loaded_callback)

def call_all_callbacks(item: str) -> None:
for cb in self._item_loaded_callbacks:
cb(item)

if self._loaded: # ugly reference to base class internal...
for item in self._loaded_strings[::-1]:
call_all_callbacks(item)
return

# Start the load thread, if we don't have a thread yet.
if not self._load_thread:

def call_all_callbacks(item: str) -> None:
for cb in self._item_loaded_callbacks:
cb(item)
event_loop = asyncio.get_event_loop()

self._load_thread = Thread(
target=self.history.load, args=(call_all_callbacks,)
target=self.bg_loader, args=(call_all_callbacks, event_loop)
)
self._load_thread.daemon = True
self._load_thread.start()

def get_strings(self) -> List[str]:
return self.history.get_strings()
def bg_loader(
self,
item_loaded_callback: Callable[[str], None],
event_loop: asyncio.BaseEventLoop,
) -> None:
"""
Load the history and schedule the callback for every entry in the history.
TODO: extend the callback so it can take a batch of lines in one event_loop dispatch.
"""

def append_string(self, string: str) -> None:
self.history.append_string(string)
try:
for item in self.load_history_strings():
self._loaded_strings.insert(
0, item
) # slowest way to add an element to a list known to man.
event_loop.call_soon_threadsafe(
item_loaded_callback, item
) # expensive way to dispatch single line.
finally:
self._loaded = True

def __repr__(self) -> str:
return "ThreadedHistory(%r)" % (self.history,)

# All of the following are proxied to `self.history`.

Expand All @@ -152,9 +170,6 @@ def load_history_strings(self) -> Iterable[str]:
def store_string(self, string: str) -> None:
self.history.store_string(string)

def __repr__(self) -> str:
return "ThreadedHistory(%r)" % (self.history,)


class InMemoryHistory(History):
"""
Expand Down