Skip to content

Commit 02b1878

Browse files
committed
Allow RoutingMessageHandler to signal backpressure
Now that we allow `handle_channel_announcement` to (indirectly) spawn async tasks which will complete later, we have to ensure it can apply backpressure all the way up to the TCP socket to ensure we don't end up with too many buffers allocated for UTXO validation. We do this by adding a new method to `RoutingMessageHandler` which allows it to signal if there are "many" checks pending and `channel_announcement` messages should be delayed. The actual `PeerManager` implementation thereof is done in the next commit.
1 parent 0da7bbd commit 02b1878

File tree

6 files changed

+61
-0
lines changed

6 files changed

+61
-0
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ mod tests {
623623
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
624624
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
625625
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
626+
fn processing_queue_high(&self) -> bool { false }
626627
}
627628
impl ChannelMessageHandler for MsgHandler {
628629
fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}

lightning/src/ln/msgs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
10821082
/// list of `short_channel_id`s.
10831083
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
10841084

1085+
// Handler queueing status:
1086+
/// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages
1087+
/// pending some async action. While there is no guarantee of the rate of future messages, the
1088+
/// caller should seek to reduce the rate of new gossip messages handled, especially
1089+
/// [`ChannelAnnouncement`]s.
1090+
fn processing_queue_high(&self) -> bool;
1091+
10851092
// Handler information:
10861093
/// Gets the node feature flags which this handler itself supports. All available handlers are
10871094
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]

lightning/src/ln/peer_handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
8181
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
8282
InitFeatures::empty()
8383
}
84+
fn processing_queue_high(&self) -> bool { false }
8485
}
8586
impl OnionMessageProvider for IgnoringMessageHandler {
8687
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }

lightning/src/routing/gossip.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,10 @@ where U::Target: UtxoLookup, L::Target: Logger
657657
features.set_gossip_queries_optional();
658658
features
659659
}
660+
661+
fn processing_queue_high(&self) -> bool {
662+
self.network_graph.pending_checks.too_many_checks_pending()
663+
}
660664
}
661665

662666
impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, U, L>

lightning/src/routing/utxo.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ impl UtxoFuture {
148148
///
149149
/// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
150150
/// forwarding the validated gossip message onwards to peers.
151+
///
152+
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
153+
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
154+
/// after this.
155+
///
156+
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
157+
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
151158
pub fn resolve_without_forwarding<L: Deref>(&self,
152159
graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
153160
where L::Target: Logger {
@@ -158,6 +165,13 @@ impl UtxoFuture {
158165
///
159166
/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
160167
/// have available buffer space.
168+
///
169+
/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
170+
/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
171+
/// after this.
172+
///
173+
/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
174+
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
161175
pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
162176
graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
163177
) where L::Target: Logger, U::Target: UtxoLookup {
@@ -510,4 +524,36 @@ impl PendingChecks {
510524
}
511525
}
512526
}
527+
528+
/// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
529+
/// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
530+
/// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
531+
/// which we'll have to process. With a socket buffer of 4KB and a minimum
532+
/// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
533+
/// count` messages to process beyond this limit. Because we'll probably have a few peers,
534+
/// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
535+
/// checks should be more than enough for decent parallelism.
536+
const MAX_PENDING_LOOKUPS: usize = 32;
537+
538+
/// Returns true if there are a large number of async checks pending and future
539+
/// `channel_announcement` messages should be delayed. Note that this is only a hint and
540+
/// messages already in-flight may still have to be handled for various reasons.
541+
pub(super) fn too_many_checks_pending(&self) -> bool {
542+
let mut pending_checks = self.internal.lock().unwrap();
543+
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
544+
// If we have many channel checks pending, ensure we don't have any dangling checks
545+
// (i.e. checks where the user told us they'd call back but dropped the `UtxoFuture`
546+
// instead) before we commit to applying backpressure.
547+
pending_checks.channels.retain(|_, chan| {
548+
Weak::upgrade(&chan).is_some()
549+
});
550+
pending_checks.nodes.retain(|_, channels| {
551+
channels.retain(|chan| Weak::upgrade(&chan).is_some());
552+
!channels.is_empty()
553+
});
554+
pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
555+
} else {
556+
false
557+
}
558+
}
513559
}

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
571571
features.set_gossip_queries_optional();
572572
features
573573
}
574+
575+
fn processing_queue_high(&self) -> bool { false }
574576
}
575577

576578
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {

0 commit comments

Comments
 (0)