Skip to content

Commit 110c51b

Browse files
Refactoring of history code.
- Fix race condition in ThreadedHistory. Fixes issue #1158. - Refactored the history code so that asynchronous history loader implementations become possible. - Start loading the history when the `BufferControl` renders for the first time. This way we are sure that the history loading uses the same event loop as the one used by the application. - Added unit tests. - Make it possible to wrap `InMemoryHistory` in `ThreadedHistory` (not useful, but good if all combinations work). - Use a deque for the working lines in buffer.py -> much better performance when reloading the history.
1 parent 298c3d9 commit 110c51b

File tree

4 files changed

+276
-70
lines changed

4 files changed

+276
-70
lines changed

prompt_toolkit/buffer.py

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,21 @@
33
It holds the text, cursor position, history, etc...
44
"""
55
import asyncio
6+
import logging
67
import os
78
import re
89
import shlex
910
import shutil
1011
import subprocess
1112
import tempfile
13+
from collections import deque
1214
from enum import Enum
1315
from functools import wraps
1416
from typing import (
1517
Any,
1618
Awaitable,
1719
Callable,
20+
Deque,
1821
Iterable,
1922
List,
2023
Optional,
@@ -54,6 +57,8 @@
5457
"reshape_text",
5558
]
5659

60+
logger = logging.getLogger(__name__)
61+
5762

5863
class EditReadOnlyBuffer(Exception):
5964
" Attempt editing of read-only :class:`.Buffer`. "
@@ -300,21 +305,12 @@ def __init__(
300305
self._async_completer = self._create_completer_coroutine()
301306
self._async_validator = self._create_auto_validate_coroutine()
302307

308+
# Asyncio task for populating the history.
309+
self._load_history_task: Optional[asyncio.Future[None]] = None
310+
303311
# Reset other attributes.
304312
self.reset(document=document)
305313

306-
# Load the history.
307-
def new_history_item(item: str) -> None:
308-
# XXX: Keep in mind that this function can be called in a different
309-
# thread!
310-
# Insert the new string into `_working_lines`.
311-
self._working_lines.insert(0, item)
312-
self.__working_index += (
313-
1 # Not entirely threadsafe, but probably good enough.
314-
)
315-
316-
self.history.load(new_history_item)
317-
318314
def __repr__(self) -> str:
319315
if len(self.text) < 15:
320316
text = self.text
@@ -373,14 +369,57 @@ def reset(
373369
self._undo_stack: List[Tuple[str, int]] = []
374370
self._redo_stack: List[Tuple[str, int]] = []
375371

372+
# Cancel history loader. If history loading was still ongoing.
373+
# Cancel the `_load_history_task`, so that next repaint of the
374+
# `BufferControl` we will repopulate it.
375+
if self._load_history_task is not None:
376+
self._load_history_task.cancel()
377+
self._load_history_task = None
378+
376379
#: The working lines. Similar to history, except that this can be
377380
#: modified. The user can press arrow_up and edit previous entries.
378381
#: Ctrl-C should reset this, and copy the whole history back in here.
379382
#: Enter should process the current command and append to the real
380383
#: history.
381-
self._working_lines = self.history.get_strings()[:]
382-
self._working_lines.append(document.text)
383-
self.__working_index = len(self._working_lines) - 1
384+
self._working_lines: Deque[str] = deque([document.text])
385+
self.__working_index = 0
386+
387+
def load_history_if_not_yet_loaded(self) -> None:
388+
"""
389+
Create task for populating the buffer history (if not yet done).
390+
391+
Note::
392+
393+
This needs to be called from within the event loop of the
394+
application, because history loading is async, and we need to be
395+
sure the right event loop is active. Therefor, we call this method
396+
in the `BufferControl.create_content`.
397+
398+
There are situations where prompt_toolkit applications are created
399+
in one thread, but will later run in a different thread (Ptpython
400+
is one example. The REPL runs in a separate thread, in order to
401+
prevent interfering with a potential different event loop in the
402+
main thread. The REPL UI however is still created in the main
403+
thread.) We could decide to not support creating prompt_toolkit
404+
objects in one thread and running the application in a different
405+
thread, but history loading is the only place where it matters, and
406+
this solves it.
407+
"""
408+
if self._load_history_task is None:
409+
410+
async def load_history() -> None:
411+
try:
412+
async for item in self.history.load():
413+
self._working_lines.appendleft(item)
414+
self.__working_index += 1
415+
except asyncio.CancelledError:
416+
pass
417+
except BaseException:
418+
# Log error if something goes wrong. (We don't have a
419+
# caller to which we can propagate this exception.)
420+
logger.exception("Loading history failed")
421+
422+
self._load_history_task = asyncio.ensure_future(load_history())
384423

385424
# <getters/setters>
386425

prompt_toolkit/history.py

Lines changed: 113 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
loading can be done asynchronously and making the history swappable would
88
probably break this.
99
"""
10+
import asyncio
1011
import datetime
1112
import os
13+
import threading
1214
from abc import ABCMeta, abstractmethod
13-
from threading import Thread
14-
from typing import Callable, Iterable, List, Optional
15+
from typing import AsyncGenerator, Iterable, List, Optional, Sequence
1516

1617
__all__ = [
1718
"History",
@@ -32,57 +33,43 @@ class History(metaclass=ABCMeta):
3233
def __init__(self) -> None:
3334
# In memory storage for strings.
3435
self._loaded = False
36+
37+
# History that's loaded already, in reverse order. Latest, most recent
38+
# item first.
3539
self._loaded_strings: List[str] = []
3640

3741
#
3842
# Methods expected by `Buffer`.
3943
#
4044

41-
def load(self, item_loaded_callback: Callable[[str], None]) -> None:
45+
async def load(self) -> AsyncGenerator[str, None]:
4246
"""
43-
Load the history and call the callback for every entry in the history.
44-
45-
XXX: The callback can be called from another thread, which happens in
46-
case of `ThreadedHistory`.
47-
48-
We can't assume that an asyncio event loop is running, and
49-
schedule the insertion into the `Buffer` using the event loop.
50-
51-
The reason is that the creation of the :class:`.History` object as
52-
well as the start of the loading happens *before*
53-
`Application.run()` is called, and it can continue even after
54-
`Application.run()` terminates. (Which is useful to have a
55-
complete history during the next prompt.)
56-
57-
Calling `get_event_loop()` right here is also not guaranteed to
58-
return the same event loop which is used in `Application.run`,
59-
because a new event loop can be created during the `run`. This is
60-
useful in Python REPLs, where we want to use one event loop for
61-
the prompt, and have another one active during the `eval` of the
62-
commands. (Otherwise, the user can schedule a while/true loop and
63-
freeze the UI.)
47+
Load the history and yield all the entries in reverse order (latest,
48+
most recent history entry first).
49+
50+
This method can be called multiple times from the `Buffer` to
51+
repopulate the history when prompting for a new input. So we are
52+
responsible here for both caching, and making sure that strings that
53+
were were appended to the history will be incorporated next time this
54+
method is called.
6455
"""
65-
if self._loaded:
66-
for item in self._loaded_strings[::-1]:
67-
item_loaded_callback(item)
68-
return
69-
70-
try:
71-
for item in self.load_history_strings():
72-
self._loaded_strings.insert(0, item)
73-
item_loaded_callback(item)
74-
finally:
56+
if not self._loaded:
57+
self._loaded_strings = list(self.load_history_strings())
7558
self._loaded = True
7659

60+
for item in self._loaded_strings:
61+
yield item
62+
7763
def get_strings(self) -> List[str]:
7864
"""
7965
Get the strings from the history that are loaded so far.
66+
(In order. Oldest item first.)
8067
"""
81-
return self._loaded_strings
68+
return self._loaded_strings[::-1]
8269

8370
def append_string(self, string: str) -> None:
8471
" Add string to the history. "
85-
self._loaded_strings.append(string)
72+
self._loaded_strings.insert(0, string)
8673
self.store_string(string)
8774

8875
#
@@ -110,40 +97,99 @@ def store_string(self, string: str) -> None:
11097

11198
class ThreadedHistory(History):
11299
"""
113-
Wrapper that runs the `load_history_strings` generator in a thread.
100+
Wrapper around `History` implementations that run the `load()` generator in
101+
a thread.
114102
115103
Use this to increase the start-up time of prompt_toolkit applications.
116104
History entries are available as soon as they are loaded. We don't have to
117105
wait for everything to be loaded.
118106
"""
119107

120108
def __init__(self, history: History) -> None:
121-
self.history = history
122-
self._load_thread: Optional[Thread] = None
123-
self._item_loaded_callbacks: List[Callable[[str], None]] = []
124109
super().__init__()
125110

126-
def load(self, item_loaded_callback: Callable[[str], None]) -> None:
127-
self._item_loaded_callbacks.append(item_loaded_callback)
111+
self.history = history
128112

129-
# Start the load thread, if we don't have a thread yet.
130-
if not self._load_thread:
113+
self._load_thread: Optional[threading.Thread] = None
131114

132-
def call_all_callbacks(item: str) -> None:
133-
for cb in self._item_loaded_callbacks:
134-
cb(item)
115+
# Lock for accessing/manipulating `_loaded_strings` and `_loaded`
116+
# together in a consistent state.
117+
self._lock = threading.Lock()
135118

136-
self._load_thread = Thread(
137-
target=self.history.load, args=(call_all_callbacks,)
119+
# Events created by each `load()` call. Used to wait for new history
120+
# entries from the loader thread.
121+
self._string_load_events: List[threading.Event] = []
122+
123+
async def load(self) -> AsyncGenerator[str, None]:
124+
"""
125+
Like `History.load(), but call `self.load_history_strings()` in a
126+
background thread.
127+
"""
128+
# Start the load thread, if this is called for the first time.
129+
if not self._load_thread:
130+
self._load_thread = threading.Thread(
131+
target=self._in_load_thread,
132+
daemon=True,
138133
)
139-
self._load_thread.daemon = True
140134
self._load_thread.start()
141135

142-
def get_strings(self) -> List[str]:
143-
return self.history.get_strings()
136+
# Consume the `_loaded_strings` list, using asyncio.
137+
loop = asyncio.get_event_loop()
138+
139+
# Create threading Event so that we can wait for new items.
140+
event = threading.Event()
141+
event.set()
142+
self._string_load_events.append(event)
143+
144+
items_yielded = 0
145+
146+
try:
147+
while True:
148+
# Wait for new items to be available.
149+
await loop.run_in_executor(None, event.wait)
150+
151+
# Read new items (in lock).
152+
await loop.run_in_executor(None, self._lock.acquire)
153+
try:
154+
new_items = self._loaded_strings[items_yielded:]
155+
done = self._loaded
156+
event.clear()
157+
finally:
158+
self._lock.release()
159+
160+
items_yielded += len(new_items)
161+
162+
for item in new_items:
163+
yield item
164+
165+
if done:
166+
break
167+
finally:
168+
self._string_load_events.remove(event)
169+
170+
def _in_load_thread(self) -> None:
171+
try:
172+
# Start with an empty list. In case `append_string()` was called
173+
# before `load()` happened. Then `.store_string()` will have
174+
# written these entries back to disk and we will reload it.
175+
self._loaded_strings = []
176+
177+
for item in self.history.load_history_strings():
178+
with self._lock:
179+
self._loaded_strings.append(item)
180+
181+
for event in self._string_load_events:
182+
event.set()
183+
finally:
184+
with self._lock:
185+
self._loaded = True
186+
for event in self._string_load_events:
187+
event.set()
144188

145189
def append_string(self, string: str) -> None:
146-
self.history.append_string(string)
190+
with self._lock:
191+
self._loaded_strings.insert(0, string)
192+
self.store_string(string)
147193

148194
# All of the following are proxied to `self.history`.
149195

@@ -160,13 +206,25 @@ def __repr__(self) -> str:
160206
class InMemoryHistory(History):
161207
"""
162208
:class:`.History` class that keeps a list of all strings in memory.
209+
210+
In order to prepopulate the history, it's possible to call either
211+
`append_string` for all items or pass a list of strings to `__init__` here.
163212
"""
164213

214+
def __init__(self, history_strings: Optional[Sequence[str]] = None) -> None:
215+
super().__init__()
216+
# Emulating disk storage.
217+
if history_strings is None:
218+
self._storage = []
219+
else:
220+
self._storage = list(history_strings)
221+
165222
def load_history_strings(self) -> Iterable[str]:
166-
return []
223+
for item in self._storage[::-1]:
224+
yield item
167225

168226
def store_string(self, string: str) -> None:
169-
pass
227+
self._storage.append(string)
170228

171229

172230
class DummyHistory(History):

prompt_toolkit/layout/controls.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,14 @@ def create_content(
737737
"""
738738
buffer = self.buffer
739739

740+
# Trigger history loading of the buffer. We do this during the
741+
# rendering of the UI here, because it needs to happen when an
742+
# `Application` with its event loop is running. During the rendering of
743+
# the buffer control is the earliest place we can achieve this, where
744+
# we're sure the right event loop is active, and don't require user
745+
# interaction (like in a key binding).
746+
buffer.load_history_if_not_yet_loaded()
747+
740748
# Get the document to be shown. If we are currently searching (the
741749
# search buffer has focus, and the preview_search filter is enabled),
742750
# then use the search document, which has possibly a different

0 commit comments

Comments
 (0)