Skip to content

Commit da38089

Browse files
committed
async kv store
1 parent 63a5e03 commit da38089

File tree

8 files changed

+353
-137
lines changed

8 files changed

+353
-137
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use lightning::sign::ChangeDestinationSource;
3838
#[cfg(feature = "std")]
3939
use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::OutputSpender;
41+
use lightning::util::async_poll::FutureSpawner;
4142
use lightning::util::logger::Logger;
4243
use lightning::util::persist::{KVStore, Persister};
4344
use lightning::util::sweep::OutputSweeper;
@@ -780,8 +781,9 @@ pub async fn process_events_async<
780781
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
781782
EventHandler: Fn(Event) -> EventHandlerFuture,
782783
PS: 'static + Deref + Send,
784+
FS: FutureSpawner,
783785
M: 'static
784-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
786+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>>
785787
+ Send
786788
+ Sync,
787789
CM: 'static + Deref,
@@ -977,7 +979,7 @@ impl BackgroundProcessor {
977979
EH: 'static + EventHandler + Send,
978980
PS: 'static + Deref + Send,
979981
M: 'static
980-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
982+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>>
981983
+ Send
982984
+ Sync,
983985
CM: 'static + Deref + Send,
@@ -992,6 +994,7 @@ impl BackgroundProcessor {
992994
O: 'static + Deref,
993995
K: 'static + Deref,
994996
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
997+
FS: FutureSpawner
995998
>(
996999
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
9971000
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,

lightning-persister/src/fs_store.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
33

44
use lightning::util::persist::{KVStore, MigratableKVStore};
55
use lightning::util::string::PrintableString;
6+
use lightning::util::async_poll::AsyncResult;
67

78
use std::collections::HashMap;
89
use std::fs;
@@ -11,6 +12,8 @@ use std::path::{Path, PathBuf};
1112
use std::sync::atomic::{AtomicUsize, Ordering};
1213
use std::sync::{Arc, Mutex, RwLock};
1314

15+
16+
1417
#[cfg(target_os = "windows")]
1518
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1619

@@ -329,6 +332,12 @@ impl KVStore for FilesystemStore {
329332

330333
Ok(keys)
331334
}
335+
336+
fn write_async(
337+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
338+
) -> AsyncResult<'static, ()> {
339+
todo!()
340+
}
332341
}
333342

334343
fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {

lightning/src/chain/chainmonitor.rs

Lines changed: 125 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
3636
use crate::events::{self, Event, EventHandler, ReplayEvent};
37+
use crate::util::async_poll::{poll_or_spawn, AsyncResult, FutureSpawner};
3738
use crate::util::logger::{Logger, WithContext};
3839
use crate::util::errors::APIError;
3940
use crate::util::persist::MonitorName;
4041
use crate::util::wakers::{Future, Notifier};
4142
use crate::ln::channel_state::ChannelDetails;
43+
use crate::sync::{Arc};
4244

4345
use crate::prelude::*;
4446
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
@@ -120,7 +122,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
120122
///
121123
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
122124
/// [`Writeable::write`]: crate::util::ser::Writeable::write
123-
fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
125+
fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>) -> AsyncResult<'static, ()>;
124126

125127
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
126128
/// update.
@@ -159,7 +161,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
159161
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
160162
///
161163
/// [`Writeable::write`]: crate::util::ser::Writeable::write
162-
fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
164+
fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> AsyncResult<'static, ()>;
163165
/// Prevents the channel monitor from being loaded on startup.
164166
///
165167
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
@@ -231,31 +233,33 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
231233
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
232234
/// [module-level documentation]: crate::chain::chainmonitor
233235
/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
234-
pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
236+
pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
235237
where C::Target: chain::Filter,
236238
T::Target: BroadcasterInterface,
237239
F::Target: FeeEstimator,
238240
L::Target: Logger,
239241
P::Target: Persist<ChannelSigner>,
240242
{
241-
monitors: RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
243+
monitors: Arc<RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>>,
242244
chain_source: Option<C>,
243245
broadcaster: T,
244246
logger: L,
245247
fee_estimator: F,
246248
persister: P,
247249
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
248250
/// from the user and not from a [`ChannelMonitor`].
249-
pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
251+
pending_monitor_events: Arc<Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>>,
250252
/// The best block height seen, used as a proxy for the passage of time.
251253
highest_chain_height: AtomicUsize,
252254

253255
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
254256
/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
255-
event_notifier: Notifier,
257+
event_notifier: Arc<Notifier>,
258+
259+
future_spawner: Arc<FS>,
256260
}
257261

258-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
262+
impl<ChannelSigner: EcdsaChannelSigner + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner> ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
259263
where C::Target: chain::Filter,
260264
T::Target: BroadcasterInterface,
261265
F::Target: FeeEstimator,
@@ -345,18 +349,31 @@ where C::Target: chain::Filter,
345349
// `ChannelMonitorUpdate` after a channel persist for a channel with the same
346350
// `latest_update_id`.
347351
let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
348-
match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor) {
349-
ChannelMonitorUpdateStatus::Completed =>
350-
log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
351-
log_funding_info!(monitor)
352-
),
353-
ChannelMonitorUpdateStatus::InProgress => {
354-
log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
355-
}
356-
ChannelMonitorUpdateStatus::UnrecoverableError => {
357-
return Err(());
352+
let max_update_id = _pending_monitor_updates.iter().copied().max().unwrap_or(0);
353+
354+
let persist_res = self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor);
355+
356+
let monitors = self.monitors.clone();
357+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
358+
let event_notifier = self.event_notifier.clone();
359+
let future_spawner = self.future_spawner.clone();
360+
let channel_id = *channel_id;
361+
362+
match poll_or_spawn(persist_res, move || {
363+
// TODO: Log error if the monitor is not persisted.
364+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
365+
channel_id, max_update_id);
366+
}, future_spawner.deref()) {
367+
Ok(true) => {
368+
// log
369+
},
370+
Ok(false) => {
371+
// log
372+
}
373+
Err(_) => {
374+
return Err(());
375+
},
358376
}
359-
}
360377
}
361378

362379
// Register any new outputs with the chain source for filtering, storing any dependent
@@ -386,17 +403,18 @@ where C::Target: chain::Filter,
386403
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
387404
/// always need to fetch full blocks absent another means for determining which blocks contain
388405
/// transactions relevant to the watched channels.
389-
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
406+
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, future_spawner: FS) -> Self {
390407
Self {
391-
monitors: RwLock::new(new_hash_map()),
408+
monitors: Arc::new(RwLock::new(new_hash_map())),
392409
chain_source,
393410
broadcaster,
394411
logger,
395412
fee_estimator: feeest,
396413
persister,
397-
pending_monitor_events: Mutex::new(Vec::new()),
414+
pending_monitor_events: Arc::new(Mutex::new(Vec::new())),
398415
highest_chain_height: AtomicUsize::new(0),
399-
event_notifier: Notifier::new(),
416+
event_notifier: Arc::new(Notifier::new()),
417+
future_spawner: Arc::new(future_spawner),
400418
}
401419
}
402420

@@ -529,6 +547,40 @@ where C::Target: chain::Filter,
529547
Ok(())
530548
}
531549

550+
fn channel_monitor_updated_internal(
551+
monitors: &RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
552+
pending_monitor_events: &Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
553+
event_notifier: &Notifier,
554+
channel_id: ChannelId, completed_update_id: u64) -> Result<(), APIError> {
555+
let monitors = monitors.read().unwrap();
556+
let monitor_data = if let Some(mon) = monitors.get(&channel_id) { mon } else {
557+
return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching channel ID {} found", channel_id) });
558+
};
559+
let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
560+
pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
561+
562+
// Note that we only check for pending non-chainsync monitor updates and we don't track monitor
563+
// updates resulting from chainsync in `pending_monitor_updates`.
564+
let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
565+
566+
// TODO: Add logger
567+
568+
if monitor_is_pending_updates {
569+
// If there are still monitor updates pending, we cannot yet construct a
570+
// Completed event.
571+
return Ok(());
572+
}
573+
let funding_txo = monitor_data.monitor.get_funding_txo();
574+
pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
575+
funding_txo,
576+
channel_id,
577+
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
578+
}], monitor_data.monitor.get_counterparty_node_id()));
579+
580+
event_notifier.notify();
581+
Ok(())
582+
}
583+
532584
/// This wrapper avoids having to update some of our tests for now as they assume the direct
533585
/// chain::Watch API wherein we mark a monitor fully-updated by just calling
534586
/// channel_monitor_updated once with the highest ID.
@@ -667,8 +719,8 @@ where C::Target: chain::Filter,
667719
}
668720
}
669721

670-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
671-
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
722+
impl<ChannelSigner: EcdsaChannelSigner + Send + Sync + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
723+
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
672724
where
673725
C::Target: chain::Filter,
674726
T::Target: BroadcasterInterface,
@@ -696,8 +748,8 @@ where
696748
}
697749
}
698750

699-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
700-
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
751+
impl<ChannelSigner: EcdsaChannelSigner + Sync + Send + 'static, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner>
752+
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
701753
where
702754
C::Target: chain::Filter,
703755
T::Target: BroadcasterInterface,
@@ -750,8 +802,8 @@ where
750802
}
751803
}
752804

753-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
754-
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
805+
impl<ChannelSigner: EcdsaChannelSigner + Sync + Send + 'static, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref, FS: FutureSpawner + Clone>
806+
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
755807
where C::Target: chain::Filter,
756808
T::Target: BroadcasterInterface,
757809
F::Target: FeeEstimator,
@@ -772,15 +824,28 @@ where C::Target: chain::Filter,
772824
let update_id = monitor.get_latest_update_id();
773825
let mut pending_monitor_updates = Vec::new();
774826
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
775-
match persist_res {
776-
ChannelMonitorUpdateStatus::InProgress => {
777-
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
778-
pending_monitor_updates.push(update_id);
779-
},
780-
ChannelMonitorUpdateStatus::Completed => {
827+
828+
let update_status;
829+
let monitors = self.monitors.clone();
830+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
831+
let event_notifier = self.event_notifier.clone();
832+
let future_spawner = self.future_spawner.clone();
833+
834+
match poll_or_spawn(persist_res, move || {
835+
// TODO: Log error if the monitor is not persisted.
836+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
837+
channel_id, update_id);
838+
}, future_spawner.deref()) {
839+
Ok(true) => {
781840
log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
841+
update_status = ChannelMonitorUpdateStatus::Completed;
782842
},
783-
ChannelMonitorUpdateStatus::UnrecoverableError => {
843+
Ok(false) => {
844+
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
845+
pending_monitor_updates.push(update_id);
846+
update_status = ChannelMonitorUpdateStatus::InProgress;
847+
}
848+
Err(_) => {
784849
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
785850
log_error!(logger, "{}", err_str);
786851
panic!("{}", err_str);
@@ -793,7 +858,7 @@ where C::Target: chain::Filter,
793858
monitor,
794859
pending_monitor_updates: Mutex::new(pending_monitor_updates),
795860
});
796-
Ok(persist_res)
861+
Ok(update_status)
797862
}
798863

799864
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
@@ -838,27 +903,40 @@ where C::Target: chain::Filter,
838903
} else {
839904
self.persister.update_persisted_channel(monitor.persistence_key(), Some(update), monitor)
840905
};
841-
match persist_res {
842-
ChannelMonitorUpdateStatus::InProgress => {
843-
pending_monitor_updates.push(update_id);
906+
907+
let monitors = self.monitors.clone();
908+
let pending_monitor_updates_cb = self.pending_monitor_events.clone();
909+
let event_notifier = self.event_notifier.clone();
910+
let future_spawner = self.future_spawner.clone();
911+
912+
let update_status;
913+
match poll_or_spawn(persist_res, move || {
914+
// TODO: Log error if the monitor is not persisted.
915+
let _ = ChainMonitor::<ChannelSigner, C, T, F, L, P, FS>::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier,
916+
channel_id, update_id);
917+
}, future_spawner.deref()) {
918+
Ok(true) => {
844919
log_debug!(logger,
845-
"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
920+
"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
846921
update_id,
847922
log_funding_info!(monitor)
848923
);
924+
update_status = ChannelMonitorUpdateStatus::Completed;
849925
},
850-
ChannelMonitorUpdateStatus::Completed => {
926+
Ok(false) => {
851927
log_debug!(logger,
852-
"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
928+
"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
853929
update_id,
854930
log_funding_info!(monitor)
855931
);
856-
},
857-
ChannelMonitorUpdateStatus::UnrecoverableError => {
932+
pending_monitor_updates.push(update_id);
933+
update_status = ChannelMonitorUpdateStatus::InProgress;
934+
}
935+
Err(_) => {
858936
// Take the monitors lock for writing so that we poison it and any future
859937
// operations going forward fail immediately.
860938
core::mem::drop(pending_monitor_updates);
861-
core::mem::drop(monitors);
939+
// core::mem::drop(monitors);
862940
let _poison = self.monitors.write().unwrap();
863941
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
864942
log_error!(logger, "{}", err_str);
@@ -868,7 +946,7 @@ where C::Target: chain::Filter,
868946
if update_res.is_err() {
869947
ChannelMonitorUpdateStatus::InProgress
870948
} else {
871-
persist_res
949+
update_status
872950
}
873951
}
874952
}
@@ -889,7 +967,7 @@ where C::Target: chain::Filter,
889967
}
890968
}
891969

892-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
970+
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, FS: FutureSpawner> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, FS>
893971
where C::Target: chain::Filter,
894972
T::Target: BroadcasterInterface,
895973
F::Target: FeeEstimator,
@@ -1136,4 +1214,3 @@ mod tests {
11361214
}).is_err());
11371215
}
11381216
}
1139-

0 commit comments

Comments
 (0)