From fcf1282975fe8627efdd7657ba3007640daedce8 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 18:38:26 +0000
Subject: [PATCH 1/6] Make `lightning-background-processor` test failures more debuggable

Instead of asserting that a `Result` `is_ok`, we should simply `unwrap` it so
that failures produce a backtrace, and we should avoid doing so if the thread
is already panicking.
---
 lightning-background-processor/src/lib.rs | 24 +++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 2b150cb3025..c30e5ad6676 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1186,7 +1186,9 @@ mod tests {
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
check_persisted_data!(nodes[0].scorer, filepath.clone());
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
}

#[test]
@@ -1208,7 +1210,9 @@
}
}
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
}

#[test]
@@ -1300,7 +1304,9 @@
nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }

// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
@@ -1326,7 +1332,9 @@
_ => panic!("Unexpected event: {:?}", event),
}
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
}

#[test]
@@ -1345,7 +1353,9 @@
}
}
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
}

#[test]
@@ -1519,6 +1529,8 @@
_ => panic!("Unexpected event"),
}
- assert!(bg_processor.stop().is_ok());
+ if !std::thread::panicking() {
+ bg_processor.stop().unwrap();
+ }
}
}

From 8676c5aa4e18086bf56a21966de01f173b6b6983 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 20:11:30 +0000
Subject: [PATCH 2/6] Don't immediately exit BP if `ChannelManager` is persistable

If `ChannelManager` is persistable before the async background processor even
starts, it may not even get around to overwriting the `should_break` flag
before testing it, and the default value is (incorrectly) true, causing an
immediate unconditional exit. The default value should simply be false.
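As a minimal, self-contained sketch of the failure mode (the flag and loop
below are simplified stand-ins for the BP's actual event loop, not its real
code):

    fn main() {
        // Mirrors the bug: the exit flag defaults to `true` (the fix makes it
        // `false`), so the very first check breaks out of the loop before any
        // background work has been done.
        let should_break = true;
        let mut iterations = 0u32;
        loop {
            iterations += 1;
            // ...event processing and persistence would normally happen here...
            if should_break { break; }
        }
        assert_eq!(iterations, 1); // exited on the first pass
    }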
Fixes #2140
---
 lightning-background-processor/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index c30e5ad6676..3dd099b495d 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -518,7 +518,7 @@ where
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
- let mut should_break = true;
+ let mut should_break = false;
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
let event_handler = &event_handler;

From ca367f5d085cdc8e9a3e3bca4a846d840eea7c26 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 20:15:04 +0000
Subject: [PATCH 3/6] Ensure `background-processor` exits after any sleep future says to

If the user's sleep future passed to an async background processor only
returns true for exiting once and then reverts back to false, we should exit
anyway when we get a chance to. We do this here by ensuring we always check
the exit flag when polling sleep futures, even when we do not (yet) intend to
exit. This is utilized in the tests added in the coming commit(s).
---
 lightning-background-processor/src/lib.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 3dd099b495d..da5f2acf86c 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -554,7 +554,10 @@ where
|fut: &mut SleepFuture, _| {
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
- core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
+ match core::pin::Pin::new(fut).poll(&mut ctx) {
+ task::Poll::Ready(exit) => { should_break = exit; true },
+ task::Poll::Pending => false,
+ }
}, mobile_interruptable_platform)
}

From f0b928b06df43c8be43aef30019d4d80c0fdbea7 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 20:19:49 +0000
Subject: [PATCH 4/6] Make BP's `test_payment_path_scoring` dual sync/async

This finally gives us a bit of test coverage of the async BP, which was
embarrassingly missing until now.
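As a rough, self-contained sketch of the dual sync/async test pattern used
here (the macro, channel, and payload are illustrative stand-ins, not the
real test body): one macro holds the shared assertions, and each test variant
supplies its own way of receiving events.

    macro_rules! do_test_event_delivery {
        ($receive: expr) => {
            // Works in both variants: `recv_timeout` returns a `Result` and
            // `recv().await` returns an `Option`, and both have `.expect`.
            let event = $receive.expect("event not delivered within deadline");
            assert_eq!(event, 42);
        }
    }

    #[test]
    fn test_event_delivery_sync() {
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        sender.send(42).unwrap();
        do_test_event_delivery!(receiver.recv_timeout(std::time::Duration::from_secs(1)));
    }

    #[tokio::test]
    async fn test_event_delivery_async() {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
        sender.send(42).await.unwrap();
        do_test_event_delivery!(receiver.recv().await);
    }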
---
 lightning-background-processor/Cargo.toml |   1 +
 lightning-background-processor/src/lib.rs | 231 +++++++++++++---------
 2 files changed, 138 insertions(+), 94 deletions(-)

diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml
index af8bb25e55f..e2acb2240a7 100644
--- a/lightning-background-processor/Cargo.toml
+++ b/lightning-background-processor/Cargo.toml
@@ -25,6 +25,7 @@ lightning = { version = "0.0.114", path = "../lightning", default-features = fal
lightning-rapid-gossip-sync = { version = "0.0.114", path = "../lightning-rapid-gossip-sync", default-features = false }

[dev-dependencies]
+tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
lightning = { version = "0.0.114", path = "../lightning", features = ["_test_utils"] }
lightning-invoice = { version = "0.22.0", path = "../lightning-invoice" }
lightning-persister = { version = "0.0.114", path = "../lightning-persister" }

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index da5f2acf86c..41aceedc846 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1419,12 +1419,100 @@ mod tests {
assert_eq!(network_graph.read_only().channels().len(), 0);
}

macro_rules! do_test_payment_path_scoring {
($nodes: expr, $receive: expr) => {
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
// that we update the scorer upon a payment path succeeding (note that the channel must be
// public or else we won't score it).
// A background event handler for these payment path and probe events must be hooked up to a
// running background processor.
let scored_scid = 4242;
let secp_ctx = Secp256k1::new();
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

let path = vec![RouteHop {
pubkey: node_1_id,
node_features: NodeFeatures::empty(),
short_channel_id: scored_scid,
channel_features: ChannelFeatures::empty(),
fee_msat: 0,
cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
}];

$nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: false,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: Some(scored_scid),
});
let event = $receive.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { .. } => {},
_ => panic!("Unexpected event"),
}

// Ensure we'll score payments that were explicitly failed back by the destination as
// ProbeSuccess.
$nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
payment_id: None,
payment_hash: PaymentHash([42; 32]),
payment_failed_permanently: true,
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: None,
});
let event = $receive.expect("PaymentPathFailed not handled within deadline");
match event {
Event::PaymentPathFailed { ..
} => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: None, + path: path.clone(), + }); + let event = $receive.expect("PaymentPathSuccessful not handled within deadline"); + match event { + Event::PaymentPathSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::ProbeSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path: path.clone(), + }); + let event = $receive.expect("ProbeSuccessful not handled within deadline"); + match event { + Event::ProbeSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::ProbeFailed { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path, + short_channel_id: Some(scored_scid), + }); + let event = $receive.expect("ProbeFailure not handled within deadline"); + match event { + Event::ProbeFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + } + } + #[test] fn test_payment_path_scoring() { - // Ensure that we update the scorer when relevant events are processed. In this case, we ensure - // that we update the scorer upon a payment path succeeding (note that the channel must be - // public or else we won't score it). - // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); let event_handler = move |event: Event| match event { Event::PaymentPathFailed { .. } => sender.send(event).unwrap(), @@ -1439,101 +1527,56 @@ mod tests { let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); - let scored_scid = 4242; - let secp_ctx = Secp256k1::new(); - let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap(); - let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); - - let path = vec![RouteHop { - pubkey: node_1_id, - node_features: NodeFeatures::empty(), - short_channel_id: scored_scid, - channel_features: ChannelFeatures::empty(), - fee_msat: 0, - cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, - }]; - - nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); - nodes[0].node.push_pending_event(Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([42; 32]), - payment_failed_permanently: false, - failure: PathFailure::OnPath { network_update: None }, - path: path.clone(), - short_channel_id: Some(scored_scid), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathFailed not handled within deadline"); - match event { - Event::PaymentPathFailed { .. } => {}, - _ => panic!("Unexpected event"), - } + do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))); - // Ensure we'll score payments that were explicitly failed back by the destination as - // ProbeSuccess. 
- nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([42; 32]), - payment_failed_permanently: true, - failure: PathFailure::OnPath { network_update: None }, - path: path.clone(), - short_channel_id: None, - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathFailed not handled within deadline"); - match event { - Event::PaymentPathFailed { .. } => {}, - _ => panic!("Unexpected event"), + if !std::thread::panicking() { + bg_processor.stop().unwrap(); } + } - nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { - payment_id: PaymentId([42; 32]), - payment_hash: None, - path: path.clone(), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathSuccessful not handled within deadline"); - match event { - Event::PaymentPathSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } + #[tokio::test] + #[cfg(feature = "futures")] + async fn test_payment_path_scoring_async() { + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + let event_handler = move |event: Event| { + let sender_ref = sender.clone(); + async move { + match event { + Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() }, + Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() }, + Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() }, + Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() }, + _ => panic!("Unexpected event: {:?}", event), + } + } + }; - nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::ProbeSuccessful { - payment_id: PaymentId([42; 32]), - payment_hash: PaymentHash([42; 32]), - path: path.clone(), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("ProbeSuccessful not handled within deadline"); - match event { - Event::ProbeSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } + let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir)); - nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); - nodes[0].node.push_pending_event(Event::ProbeFailed { - payment_id: PaymentId([42; 32]), - payment_hash: PaymentHash([42; 32]), - path, - short_channel_id: Some(scored_scid), + let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); + + let bp_future = super::process_events_async( + persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), + Some(nodes[0].scorer.clone()), move |dur: Duration| { + let mut exit_receiver = exit_receiver.clone(); + Box::pin(async move { + tokio::select! 
{
_ = tokio::time::sleep(dur) => false,
_ = exit_receiver.changed() => true,
}
})
}, false,
);
// TODO: Drop _local and simply spawn after #2003
let local_set = tokio::task::LocalSet::new();
local_set.spawn_local(bp_future);
local_set.spawn_local(async move {
do_test_payment_path_scoring!(nodes, receiver.recv().await);
exit_sender.send(()).unwrap();
});
- let event = receiver
- .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
- .expect("ProbeFailure not handled within deadline");
- match event {
- Event::ProbeFailed { .. } => {},
- _ => panic!("Unexpected event"),
- }
-
- if !std::thread::panicking() {
- bg_processor.stop().unwrap();
- }
+ local_set.await;
}
}

From 2bc55b22d3a44130a08f21b364b95807d32e7197 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 20:34:13 +0000
Subject: [PATCH 5/6] Make BP's `not_pruning_network_graph_until...` test dual sync/async
---
 lightning-background-processor/src/lib.rs | 138 +++++++++++++++-------
 1 file changed, 94 insertions(+), 44 deletions(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 41aceedc846..79a8037660c 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1361,62 +1361,112 @@ mod tests {
}
}

+ macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
+ ($nodes: expr, $receive: expr, $sleep: expr) => {
+ let features = ChannelFeatures::empty();
+ $nodes[0].network_graph.add_channel_from_partial_announcement(
+ 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
+ ).expect("Failed to update channel from partial announcement");
+ let original_graph_description = $nodes[0].network_graph.to_string();
+ assert!(original_graph_description.contains("42: features: 0000, node_one:"));
+ assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
+
+ loop {
+ $sleep;
+ let log_entries = $nodes[0].logger.lines.lock().unwrap();
+ let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
+ if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
+ .unwrap_or(&0) > 1
+ {
+ // Wait until the loop has gone around at least twice.
+ break + } + } + + let initialization_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, + 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, + 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192, + ]; + $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap(); + + // this should have added two channels + assert_eq!($nodes[0].network_graph.read_only().channels().len(), 3); + + $receive.expect("Network graph not pruned within deadline"); + + // all channels should now be pruned + assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0); + } + } + #[test] fn test_not_pruning_network_graph_until_graph_sync_completion() { + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string()); let data_dir = nodes[0].persister.get_data_dir(); - let (sender, receiver) = std::sync::mpsc::sync_channel(1); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); - let network_graph = nodes[0].network_graph.clone(); - let features = ChannelFeatures::empty(); - network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id()) - .expect("Failed to update channel from partial announcement"); - let original_graph_description = network_graph.to_string(); - assert!(original_graph_description.contains("42: features: 0000, node_one:")); - assert_eq!(network_graph.read_only().channels().len(), 1); let event_handler = |_: _| {}; let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); - loop { - let log_entries = nodes[0].logger.lines.lock().unwrap(); - let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); - if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) - .unwrap_or(&0) > 1 - { - // Wait until the loop has gone around at least twice. 
- break - } - } - - let initialization_input = vec![ - 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, - 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, - 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, - 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, - 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, - 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, - 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, - 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, - 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, - 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, - 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, - 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, - 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192, - ]; - nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap(); - - // this should have added two channels - assert_eq!(network_graph.read_only().channels().len(), 3); - - receiver - .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)) - .expect("Network graph not pruned within deadline"); + do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, + receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)), + std::thread::sleep(Duration::from_millis(1))); background_processor.stop().unwrap(); + } + + #[tokio::test] + #[cfg(feature = "futures")] + async fn test_not_pruning_network_graph_until_graph_sync_completion_async() { + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + + let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); - // all channels should now be pruned - assert_eq!(network_graph.read_only().channels().len(), 0); + let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); + let bp_future = super::process_events_async( + persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), + Some(nodes[0].scorer.clone()), move |dur: Duration| { + let mut exit_receiver = exit_receiver.clone(); + Box::pin(async move { + tokio::select! { + _ = tokio::time::sleep(dur) => false, + _ = exit_receiver.changed() => true, + } + }) + }, false, + ); + // TODO: Drop _local and simply spawn after #2003 + let local_set = tokio::task::LocalSet::new(); + local_set.spawn_local(bp_future); + local_set.spawn_local(async move { + do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, { + let mut i = 0; + loop { + tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await; + if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); } + assert!(i < 5); + i += 1; + } + }, tokio::time::sleep(Duration::from_millis(1)).await); + exit_sender.send(()).unwrap(); + }); + local_set.await; } macro_rules! 
do_test_payment_path_scoring {

From 068d2c65cb7441278d9707d1c4d45d2782250525 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Apr 2023 20:47:02 +0000
Subject: [PATCH 6/6] Add an async version of BP's test_channel_manager_persist_error

This gives us coverage of an async BP returning an error.
---
 lightning-background-processor/src/lib.rs | 29 +++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 79a8037660c..a45dc6d6c74 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1237,6 +1237,35 @@ mod tests {
}
}

+ #[tokio::test]
+ #[cfg(feature = "futures")]
+ async fn test_channel_manager_persist_error_async() {
+ // Test that if we encounter an error during manager persistence, the
+ // BP future returns an error rather than panicking a thread.
+ let nodes = create_nodes(2, "test_persist_error_sync".to_string());
+ open_channel!(nodes[0], nodes[1], 100000);
+
+ let data_dir = nodes[0].persister.get_data_dir();
+ let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
+
+ let bp_future = super::process_events_async(
+ persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+ nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
+ Some(nodes[0].scorer.clone()), move |dur: Duration| {
+ Box::pin(async move {
+ tokio::time::sleep(dur).await;
+ false // Never exit
+ })
+ }, false,
+ );
+ match bp_future.await {
+ Ok(_) => panic!("Expected error persisting manager"),
+ Err(e) => {
+ assert_eq!(e.kind(), std::io::ErrorKind::Other);
+ assert_eq!(e.get_ref().unwrap().to_string(), "test");
+ },
+ }
+ }
+
#[test]
fn test_network_graph_persist_error() {
// Test that if we encounter an error during network graph persistence, an error gets returned.