@@ -1257,13 +1257,28 @@ where
1257
1257
1258
1258
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
1259
1259
1260
+ /// Tracks the channel_update message that were not broadcasted because
1261
+ /// we were not connected to any peers.
1262
+ pending_broadcast_messages: Mutex<PendingBroadcastMessages>,
1263
+
1260
1264
entropy_source: ES,
1261
1265
node_signer: NS,
1262
1266
signer_provider: SP,
1263
1267
1264
1268
logger: L,
1265
1269
}
1266
1270
1271
+ pub struct PendingBroadcastMessages {
1272
+ /// The original broadcast events
1273
+ pub broadcast_message: Vec<MessageSendEvent>,
1274
+ /// The number of ticks before retry broadcasting
1275
+ pub ticks_remaining: Option<i32>,
1276
+ }
1277
+
1278
+ /// The number of ticks that may elapse before trying to rebroadcast
1279
+ /// the pending broadcast messages
1280
+ const PENDING_BROADCAST_MESSAGES_TIMER_TICKS: i32 = 2;
1281
+
1267
1282
/// Chain-related parameters used to construct a new `ChannelManager`.
1268
1283
///
1269
1284
/// Typically, the block-specific parameters are derived from the best block hash for the network,
@@ -2338,6 +2353,7 @@ where
2338
2353
funding_batch_states: Mutex::new(BTreeMap::new()),
2339
2354
2340
2355
pending_offers_messages: Mutex::new(Vec::new()),
2356
+ pending_broadcast_messages: Mutex::new(PendingBroadcastMessages { broadcast_message: Vec::new(), ticks_remaining: None }),
2341
2357
2342
2358
entropy_source,
2343
2359
node_signer,
@@ -2841,15 +2857,44 @@ where
2841
2857
if let Some(update) = update_opt {
2842
2858
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
2843
2859
// not try to broadcast it via whatever peer we have.
2860
+ let brodcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
2861
+ msg: update
2862
+ };
2863
+
2844
2864
let per_peer_state = self.per_peer_state.read().unwrap();
2845
2865
let a_peer_state_opt = per_peer_state.get(peer_node_id)
2846
2866
.ok_or(per_peer_state.values().next());
2867
+
2868
+ // if we were able to get the peer we just force closed on.
2847
2869
if let Ok(a_peer_state_mutex) = a_peer_state_opt {
2848
2870
let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2849
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2850
- msg: update
2851
- });
2871
+ a_peer_state.pending_msg_events.push(brodcast_message_evt);
2872
+ }
2873
+
2874
+ // if we connected to some other random peer.
2875
+ else if let Err(a_peer_state_mutex) = a_peer_state_opt {
2876
+ match a_peer_state_mutex {
2877
+ // if we were able to connect to Some peer.
2878
+ Some(val) => {
2879
+ let mut a_peer_state = val.lock().unwrap();
2880
+ a_peer_state.pending_msg_events.push(brodcast_message_evt);
2881
+ log_info!(self.logger,
2882
+ "Not able to broadcast channel update to peer we force-closed on. Broadcasting to some random peer.");
2883
+ },
2884
+
2885
+ // If we connected to no one.
2886
+ None => {
2887
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
2888
+ if pending_broadcast_messages.broadcast_message.len() == 0 {
2889
+ pending_broadcast_messages.ticks_remaining = Some(PENDING_BROADCAST_MESSAGES_TIMER_TICKS);
2890
+ }
2891
+ pending_broadcast_messages.broadcast_message.push(brodcast_message_evt);
2892
+ log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now.
2893
+ Will try rebroadcasting later.");
2894
+ }
2895
+ }
2852
2896
}
2897
+
2853
2898
}
2854
2899
2855
2900
Ok(counterparty_node_id)
@@ -4915,6 +4960,45 @@ where
4915
4960
4916
4961
{
4917
4962
let per_peer_state = self.per_peer_state.read().unwrap();
4963
+
4964
+ {
4965
+ // Get pending messages to be broadcasted.
4966
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
4967
+
4968
+ // If we have pending broadcast events
4969
+ if pending_broadcast_messages.ticks_remaining != None {
4970
+ // And it is time to broadcast
4971
+ if pending_broadcast_messages.ticks_remaining == Some(0) {
4972
+ // And we are connected to non-zero number of peers, we can broadcast successfully
4973
+ if per_peer_state.len() > 0 {
4974
+ let a_peer_state_mutex = per_peer_state.values().next().unwrap();
4975
+ let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
4976
+
4977
+ for broadcast_evts in pending_broadcast_messages.broadcast_message.iter() {
4978
+ a_peer_state.pending_msg_events.push(broadcast_evts.clone());
4979
+ }
4980
+ // After broadcasting, we clear the event vector.
4981
+ // And set the timer to None.
4982
+ pending_broadcast_messages.broadcast_message.clear();
4983
+ pending_broadcast_messages.ticks_remaining = None;
4984
+
4985
+ log_info!(self.logger, "Successfully broadcasted the pending channel update messages.");
4986
+ }
4987
+ // Else we restart the counter, waiting for next time to try rebroadcasting
4988
+ else {
4989
+ pending_broadcast_messages.ticks_remaining = Some(PENDING_BROADCAST_MESSAGES_TIMER_TICKS);
4990
+ }
4991
+ }
4992
+ // Else we decrement the counter
4993
+ else {
4994
+ pending_broadcast_messages.ticks_remaining = match pending_broadcast_messages.ticks_remaining {
4995
+ Some(val) => Some(val - 1),
4996
+ None => None
4997
+ }
4998
+ }
4999
+ }
5000
+ }
5001
+
4918
5002
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
4919
5003
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
4920
5004
let peer_state = &mut *peer_state_lock;
@@ -10810,6 +10894,8 @@ where
10810
10894
10811
10895
pending_offers_messages: Mutex::new(Vec::new()),
10812
10896
10897
+ pending_broadcast_messages: Mutex::new(PendingBroadcastMessages { broadcast_message: Vec::new(), ticks_remaining: None}),
10898
+
10813
10899
entropy_source: args.entropy_source,
10814
10900
node_signer: args.node_signer,
10815
10901
signer_provider: args.signer_provider,
0 commit comments