Skip to content

Commit eabb028

Browse files
committed
Allow RoutingMessageHandler to signal backpressure
Now that we allow `handle_channel_announcement` to (indirectly) spawn async tasks which will complete later, we have to ensure it can apply backpressure all the way up to the TCP socket to ensure we don't end up with too many buffers allocated for UTXO validation. We do this by adding a new method to `RoutingMessageHandler` which allows it to signal if there are "many" checks pending and `channel_announcement` messages should be delayed. The actual `PeerManager` implementation thereof is done in the next commit.
1 parent 6c5cd1e commit eabb028

File tree

6 files changed

+40
-0
lines changed

6 files changed

+40
-0
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ mod tests {
622622
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
623623
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
624624
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
625+
fn processing_queue_high(&self) -> bool { false }
625626
}
626627
impl ChannelMessageHandler for MsgHandler {
627628
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}

lightning/src/ln/msgs.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,14 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
10801080
/// list of `short_channel_id`s.
10811081
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
10821082

1083+
// Handler queueing status
1084+
1085+
/// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages
1086+
/// pending some async action. While there is no guarantee of the rate of future messages, the
1087+
/// caller should seek to reduce the rate of new gossip messages handled, and especially
1088+
/// [`ChannelAnnouncement`]s.
1089+
fn processing_queue_high(&self) -> bool;
1090+
10831091
// Handler information:
10841092
/// Gets the node feature flags which this handler itself supports. All available handlers are
10851093
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]

lightning/src/ln/peer_handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
8181
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
8282
InitFeatures::empty()
8383
}
84+
fn processing_queue_high(&self) -> bool { false }
8485
}
8586
impl OnionMessageProvider for IgnoringMessageHandler {
8687
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }

lightning/src/routing/gossip.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,10 @@ where C::Target: ChainAccess, L::Target: Logger
637637
features.set_gossip_queries_optional();
638638
features
639639
}
640+
641+
fn processing_queue_high(&self) -> bool {
642+
self.network_graph.pending_checks.too_many_checks_pending()
643+
}
640644
}
641645

642646
impl<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, C, L>

lightning/src/routing/gossip_checking.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,28 @@ impl PendingChecks {
477477
}
478478
}
479479
}
480+
481+
const MAX_PENDING_LOOKUPS: usize = 32;
482+
483+
/// Signls if there are a large number of async checks pending and future channel_announcement
484+
/// messages should be delayed. Note that this is only a hint and messages already in-flight
485+
/// may still have to be handled for various reasons.
486+
pub(super) fn too_many_checks_pending(&self) -> bool {
487+
let mut pending_checks = self.internal.lock().unwrap();
488+
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
489+
// If we have many channel checks pending, ensure we don't have any dangling checks
490+
// (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture`
491+
// instead) before we commit to applying backpressure.
492+
pending_checks.channels.retain(|_, chan| {
493+
Weak::upgrade(&chan).is_some()
494+
});
495+
pending_checks.nodes.retain(|_, channels| {
496+
channels.retain(|chan| Weak::upgrade(&chan).is_some());
497+
!channels.is_empty()
498+
});
499+
pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
500+
} else {
501+
false
502+
}
503+
}
480504
}

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
550550
features.set_gossip_queries_optional();
551551
features
552552
}
553+
554+
fn processing_queue_high(&self) -> bool { false }
553555
}
554556

555557
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {

0 commit comments

Comments
 (0)