Skip to content

Commit dc83407

Browse files
committed
Add an async resolution option to ChainAccess::get_utxo
For those operating in an async environment, requiring `ChainAccess::get_utxo` return information about the requested UTXO synchronously is incredibly painful. Requesting information about a random UTXO is likely to go over the network, and likely to be a rather slow request. Thus, here, we change the return type of `get_utxo` to have both a synchronous and asynchronous form. The asynchronous form requires the user construct a `AccessFuture` which they `clone` and pass back to us. Internally, an `AccessFuture` has an `Arc` to the `channel_announcement` message which we need to process. When the user completes their lookup, they call `resolve` on their `AccessFuture` which we pull the `channel_announcement` from and then apply to the network graph.
1 parent cb83937 commit dc83407

File tree

4 files changed

+175
-28
lines changed

4 files changed

+175
-28
lines changed

fuzz/src/router.rs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint;
1515
use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
1616
use lightning::ln::msgs;
1717
use lightning::routing::gossip::{NetworkGraph, RoutingFees};
18-
use lightning::routing::gossip_checking::{ChainAccess, ChainAccessError};
18+
use lightning::routing::gossip_checking::{AccessFuture, ChainAccess, ChainAccessError, ChainAccessResult};
1919
use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
2020
use lightning::routing::scoring::FixedPenaltyScorer;
2121
use lightning::util::config::UserConfig;
@@ -81,17 +81,36 @@ impl InputData {
8181
}
8282
}
8383

84-
struct FuzzChainSource {
84+
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
8585
input: Arc<InputData>,
86+
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
8687
}
87-
impl ChainAccess for FuzzChainSource {
88-
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, ChainAccessError> {
89-
match self.input.get_slice(2) {
90-
Some(&[0, _]) => Err(ChainAccessError::UnknownChain),
91-
Some(&[1, _]) => Err(ChainAccessError::UnknownTx),
92-
Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
93-
None => Err(ChainAccessError::UnknownTx),
94-
_ => unreachable!(),
88+
impl<Out: test_logger::Output> ChainAccess for FuzzChainSource<'_, '_, Out> {
89+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
90+
let input_slice = self.input.get_slice(2);
91+
if input_slice.is_none() { return ChainAccessResult::Sync(Err(ChainAccessError::UnknownTx)); }
92+
let input_slice = input_slice.unwrap();
93+
let txo_res = TxOut {
94+
value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
95+
script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
96+
};
97+
match input_slice {
98+
&[0, _] => ChainAccessResult::Sync(Err(ChainAccessError::UnknownChain)),
99+
&[1, _] => ChainAccessResult::Sync(Err(ChainAccessError::UnknownTx)),
100+
&[2, _] => {
101+
let future = AccessFuture::new();
102+
future.resolve(self.net_graph, Err(ChainAccessError::UnknownTx));
103+
ChainAccessResult::Async(future.clone())
104+
},
105+
&[3, _] => {
106+
let future = AccessFuture::new();
107+
future.resolve(self.net_graph, Err(ChainAccessError::UnknownTx));
108+
ChainAccessResult::Async(future.clone())
109+
},
110+
&[4, _] => {
111+
ChainAccessResult::Async(AccessFuture::new()) // the future will never resolve
112+
},
113+
&[..] => ChainAccessResult::Sync(Ok(txo_res)),
95114
}
96115
}
97116
}
@@ -162,6 +181,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
162181

163182
let our_pubkey = get_pubkey!();
164183
let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
184+
let chain_source = FuzzChainSource {
185+
input: Arc::clone(&input),
186+
net_graph: &net_graph,
187+
};
165188

166189
let mut node_pks = HashSet::new();
167190
let mut scid = 42;
@@ -182,13 +205,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
182205
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
183206
node_pks.insert(msg.node_id_1);
184207
node_pks.insert(msg.node_id_2);
185-
let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
208+
let _ = net_graph.update_channel_from_unsigned_announcement::
209+
<&FuzzChainSource<'_, '_, Out>>(&msg, &None);
186210
},
187211
2 => {
188212
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
189213
node_pks.insert(msg.node_id_1);
190214
node_pks.insert(msg.node_id_2);
191-
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
215+
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
192216
},
193217
3 => {
194218
let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));

lightning/src/routing/gossip.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ pub struct NetworkGraph<L: Deref> where L::Target: Logger {
150150
/// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
151151
/// that once some time passes, we can potentially resync it from gossip again.
152152
removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
153+
/// Announcement messages which are awaiting an on-chain lookup to be processed.
154+
pub(super) pending_checks: gossip_checking::PendingChecks,
153155
}
154156

155157
/// A read-only view of [`NetworkGraph`].
@@ -1180,6 +1182,7 @@ impl<L: Deref> ReadableArgs<L> for NetworkGraph<L> where L::Target: Logger {
11801182
last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
11811183
removed_nodes: Mutex::new(HashMap::new()),
11821184
removed_channels: Mutex::new(HashMap::new()),
1185+
pending_checks: gossip_checking::PendingChecks::new(),
11831186
})
11841187
}
11851188
}
@@ -1219,6 +1222,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
12191222
last_rapid_gossip_sync_timestamp: Mutex::new(None),
12201223
removed_channels: Mutex::new(HashMap::new()),
12211224
removed_nodes: Mutex::new(HashMap::new()),
1225+
pending_checks: gossip_checking::PendingChecks::new(),
12221226
}
12231227
}
12241228

@@ -1478,7 +1482,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
14781482
}
14791483
}
14801484

1481-
let utxo_value = gossip_checking::check_channel_announcement(chain_access, msg)?;
1485+
let utxo_value = self.pending_checks.check_channel_announcement(
1486+
chain_access, msg, full_msg)?;
14821487

14831488
#[allow(unused_mut, unused_assignments)]
14841489
let mut announcement_received_time = 0;

lightning/src/routing/gossip_checking.rs

Lines changed: 129 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@ use bitcoin::hashes::hex::ToHex;
99

1010
use crate::ln::chan_utils::make_funding_redeemscript;
1111
use crate::ln::msgs::{self, LightningError, ErrorAction};
12+
use crate::routing::gossip::{NetworkGraph, NodeId};
13+
use crate::util::logger::Logger;
1214
use crate::util::ser::Writeable;
1315

16+
use crate::prelude::*;
17+
18+
use alloc::sync::{Arc, Weak};
19+
use crate::sync::Mutex;
1420
use core::ops::Deref;
1521

1622
/// An error when accessing the chain via [`ChainAccess`].
@@ -23,6 +29,19 @@ pub enum ChainAccessError {
2329
UnknownTx,
2430
}
2531

32+
/// The result of a [`ChainAccess::get_utxo`] call. A call may resolve either synchronously,
33+
/// returning the `Sync` variant, or asynchronously, returning an [`AccessFuture`] in the `Async`
34+
/// variant.
35+
pub enum ChainAccessResult {
36+
/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
37+
/// requested or a [`ChainAccessError`].
38+
Sync(Result<TxOut, ChainAccessError>),
39+
/// A result which will be resolved asynchronously. It includes an [`AccessFuture`], a `clone`
40+
/// of which you must keep locally and call [`AccessFuture::resolve`] on once the lookup
41+
/// completes.
42+
Async(AccessFuture),
43+
}
44+
2645
/// The `ChainAccess` trait defines behavior for accessing chain data and state, such as blocks and
2746
/// UTXOs.
2847
pub trait ChainAccess {
@@ -31,19 +50,90 @@ pub trait ChainAccess {
3150
/// is unknown.
3251
///
3352
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
34-
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, ChainAccessError>;
53+
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> ChainAccessResult;
54+
}
55+
56+
enum ChannelAnnouncement {
57+
Full(msgs::ChannelAnnouncement),
58+
Unsigned(msgs::UnsignedChannelAnnouncement),
59+
}
60+
61+
struct AccessMessages {
62+
complete: Option<Result<TxOut, ChainAccessError>>,
63+
channel_announce: Option<ChannelAnnouncement>,
64+
}
65+
66+
#[derive(Clone)]
67+
pub struct AccessFuture {
68+
state: Arc<Mutex<AccessMessages>>,
69+
}
70+
71+
/// A trivial implementation of [`ChainAccess`] which is used to call back into the network graph
72+
/// once we have a concrete resolution of a request.
73+
struct AccessResolver(Result<TxOut, ChainAccessError>);
74+
impl ChainAccess for AccessResolver {
75+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
76+
ChainAccessResult::Sync(self.0.clone())
77+
}
78+
}
79+
80+
impl AccessFuture {
81+
/// Builds a new, empty, future for later resolution.
82+
pub fn new() -> Self {
83+
Self { state: Arc::new(Mutex::new(AccessMessages {
84+
complete: None,
85+
channel_announce: None,
86+
}))}
87+
}
88+
89+
/// Resolves this future against the given `graph` and with the given `result`.
90+
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, ChainAccessError>)
91+
where L::Target: Logger {
92+
let announcement = {
93+
let mut async_messages = self.state.lock().unwrap();
94+
95+
if async_messages.channel_announce.is_none() {
96+
// We raced returning to `check_channel_announcement` which hasn't updated
97+
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
98+
// check once it gets control again.
99+
async_messages.complete = Some(result);
100+
return;
101+
}
102+
103+
async_messages.channel_announce.take().unwrap()
104+
};
105+
106+
// Now that we've updated our internal state, pass the pending messages back through the
107+
// network graph with a different `ChainAccess` which will resolve immediately.
108+
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
109+
// with them.
110+
let resolver = AccessResolver(result);
111+
match announcement {
112+
ChannelAnnouncement::Full(signed_msg) => {
113+
let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
114+
},
115+
ChannelAnnouncement::Unsigned(msg) => {
116+
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
117+
},
118+
}
119+
}
120+
}
121+
122+
/// A set of messages which are pending UTXO lookups for processing.
123+
pub(super) struct PendingChecks {
35124
}
36125

37-
pub(crate) fn check_channel_announcement<A: Deref>(
38-
chain_access: &Option<A>, msg: &msgs::UnsignedChannelAnnouncement
39-
) -> Result<Option<u64>, msgs::LightningError> where A::Target: ChainAccess {
40-
match chain_access {
41-
&None => {
42-
// Tentatively accept, potentially exposing us to DoS attacks
43-
Ok(None)
44-
},
45-
&Some(ref chain_access) => {
46-
match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) {
126+
impl PendingChecks {
127+
pub(super) fn new() -> Self {
128+
PendingChecks {}
129+
}
130+
131+
pub(super) fn check_channel_announcement<A: Deref>(&self,
132+
chain_access: &Option<A>, msg: &msgs::UnsignedChannelAnnouncement,
133+
full_msg: Option<&msgs::ChannelAnnouncement>
134+
) -> Result<Option<u64>, msgs::LightningError> where A::Target: ChainAccess {
135+
let handle_result = |res| {
136+
match res {
47137
Ok(TxOut { value, script_pubkey }) => {
48138
let expected_script =
49139
make_funding_redeemscript(&msg.bitcoin_key_1, &msg.bitcoin_key_2).to_v0_p2wsh();
@@ -70,6 +160,34 @@ pub(crate) fn check_channel_announcement<A: Deref>(
70160
})
71161
},
72162
}
163+
};
164+
165+
match chain_access {
166+
&None => {
167+
// Tentatively accept, potentially exposing us to DoS attacks
168+
Ok(None)
169+
},
170+
&Some(ref chain_access) => {
171+
match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) {
172+
ChainAccessResult::Sync(res) => handle_result(res),
173+
ChainAccessResult::Async(future) => {
174+
let mut async_messages = future.state.lock().unwrap();
175+
if let Some(res) = async_messages.complete.take() {
176+
// In the unlikely event the future resolved before we managed to get it,
177+
// handle the result in-line.
178+
handle_result(res)
179+
} else {
180+
async_messages.channel_announce = Some(
181+
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
182+
else { ChannelAnnouncement::Unsigned(msg.clone()) });
183+
Err(LightningError {
184+
err: "Channel being checked async".to_owned(),
185+
action: ErrorAction::IgnoreError
186+
})
187+
}
188+
},
189+
}
190+
}
73191
}
74192
}
75193
}

lightning/src/util/test_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
2222
use crate::ln::{msgs, wire};
2323
use crate::ln::script::ShutdownScript;
2424
use crate::routing::gossip::NetworkGraph;
25-
use crate::routing::gossip_checking::{ChainAccess, ChainAccessError};
25+
use crate::routing::gossip_checking::{ChainAccess, ChainAccessError, ChainAccessResult};
2626
use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs};
2727
use crate::routing::scoring::FixedPenaltyScorer;
2828
use crate::util::config::UserConfig;
@@ -836,12 +836,12 @@ impl TestChainSource {
836836
}
837837

838838
impl ChainAccess for TestChainSource {
839-
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, ChainAccessError> {
839+
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
840840
if self.genesis_hash != *genesis_hash {
841-
return Err(ChainAccessError::UnknownChain);
841+
return ChainAccessResult::Sync(Err(ChainAccessError::UnknownChain));
842842
}
843843

844-
self.utxo_ret.lock().unwrap().clone()
844+
ChainAccessResult::Sync(self.utxo_ret.lock().unwrap().clone())
845845
}
846846
}
847847

0 commit comments

Comments
 (0)