Skip to content

Commit 758894e

Browse files
committed
Allow RoutingMessageHandler to signal backpressure
Now that we allow `handle_channel_announcement` to (indirectly) spawn async tasks which will complete later, we have to ensure it can apply backpressure all the way up to the TCP socket so that we don't end up with too many buffers allocated for UTXO validation. We do this by adding a new method to `RoutingMessageHandler` which allows it to signal if there are "many" checks pending and `channel_announcement` messages should be delayed. The actual `PeerManager` implementation thereof is done in the next commit.
1 parent 3334dbf commit 758894e

File tree

6 files changed

+61
-0
lines changed

6 files changed

+61
-0
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ mod tests {
622622
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
623623
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
624624
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
625+
fn processing_queue_high(&self) -> bool { false }
625626
}
626627
impl ChannelMessageHandler for MsgHandler {
627628
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}

lightning/src/ln/msgs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
10801080
/// list of `short_channel_id`s.
10811081
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
10821082

1083+
// Handler queueing status:
1084+
/// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages
1085+
/// pending some async action. While there is no guarantee of the rate of future messages, the
1086+
/// caller should seek to reduce the rate of new gossip messages handled, especially
1087+
/// [`ChannelAnnouncement`]s.
1088+
fn processing_queue_high(&self) -> bool;
1089+
10831090
// Handler information:
10841091
/// Gets the node feature flags which this handler itself supports. All available handlers are
10851092
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]

lightning/src/ln/peer_handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
8181
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
8282
InitFeatures::empty()
8383
}
84+
fn processing_queue_high(&self) -> bool { false }
8485
}
8586
impl OnionMessageProvider for IgnoringMessageHandler {
8687
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }

lightning/src/routing/gossip.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,10 @@ where C::Target: ChainAccess, L::Target: Logger
637637
features.set_gossip_queries_optional();
638638
features
639639
}
640+
641+
fn processing_queue_high(&self) -> bool {
642+
self.network_graph.pending_checks.too_many_checks_pending()
643+
}
640644
}
641645

642646
impl<G: Deref<Target=NetworkGraph<L>>, C: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, C, L>

lightning/src/routing/gossip_checking.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ impl AccessFuture {
151151
///
152152
/// This is identical to calling [`AccessFuture::resolve`] with a dummy `gossip`, disabling
153153
/// forwarding the validated gossip message onwards to peers.
154+
///
155+
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
156+
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
157+
/// after this.
158+
///
159+
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
160+
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
154161
pub fn resolve_without_forwarding<L: Deref>(&self,
155162
graph: &NetworkGraph<L>, result: Result<TxOut, ChainAccessError>)
156163
where L::Target: Logger {
@@ -161,6 +168,13 @@ impl AccessFuture {
161168
///
162169
/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
163170
/// have available buffer space.
171+
///
172+
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
173+
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
174+
/// after this.
175+
///
176+
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
177+
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
164178
pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, C: Deref, GS: Deref<Target = P2PGossipSync<G, C, L>>>(&self,
165179
graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, ChainAccessError>
166180
) where L::Target: Logger, C::Target: ChainAccess {
@@ -515,4 +529,36 @@ impl PendingChecks {
515529
}
516530
}
517531
}
532+
533+
/// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
534+
/// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
535+
/// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
536+
/// which we'll have to process. With a socket buffer of 4KB and a minimum
537+
/// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
538+
/// count` messages to process beyond this limit. Because we'll probably have a few peers,
539+
/// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
540+
/// checks should be more than enough for decent parallelism.
541+
const MAX_PENDING_LOOKUPS: usize = 32;
542+
543+
/// Returns true if there are a large number of async checks pending and future
544+
/// `channel_announcement` messages should be delayed. Note that this is only a hint and
545+
/// messages already in-flight may still have to be handled for various reasons.
546+
pub(super) fn too_many_checks_pending(&self) -> bool {
547+
let mut pending_checks = self.internal.lock().unwrap();
548+
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
549+
// If we have many channel checks pending, ensure we don't have any dangling checks
550+
// (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture`
551+
// instead) before we commit to applying backpressure.
552+
pending_checks.channels.retain(|_, chan| {
553+
Weak::upgrade(&chan).is_some()
554+
});
555+
pending_checks.nodes.retain(|_, channels| {
556+
channels.retain(|chan| Weak::upgrade(&chan).is_some());
557+
!channels.is_empty()
558+
});
559+
pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
560+
} else {
561+
false
562+
}
563+
}
518564
}

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
570570
features.set_gossip_queries_optional();
571571
features
572572
}
573+
574+
fn processing_queue_high(&self) -> bool { false }
573575
}
574576

575577
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {

0 commit comments

Comments (0)