Skip to content

Commit ecbe4aa

Browse files
committed
Process channel_update/node_announcement async if needed
If we have a `channel_announcement` which is waiting on a UTXO lookup before we can process it, and we receive a `channel_update` or `node_announcement` for the same channel or a node which is a part of the channel, we have to wait until the lookup completes until we can decide if we want to accept the new message. Here, we store the new message in the pending lookup state and process it asynchronously like the original `channel_announcement`.
1 parent 6381bf0 commit ecbe4aa

File tree

2 files changed

+192
-6
lines changed

2 files changed

+192
-6
lines changed

lightning/src/routing/gossip.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,8 +1278,13 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
12781278
}
12791279

12801280
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
1281-
match self.nodes.write().unwrap().get_mut(&NodeId::from_pubkey(&msg.node_id)) {
1282-
None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
1281+
let mut nodes = self.nodes.write().unwrap();
1282+
match nodes.get_mut(&NodeId::from_pubkey(&msg.node_id)) {
1283+
None => {
1284+
core::mem::drop(nodes);
1285+
self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
1286+
Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError})
1287+
},
12831288
Some(node) => {
12841289
if let Some(node_info) = node.announcement_info.as_ref() {
12851290
// The timestamp field is somewhat of a misnomer - the BOLTs use it to order
@@ -1707,7 +1712,11 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
17071712

17081713
let mut channels = self.channels.write().unwrap();
17091714
match channels.get_mut(&msg.short_channel_id) {
1710-
None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),
1715+
None => {
1716+
core::mem::drop(channels);
1717+
self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
1718+
return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
1719+
},
17111720
Some(channel) => {
17121721
if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
17131722
return Err(LightningError{err:

lightning/src/routing/gossip_checking.rs

Lines changed: 180 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
use bitcoin::{BlockHash, TxOut};
1717
use bitcoin::hashes::hex::ToHex;
1818

19+
use bitcoin::secp256k1::PublicKey;
20+
1921
use crate::ln::chan_utils::make_funding_redeemscript;
2022
use crate::ln::msgs::{self, LightningError, ErrorAction};
2123
use crate::routing::gossip::{NetworkGraph, NodeId};
@@ -70,10 +72,48 @@ enum ChannelAnnouncement {
7072
Full(msgs::ChannelAnnouncement),
7173
Unsigned(msgs::UnsignedChannelAnnouncement),
7274
}
75+
impl ChannelAnnouncement {
76+
fn node_id_1(&self) -> &PublicKey {
77+
match self {
78+
ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
79+
ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
80+
}
81+
}
82+
}
83+
84+
enum NodeAnnouncement {
85+
Full(msgs::NodeAnnouncement),
86+
Unsigned(msgs::UnsignedNodeAnnouncement),
87+
}
88+
impl NodeAnnouncement {
89+
fn timestamp(&self) -> u32 {
90+
match self {
91+
NodeAnnouncement::Full(msg) => msg.contents.timestamp,
92+
NodeAnnouncement::Unsigned(msg) => msg.timestamp,
93+
}
94+
}
95+
}
96+
97+
enum ChannelUpdate {
98+
Full(msgs::ChannelUpdate),
99+
Unsigned(msgs::UnsignedChannelUpdate),
100+
}
101+
impl ChannelUpdate {
102+
fn timestamp(&self) -> u32 {
103+
match self {
104+
ChannelUpdate::Full(msg) => msg.contents.timestamp,
105+
ChannelUpdate::Unsigned(msg) => msg.timestamp,
106+
}
107+
}
108+
}
73109

74110
struct AccessMessages {
75111
complete: Option<Result<TxOut, ChainAccessError>>,
76112
channel_announce: Option<ChannelAnnouncement>,
113+
latest_node_announce_a: Option<NodeAnnouncement>,
114+
latest_node_announce_b: Option<NodeAnnouncement>,
115+
latest_channel_update_a: Option<ChannelUpdate>,
116+
latest_channel_update_b: Option<ChannelUpdate>,
77117
}
78118

79119
/// Represents a future resolution of a [`ChainAccess::get_utxo`] query resolving async.
@@ -99,13 +139,17 @@ impl AccessFuture {
99139
Self { state: Arc::new(Mutex::new(AccessMessages {
100140
complete: None,
101141
channel_announce: None,
142+
latest_node_announce_a: None,
143+
latest_node_announce_b: None,
144+
latest_channel_update_a: None,
145+
latest_channel_update_b: None,
102146
}))}
103147
}
104148

105149
/// Resolves this future against the given `graph` and with the given `result`.
106150
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, ChainAccessError>)
107151
where L::Target: Logger {
108-
let announcement = {
152+
let (announcement, node_a, node_b, update_a, update_b) = {
109153
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
110154
let mut async_messages = self.state.lock().unwrap();
111155

@@ -116,14 +160,19 @@ impl AccessFuture {
116160
async_messages.complete = Some(result);
117161
return;
118162
}
163+
119164
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
120165
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
121166
ChannelAnnouncement::Unsigned(msg) => &msg,
122167
};
123168

124169
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
125170

126-
async_messages.channel_announce.take().unwrap()
171+
(async_messages.channel_announce.take().unwrap(),
172+
async_messages.latest_node_announce_a.take(),
173+
async_messages.latest_node_announce_b.take(),
174+
async_messages.latest_channel_update_a.take(),
175+
async_messages.latest_channel_update_b.take())
127176
};
128177

129178
// Now that we've updated our internal state, pass the pending messages back through the
@@ -139,11 +188,36 @@ impl AccessFuture {
139188
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
140189
},
141190
}
191+
192+
for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
193+
match announce {
194+
Some(NodeAnnouncement::Full(signed_msg)) => {
195+
let _ = graph.update_node_from_announcement(&signed_msg);
196+
},
197+
Some(NodeAnnouncement::Unsigned(msg)) => {
198+
let _ = graph.update_node_from_unsigned_announcement(&msg);
199+
},
200+
None => {},
201+
}
202+
}
203+
204+
for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
205+
match update {
206+
Some(ChannelUpdate::Full(signed_msg)) => {
207+
let _ = graph.update_channel(&signed_msg);
208+
},
209+
Some(ChannelUpdate::Unsigned(msg)) => {
210+
let _ = graph.update_channel_unsigned(&msg);
211+
},
212+
None => {},
213+
}
214+
}
142215
}
143216
}
144217

145218
struct PendingChecksContext {
146219
channels: HashMap<u64, Weak<Mutex<AccessMessages>>>,
220+
nodes: HashMap<NodeId, Vec<Weak<Mutex<AccessMessages>>>>,
147221
}
148222

149223
impl PendingChecksContext {
@@ -155,6 +229,17 @@ impl PendingChecksContext {
155229
e.remove();
156230
}
157231
}
232+
233+
let node_id_1 = NodeId::from_pubkey(&msg.node_id_1);
234+
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(node_id_1) {
235+
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
236+
if e.get().is_empty() { e.remove(); }
237+
}
238+
let node_id_2 = NodeId::from_pubkey(&msg.node_id_2);
239+
if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(node_id_2) {
240+
e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
241+
if e.get().is_empty() { e.remove(); }
242+
}
158243
}
159244
}
160245

@@ -166,10 +251,98 @@ pub(super) struct PendingChecks {
166251
impl PendingChecks {
167252
pub(super) fn new() -> Self {
168253
PendingChecks { internal: Mutex::new(PendingChecksContext {
169-
channels: HashMap::new(),
254+
channels: HashMap::new(), nodes: HashMap::new(),
170255
}) }
171256
}
172257

258+
/// Checks if there is a pending `channel_update` UTXO validation for the given channel,
259+
/// and, if so, stores the channel message for handling later and returns an `Err`.
260+
pub(super) fn check_hold_pending_channel_update(
261+
&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
262+
) -> Result<(), LightningError> {
263+
let mut pending_checks = self.internal.lock().unwrap();
264+
if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
265+
let is_from_a = (msg.flags & 1) == 1;
266+
match Weak::upgrade(e.get()) {
267+
Some(msgs_ref) => {
268+
let mut messages = msgs_ref.lock().unwrap();
269+
let latest_update = if is_from_a {
270+
&mut messages.latest_channel_update_a
271+
} else {
272+
&mut messages.latest_channel_update_b
273+
};
274+
if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
275+
// If the messages we got has a higher timestamp, just blindly assume the
276+
// signatures on the new message are correct and drop the old message. This
277+
// may cause us to end up dropping valid `channel_update`s if a peer is
278+
// malicious, but we should get the correct ones when the node updates them.
279+
*latest_update = Some(
280+
if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
281+
else { ChannelUpdate::Unsigned(msg.clone()) });
282+
}
283+
return Err(LightningError {
284+
err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
285+
action: ErrorAction::IgnoreAndLog(Level::Gossip),
286+
});
287+
},
288+
None => { e.remove(); },
289+
}
290+
}
291+
Ok(())
292+
}
293+
294+
/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
295+
/// given node and, if so, stores the channel message for handling later and returns an `Err`.
296+
pub(super) fn check_hold_pending_node_announcement(
297+
&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
298+
) -> Result<(), LightningError> {
299+
let mut pending_checks = self.internal.lock().unwrap();
300+
if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id)) {
301+
let mut found_at_least_one_chan = false;
302+
e.get_mut().retain(|node_msgs| {
303+
match Weak::upgrade(&node_msgs) {
304+
Some(chan_mtx) => {
305+
let mut chan_msgs = chan_mtx.lock().unwrap();
306+
if let Some(chan_announce) = &chan_msgs.channel_announce {
307+
let latest_announce =
308+
if *chan_announce.node_id_1() == msg.node_id {
309+
&mut chan_msgs.latest_node_announce_a
310+
} else {
311+
&mut chan_msgs.latest_node_announce_b
312+
};
313+
if latest_announce.is_none() ||
314+
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
315+
{
316+
// If the messages we got has a higher timestamp, just blindly
317+
// assume the signatures on the new message are correct and drop
318+
// the old message. This may cause us to end up dropping valid
319+
// `node_announcement`s if a peer is malicious, but we should get
320+
// the correct ones when the node updates them.
321+
*latest_announce = Some(
322+
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
323+
else { NodeAnnouncement::Unsigned(msg.clone()) });
324+
}
325+
found_at_least_one_chan = true;
326+
true
327+
} else {
328+
debug_assert!(false, "channel_announce is set before struct is added to node map");
329+
false
330+
}
331+
},
332+
None => false,
333+
}
334+
});
335+
if e.get().is_empty() { e.remove(); }
336+
if found_at_least_one_chan {
337+
return Err(LightningError {
338+
err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
339+
action: ErrorAction::IgnoreAndLog(Level::Gossip),
340+
});
341+
}
342+
}
343+
Ok(())
344+
}
345+
173346
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
174347
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<AccessMessages>>>,
175348
pending_channels: &mut HashMap<u64, Weak<Mutex<AccessMessages>>>
@@ -283,6 +456,10 @@ impl PendingChecks {
283456
async_messages.channel_announce = Some(
284457
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
285458
else { ChannelAnnouncement::Unsigned(msg.clone()) });
459+
pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id_1))
460+
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
461+
pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id_2))
462+
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
286463
Err(LightningError {
287464
err: "Channel being checked async".to_owned(),
288465
action: ErrorAction::IgnoreAndLog(Level::Gossip),

0 commit comments

Comments
 (0)