Skip to content

Commit 00a70c2

Browse files
committed
Apply backpressure when we have too many gossip checks in-flight
Now that the `RoutingMessageHandler` can signal that it needs to apply message backpressure, we implement it here in the `PeerManager`. There's not much complicated here, aside from noting that we need to add the ability to call `send_data` with no data to indicate that reading should resume (and track when we may need to make such calls when updating the routing-backpressure state).
1 parent 02b1878 commit 00a70c2

File tree

1 file changed

+71
-31
lines changed

1 file changed

+71
-31
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
563563

564564
peer_counter: AtomicCounter,
565565

566+
gossip_processing_backlogged: AtomicBool,
567+
gossip_processing_backlog_lifted: AtomicBool,
568+
566569
node_signer: NS,
567570

568571
logger: L,
@@ -721,6 +724,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
721724
blocked_event_processors: AtomicBool::new(false),
722725
ephemeral_key_midstate,
723726
peer_counter: AtomicCounter::new(),
727+
gossip_processing_backlogged: AtomicBool::new(false),
728+
gossip_processing_backlog_lifted: AtomicBool::new(false),
724729
last_node_announcement_serial: AtomicU32::new(current_time),
725730
logger,
726731
custom_message_handler,
@@ -847,7 +852,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
847852
Ok(())
848853
}
849854

850-
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
855+
fn peer_should_read(&self, peer: &Peer) -> bool {
856+
!self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
857+
}
858+
859+
fn update_gossip_backlogged(&self) {
860+
let new_state = self.message_handler.route_handler.processing_queue_high();
861+
let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
862+
if prev_state && !new_state {
863+
self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
864+
}
865+
}
866+
867+
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
868+
let mut have_written = false;
851869
while !peer.awaiting_write_event {
852870
if peer.should_buffer_onion_message() {
853871
if let Some(peer_node_id) = peer.their_node_id {
@@ -905,12 +923,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
905923
}
906924

907925
let next_buff = match peer.pending_outbound_buffer.front() {
908-
None => return,
926+
None => {
927+
if force_one_write && !have_written {
928+
let should_read = self.peer_should_read(&peer);
929+
if should_read {
930+
let data_sent = descriptor.send_data(&[], should_read);
931+
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
932+
}
933+
}
934+
return
935+
},
909936
Some(buff) => buff,
910937
};
911938

912939
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
913-
let data_sent = descriptor.send_data(pending, peer.should_read());
940+
let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
941+
have_written = true;
914942
peer.pending_outbound_buffer_first_msg_offset += data_sent;
915943
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
916944
peer.pending_outbound_buffer_first_msg_offset = 0;
@@ -945,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
945973
Some(peer_mutex) => {
946974
let mut peer = peer_mutex.lock().unwrap();
947975
peer.awaiting_write_event = false;
948-
self.do_attempt_write_data(descriptor, &mut peer);
976+
self.do_attempt_write_data(descriptor, &mut peer, false);
949977
}
950978
};
951979
Ok(())
@@ -1192,7 +1220,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11921220
}
11931221
}
11941222
}
1195-
pause_read = !peer.should_read();
1223+
pause_read = !self.peer_should_read(&peer);
11961224

11971225
if let Some(message) = msg_to_handle {
11981226
match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1404,19 +1432,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
14041432
.map_err(|e| -> MessageHandlingError { e.into() })? {
14051433
should_forward = Some(wire::Message::ChannelAnnouncement(msg));
14061434
}
1435+
self.update_gossip_backlogged();
14071436
},
14081437
wire::Message::NodeAnnouncement(msg) => {
14091438
if self.message_handler.route_handler.handle_node_announcement(&msg)
14101439
.map_err(|e| -> MessageHandlingError { e.into() })? {
14111440
should_forward = Some(wire::Message::NodeAnnouncement(msg));
14121441
}
1442+
self.update_gossip_backlogged();
14131443
},
14141444
wire::Message::ChannelUpdate(msg) => {
14151445
self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
14161446
if self.message_handler.route_handler.handle_channel_update(&msg)
14171447
.map_err(|e| -> MessageHandlingError { e.into() })? {
14181448
should_forward = Some(wire::Message::ChannelUpdate(msg));
14191449
}
1450+
self.update_gossip_backlogged();
14201451
},
14211452
wire::Message::QueryShortChannelIds(msg) => {
14221453
self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
@@ -1568,6 +1599,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
15681599
}
15691600
}
15701601

1602+
self.update_gossip_backlogged();
1603+
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
1604+
15711605
let mut peers_to_disconnect = HashMap::new();
15721606
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
15731607
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
@@ -1797,7 +1831,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17971831
}
17981832

17991833
for (descriptor, peer_mutex) in peers.iter() {
1800-
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
1834+
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap(), flush_read_disabled);
18011835
}
18021836
}
18031837
if !peers_to_disconnect.is_empty() {
@@ -1819,7 +1853,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18191853
self.enqueue_message(&mut *peer, &msg);
18201854
// This isn't guaranteed to work, but if there is enough free
18211855
// room in the send buffer, put the error message there...
1822-
self.do_attempt_write_data(&mut descriptor, &mut *peer);
1856+
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
18231857
} else {
18241858
log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
18251859
}
@@ -1927,6 +1961,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19271961
{
19281962
let peers_lock = self.peers.read().unwrap();
19291963

1964+
self.update_gossip_backlogged();
1965+
let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
1966+
19301967
for (descriptor, peer_mutex) in peers_lock.iter() {
19311968
let mut peer = peer_mutex.lock().unwrap();
19321969
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
@@ -1942,34 +1979,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19421979
continue;
19431980
}
19441981

1945-
if peer.awaiting_pong_timer_tick_intervals == -1 {
1946-
// Magic value set in `maybe_send_extra_ping`.
1947-
peer.awaiting_pong_timer_tick_intervals = 1;
1982+
loop { // Used as a `goto` to skip writing a Ping message.
1983+
if peer.awaiting_pong_timer_tick_intervals == -1 {
1984+
// Magic value set in `maybe_send_extra_ping`.
1985+
peer.awaiting_pong_timer_tick_intervals = 1;
1986+
peer.received_message_since_timer_tick = false;
1987+
break;
1988+
}
1989+
1990+
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
1991+
|| peer.awaiting_pong_timer_tick_intervals as u64 >
1992+
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
1993+
{
1994+
descriptors_needing_disconnect.push(descriptor.clone());
1995+
break;
1996+
}
19481997
peer.received_message_since_timer_tick = false;
1949-
continue;
1950-
}
19511998

1952-
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
1953-
|| peer.awaiting_pong_timer_tick_intervals as u64 >
1954-
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
1955-
{
1956-
descriptors_needing_disconnect.push(descriptor.clone());
1957-
continue;
1958-
}
1959-
peer.received_message_since_timer_tick = false;
1999+
if peer.awaiting_pong_timer_tick_intervals > 0 {
2000+
peer.awaiting_pong_timer_tick_intervals += 1;
2001+
break;
2002+
}
19602003

1961-
if peer.awaiting_pong_timer_tick_intervals > 0 {
1962-
peer.awaiting_pong_timer_tick_intervals += 1;
1963-
continue;
2004+
peer.awaiting_pong_timer_tick_intervals = 1;
2005+
let ping = msgs::Ping {
2006+
ponglen: 0,
2007+
byteslen: 64,
2008+
};
2009+
self.enqueue_message(&mut *peer, &ping);
2010+
break;
19642011
}
1965-
1966-
peer.awaiting_pong_timer_tick_intervals = 1;
1967-
let ping = msgs::Ping {
1968-
ponglen: 0,
1969-
byteslen: 64,
1970-
};
1971-
self.enqueue_message(&mut *peer, &ping);
1972-
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
2012+
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled);
19732013
}
19742014
}
19752015

0 commit comments

Comments
 (0)