Skip to content

Commit 39688f5

Browse files
committed
Process channel_update/node_announcement async if needed
If we have a `channel_announcement` which is waiting on a UTXO lookup before we can process it, and we receive a `channel_update` or `node_announcement` for the same channel or a node which is a part of the channel, we have to wait until the lookup completes until we can decide if we want to accept the new message. Here, we store the new message in the pending lookup state and process it asynchronously like the original `channel_announcement`.
1 parent a3fcf9c commit 39688f5

File tree

2 files changed

+180
-3
lines changed

2 files changed

+180
-3
lines changed

lightning/src/routing/gossip.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
12781278
}
12791279

12801280
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
1281+
self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
1282+
12811283
match self.nodes.write().unwrap().get_mut(&NodeId::from_pubkey(&msg.node_id)) {
12821284
None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
12831285
Some(node) => {
@@ -1708,6 +1710,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
17081710
}
17091711
}
17101712

1713+
self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
1714+
17111715
let mut channels = self.channels.write().unwrap();
17121716
match channels.get_mut(&msg.short_channel_id) {
17131717
None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),

lightning/src/routing/gossip_checking.rs

Lines changed: 176 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
use bitcoin::{BlockHash, TxOut};
88
use bitcoin::hashes::hex::ToHex;
99

10+
use bitcoin::secp256k1::PublicKey;
11+
1012
use crate::ln::chan_utils::make_funding_redeemscript;
1113
use crate::ln::msgs::{self, LightningError, ErrorAction};
1214
use crate::routing::gossip::{NetworkGraph, NodeId};
@@ -57,10 +59,48 @@ enum ChannelAnnouncement {
5759
Full(msgs::ChannelAnnouncement),
5860
Unsigned(msgs::UnsignedChannelAnnouncement),
5961
}
62+
impl ChannelAnnouncement {
63+
fn node_id_1(&self) -> &PublicKey {
64+
match self {
65+
ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
66+
ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
67+
}
68+
}
69+
}
70+
71+
enum NodeAnnouncement {
72+
Full(msgs::NodeAnnouncement),
73+
Unsigned(msgs::UnsignedNodeAnnouncement),
74+
}
75+
impl NodeAnnouncement {
76+
fn timestamp(&self) -> u32 {
77+
match self {
78+
NodeAnnouncement::Full(msg) => msg.contents.timestamp,
79+
NodeAnnouncement::Unsigned(msg) => msg.timestamp,
80+
}
81+
}
82+
}
83+
84+
enum ChannelUpdate {
85+
Full(msgs::ChannelUpdate),
86+
Unsigned(msgs::UnsignedChannelUpdate),
87+
}
88+
impl ChannelUpdate {
89+
fn timestamp(&self) -> u32 {
90+
match self {
91+
ChannelUpdate::Full(msg) => msg.contents.timestamp,
92+
ChannelUpdate::Unsigned(msg) => msg.timestamp,
93+
}
94+
}
95+
}
6096

6197
struct AccessMessages {
6298
complete: Option<Result<TxOut, ChainAccessError>>,
6399
channel_announce: Option<ChannelAnnouncement>,
100+
latest_node_announce_a: Option<NodeAnnouncement>,
101+
latest_node_announce_b: Option<NodeAnnouncement>,
102+
latest_channel_update_a: Option<ChannelUpdate>,
103+
latest_channel_update_b: Option<ChannelUpdate>,
64104
}
65105

66106
#[derive(Clone)]
@@ -83,13 +123,17 @@ impl AccessFuture {
83123
Self { state: Arc::new(Mutex::new(AccessMessages {
84124
complete: None,
85125
channel_announce: None,
126+
latest_node_announce_a: None,
127+
latest_node_announce_b: None,
128+
latest_channel_update_a: None,
129+
latest_channel_update_b: None,
86130
}))}
87131
}
88132

89133
/// Resolves this future against the given `graph` and with the given `result`.
90134
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, ChainAccessError>)
91135
where L::Target: Logger {
92-
let announcement = {
136+
let (announcement, node_a, node_b, update_a, update_b) = {
93137
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
94138
let mut async_messages = self.state.lock().unwrap();
95139

@@ -100,6 +144,7 @@ impl AccessFuture {
100144
async_messages.complete = Some(result);
101145
return;
102146
}
147+
103148
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
104149
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
105150
ChannelAnnouncement::Unsigned(msg) => &msg,
@@ -114,7 +159,24 @@ impl AccessFuture {
114159
}
115160
}
116161

117-
async_messages.channel_announce.take().unwrap()
162+
if let hash_map::Entry::Occupied(mut e) =
163+
pending_checks.nodes.entry(NodeId::from_pubkey(&announcement_msg.node_id_1))
164+
{
165+
e.get_mut().retain(|elem| !core::ptr::eq(Weak::as_ptr(&elem), Arc::as_ptr(&self.state)));
166+
if e.get().is_empty() { e.remove(); }
167+
}
168+
if let hash_map::Entry::Occupied(mut e) =
169+
pending_checks.nodes.entry(NodeId::from_pubkey(&announcement_msg.node_id_2))
170+
{
171+
e.get_mut().retain(|elem| !core::ptr::eq(Weak::as_ptr(&elem), Arc::as_ptr(&self.state)));
172+
if e.get().is_empty() { e.remove(); }
173+
}
174+
175+
(async_messages.channel_announce.take().unwrap(),
176+
async_messages.latest_node_announce_a.take(),
177+
async_messages.latest_node_announce_b.take(),
178+
async_messages.latest_channel_update_a.take(),
179+
async_messages.latest_channel_update_b.take())
118180
};
119181

120182
// Now that we've updated our internal state, pass the pending messages back through the
@@ -130,11 +192,36 @@ impl AccessFuture {
130192
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
131193
},
132194
}
195+
196+
for announce in [node_a, node_b] {
197+
match announce {
198+
Some(NodeAnnouncement::Full(signed_msg)) => {
199+
let _ = graph.update_node_from_announcement(&signed_msg);
200+
},
201+
Some(NodeAnnouncement::Unsigned(msg)) => {
202+
let _ = graph.update_node_from_unsigned_announcement(&msg);
203+
},
204+
None => {},
205+
}
206+
}
207+
208+
for update in [update_a, update_b] {
209+
match update {
210+
Some(ChannelUpdate::Full(signed_msg)) => {
211+
let _ = graph.update_channel(&signed_msg);
212+
},
213+
Some(ChannelUpdate::Unsigned(msg)) => {
214+
let _ = graph.update_channel_unsigned(&msg);
215+
},
216+
None => {},
217+
}
218+
}
133219
}
134220
}
135221

136222
struct PendingChecksContext {
137223
channels: HashMap<u64, Weak<Mutex<AccessMessages>>>,
224+
nodes: HashMap<NodeId, Vec<Weak<Mutex<AccessMessages>>>>,
138225
}
139226

140227
/// A set of messages which are pending UTXO lookups for processing.
@@ -145,10 +232,92 @@ pub(super) struct PendingChecks {
145232
impl PendingChecks {
146233
pub(super) fn new() -> Self {
147234
PendingChecks { internal: Mutex::new(PendingChecksContext {
148-
channels: HashMap::new(),
235+
channels: HashMap::new(), nodes: HashMap::new(),
149236
}) }
150237
}
151238

239+
/// Checks if there is a pending `node_announcement` UTXO validation for the given channel,
240+
/// and, if so, stores the channel message for handling later and returns an `Err`.
241+
pub(super) fn check_hold_pending_channel_update(
242+
&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
243+
) -> Result<(), LightningError> {
244+
let mut pending_checks = self.internal.lock().unwrap();
245+
if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
246+
let is_from_a = (msg.flags & 1) == 1;
247+
match Weak::upgrade(e.get()) {
248+
Some(msgs_ref) => {
249+
let mut messages = msgs_ref.lock().unwrap();
250+
let latest_update = if is_from_a {
251+
&mut messages.latest_channel_update_a
252+
} else {
253+
&mut messages.latest_channel_update_b
254+
};
255+
if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
256+
// If the messages we got has a higher timestamp, just blindly assume the
257+
// signatures on the new message are correct and drop the old message. This
258+
// may cause us to end up dropping valid `channel_update`s if a peer is
259+
// malicious, but we should get the correct ones when the node updates them.
260+
*latest_update = Some(
261+
if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
262+
else { ChannelUpdate::Unsigned(msg.clone()) });
263+
}
264+
return Err(LightningError {
265+
err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
266+
action: ErrorAction::IgnoreError,
267+
});
268+
},
269+
None => { e.remove(); },
270+
}
271+
}
272+
Ok(())
273+
}
274+
275+
/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
276+
/// given node and, if so, stores the channel message for handling later and returns an `Err`.
277+
pub(super) fn check_hold_pending_node_announcement(
278+
&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
279+
) -> Result<(), LightningError> {
280+
let mut pending_checks = self.internal.lock().unwrap();
281+
if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id)) {
282+
let mut found_at_least_one_chan = false;
283+
e.get_mut().retain(|node_msgs| {
284+
match Weak::upgrade(&node_msgs) {
285+
Some(chan_mtx) => {
286+
let mut chan_msgs = chan_mtx.lock().unwrap();
287+
if let Some(chan_announce) = &chan_msgs.channel_announce {
288+
let latest_announce = if *chan_announce.node_id_1() == msg.node_id {
289+
&mut chan_msgs.latest_node_announce_a
290+
} else {
291+
&mut chan_msgs.latest_node_announce_b
292+
};
293+
if latest_announce.is_none() ||
294+
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
295+
{
296+
*latest_announce = Some(
297+
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
298+
else { NodeAnnouncement::Unsigned(msg.clone()) });
299+
}
300+
found_at_least_one_chan = true;
301+
true
302+
} else {
303+
debug_assert!(false, "channel_announce is set before struct is added to node map");
304+
false
305+
}
306+
},
307+
None => false,
308+
}
309+
});
310+
if e.get().is_empty() { e.remove(); }
311+
if found_at_least_one_chan {
312+
return Err(LightningError {
313+
err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
314+
action: ErrorAction::IgnoreError,
315+
});
316+
}
317+
}
318+
Ok(())
319+
}
320+
152321
pub(super) fn check_channel_announcement<A: Deref>(&self,
153322
chain_access: &Option<A>, msg: &msgs::UnsignedChannelAnnouncement,
154323
full_msg: Option<&msgs::ChannelAnnouncement>
@@ -257,6 +426,10 @@ impl PendingChecks {
257426
async_messages.channel_announce = Some(
258427
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
259428
else { ChannelAnnouncement::Unsigned(msg.clone()) });
429+
pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id_1))
430+
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
431+
pending_checks.nodes.entry(NodeId::from_pubkey(&msg.node_id_2))
432+
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
260433
Err(LightningError {
261434
err: "Channel being checked async".to_owned(),
262435
action: ErrorAction::IgnoreError

0 commit comments

Comments
 (0)