Allow async events processing without holding total_consistency_lock
#2199
Changes from all commits
@@ -342,7 +342,8 @@ macro_rules! define_run_body {
 				// falling back to our usual hourly prunes. This avoids short-lived clients never
 				// pruning their network graph. We run once 60 seconds after startup before
 				// continuing our normal cadence.
-				if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+				if $timer_elapsed(&mut last_prune_call, prune_timer) {
 					// The network graph must not be pruned while rapid sync completion is pending
 					if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
 						#[cfg(feature = "std")] {
@@ -360,7 +361,8 @@ macro_rules! define_run_body {
 						have_pruned = true;
 					}
-					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+					let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+					last_prune_call = $get_timer(prune_timer);
 				}

 				if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -867,7 +869,10 @@ mod tests {
 			if key == "network_graph" {
 				if let Some(sender) = &self.graph_persistence_notifier {
-					sender.send(()).unwrap();
+					match sender.send(()) {
+						Ok(()) => {},
+						Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
Review comment: Wait, why?

Reply: Because we're shutting the other task down after the first send. However, we also persist again on shutdown, which triggers a second send, which would panic as the receiver is already gone at that point.

Review comment: I think a comment for why this is ok would be helpful.

(A standalone sketch of this shutdown-notification pattern follows this hunk.)
+					}
 				};

 				if let Some((error, message)) = self.graph_error {
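The pattern the thread above describes can be illustrated outside the test harness. Below is a minimal, hypothetical sketch (not the PR's actual Persister; the function name and channel are placeholders) showing why a failed send on a std::sync::mpsc channel is tolerated once the receiving task has shut down:

use std::sync::mpsc;

// Hypothetical stand-in for the test persister's notification hook.
fn notify_graph_persisted(sender: &mpsc::Sender<()>) {
	match sender.send(()) {
		Ok(()) => {},
		// The receiving task may already have exited (e.g. the final persist on
		// shutdown happens after the receiver was dropped), so a SendError must
		// not panic here.
		Err(mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
	}
}

fn main() {
	let (tx, rx) = mpsc::channel();
	notify_graph_persisted(&tx); // receiver alive: Ok(())
	drop(rx);                    // the other task shuts down
	notify_graph_persisted(&tx); // receiver gone: logged instead of panicking
}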
@@ -1480,10 +1485,9 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 				let mut i = 0;
 				loop {
@@ -1495,7 +1499,9 @@ mod tests {
 			}, tokio::time::sleep(Duration::from_millis(1)).await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}

 	macro_rules! do_test_payment_path_scoring {
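The test hunks above and below swap the LocalSet/spawn_local setup for plain tokio::spawn plus tokio::join!, presumably possible now that the background-processor future can be spawned directly (the removed TODO pointed at #2003). A minimal sketch of that pattern, with placeholder futures standing in for bp_future and the test-driver task, assuming a multi-threaded Tokio runtime:

// Minimal sketch, not the actual test: two placeholder tasks spawned on the
// Tokio runtime and awaited together, mirroring the t1/t2 pattern above.
#[tokio::main]
async fn main() {
	// Stand-in for `bp_future`, which itself resolves to a Result in the tests.
	let t1 = tokio::spawn(async { Ok::<(), std::io::Error>(()) });
	// Stand-in for the task that drives the test and then signals exit.
	let t2 = tokio::spawn(async { /* run assertions, then send the exit signal */ });

	let (r1, r2) = tokio::join!(t1, t2);
	// Each JoinHandle yields a Result from the runtime; the inner Result comes
	// from the spawned future itself, hence the double unwrap on t1.
	r1.unwrap().unwrap();
	r2.unwrap();
}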
@@ -1649,13 +1655,14 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_payment_path_scoring!(nodes, receiver.recv().await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 }
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
@@ -926,6 +926,8 @@ where
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {

 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
Review comment: How come this is all run in a while loop? IIUC there may be other events added to pending_events.

Reply: Because we no longer allow multiple processors to run at the same time - if one process_events call starts, and makes some progress, then an event is generated, causing a second process_events call to happen, the second call might return early, but there's some events there the user expects to have processed. Thus, we need to make sure the first process_events goes around again and processes the remaining events.

(A standalone sketch of this guard-and-loop pattern follows this hunk.)
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}

-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;

-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			{
+				// We'll acquire our total consistency lock so that we can be sure no other
+				// persists happen while processing monitor events.
+				let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+				// TODO: This behavior should be documented. It's unintuitive that we query
+				// ChannelMonitors when clearing other events.
+				if $self.process_pending_monitor_events() {
+					result = NotifyOption::DoPersist;
+				}
+			}

-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}

-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}

-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
-		}
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+				$self.pending_events_processor.store(false, Ordering::Release);
Review comment: Should this only happen if !processed_all_events? Not a big deal either way, I think.

Reply: You mean if we processed all events? Yeah, I think I'd leave it as is.

Review comment: Oh, no, I mean literally just move the setter here into a check for if we're about to go around again.

Reply: Ah, yes, had understood as much, but we def. need to reset in the case we leave the method. We could have moved the […]
+			}

+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
+		}
 	}
 }
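To make the two review threads above concrete, here is a minimal, self-contained sketch of the same guard-and-loop shape, using plain Rust types in place of ChannelManager internals. EventQueue, processor_busy, and handle_event are illustrative names, not LDK API; the point is the compare_exchange guard, the loop until the queue is drained, and the unconditional flag reset:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct EventQueue {
	pending_events: Mutex<Vec<u32>>, // stands in for Mutex<Vec<events::Event>>
	processor_busy: AtomicBool,      // stands in for pending_events_processor
}

impl EventQueue {
	fn process_events(&self, mut handle_event: impl FnMut(u32)) {
		let mut processed_all_events = false;
		while !processed_all_events {
			// Only one caller may process at a time; losers bail out and rely on
			// the active caller looping until the queue is fully drained.
			if self.processor_busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
				return;
			}

			// Snapshot the currently-queued events and handle them without holding the lock.
			let pending = self.pending_events.lock().unwrap().clone();
			let num_events = pending.len();
			for event in pending {
				handle_event(event);
			}

			{
				// Drop only what we handled; new events may have been queued meanwhile.
				let mut queue = self.pending_events.lock().unwrap();
				queue.drain(..num_events);
				processed_all_events = queue.is_empty();
				// Reset the flag unconditionally so that either this loop's next
				// iteration or another caller can claim processing again.
				self.processor_busy.store(false, Ordering::Release);
			}
		}
	}
}

fn main() {
	let queue = EventQueue {
		pending_events: Mutex::new(vec![1, 2, 3]),
		processor_busy: AtomicBool::new(false),
	};
	queue.process_events(|event| println!("handled event {}", event));
}

If the queue is non-empty after the drain, the same caller loops and tries to claim the flag again; if another task claimed it in the meantime, this caller simply returns and the other task finishes the remaining work.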
@@ -1771,6 +1790,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),

 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -7916,6 +7936,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),

 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -7947,8 +7968,6 @@ mod tests {
 	use bitcoin::hashes::Hash;
 	use bitcoin::hashes::sha256::Hash as Sha256;
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
-	#[cfg(feature = "std")]
-	use core::time::Duration;
 	use core::sync::atomic::Ordering;
 	use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
 	use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};