Skip to content

Commit 7b68853

Browse files
committed
Add an async resolution option to ChainAccess::get_utxo
For those operating in an async environment, requiring `ChainAccess::get_utxo` return information about the requested UTXO synchronously is incredibly painful. Requesting information about a random UTXO is likely to go over the network, and likely to be a rather slow request. Thus, here, we change the return type of `get_utxo` to have both a synchronous and asynchronous form. The asynchronous form requires the user construct a `AccessFuture` which they `clone` and pass back to us. Internally, an `AccessFuture` has an `Arc` to the `channel_announcement` message which we need to process. When the user completes their lookup, they call `resolve` on their `AccessFuture` which we pull the `channel_announcement` from and then apply to the network graph.
1 parent fa99941 commit 7b68853

File tree

4 files changed

+181
-28
lines changed

4 files changed

+181
-28
lines changed

fuzz/src/router.rs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint;
1515
use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
1616
use lightning::ln::msgs;
1717
use lightning::routing::gossip::{NetworkGraph, RoutingFees};
18-
use lightning::routing::gossip_checking::{ChainAccess, ChainAccessError};
18+
use lightning::routing::gossip_checking::{AccessFuture, ChainAccess, ChainAccessError, ChainAccessResult};
1919
use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
2020
use lightning::routing::scoring::FixedPenaltyScorer;
2121
use lightning::util::config::UserConfig;
@@ -81,17 +81,36 @@ impl InputData {
8181
}
8282
}
8383

84-
struct FuzzChainSource {
84+
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
8585
input: Arc<InputData>,
86+
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
8687
}
87-
impl ChainAccess for FuzzChainSource {
88-
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, ChainAccessError> {
89-
match self.input.get_slice(2) {
90-
Some(&[0, _]) => Err(ChainAccessError::UnknownChain),
91-
Some(&[1, _]) => Err(ChainAccessError::UnknownTx),
92-
Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
93-
None => Err(ChainAccessError::UnknownTx),
94-
_ => unreachable!(),
88+
impl<Out: test_logger::Output> ChainAccess for FuzzChainSource<'_, '_, Out> {
89+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
90+
let input_slice = self.input.get_slice(2);
91+
if input_slice.is_none() { return ChainAccessResult::Sync(Err(ChainAccessError::UnknownTx)); }
92+
let input_slice = input_slice.unwrap();
93+
let txo_res = TxOut {
94+
value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
95+
script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
96+
};
97+
match input_slice {
98+
&[0, _] => ChainAccessResult::Sync(Err(ChainAccessError::UnknownChain)),
99+
&[1, _] => ChainAccessResult::Sync(Err(ChainAccessError::UnknownTx)),
100+
&[2, _] => {
101+
let future = AccessFuture::new();
102+
future.resolve(self.net_graph, Ok(txo_res));
103+
ChainAccessResult::Async(future.clone())
104+
},
105+
&[3, _] => {
106+
let future = AccessFuture::new();
107+
future.resolve(self.net_graph, Err(ChainAccessError::UnknownTx));
108+
ChainAccessResult::Async(future.clone())
109+
},
110+
&[4, _] => {
111+
ChainAccessResult::Async(AccessFuture::new()) // the future will never resolve
112+
},
113+
&[..] => ChainAccessResult::Sync(Ok(txo_res)),
95114
}
96115
}
97116
}
@@ -162,6 +181,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
162181

163182
let our_pubkey = get_pubkey!();
164183
let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
184+
let chain_source = FuzzChainSource {
185+
input: Arc::clone(&input),
186+
net_graph: &net_graph,
187+
};
165188

166189
let mut node_pks = HashSet::new();
167190
let mut scid = 42;
@@ -182,13 +205,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
182205
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
183206
node_pks.insert(msg.node_id_1);
184207
node_pks.insert(msg.node_id_2);
185-
let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
208+
let _ = net_graph.update_channel_from_unsigned_announcement::
209+
<&FuzzChainSource<'_, '_, Out>>(&msg, &None);
186210
},
187211
2 => {
188212
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
189213
node_pks.insert(msg.node_id_1);
190214
node_pks.insert(msg.node_id_2);
191-
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
215+
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
192216
},
193217
3 => {
194218
let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));

lightning/src/routing/gossip.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ pub struct NetworkGraph<L: Deref> where L::Target: Logger {
150150
/// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
151151
/// that once some time passes, we can potentially resync it from gossip again.
152152
removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
153+
/// Announcement messages which are awaiting an on-chain lookup to be processed.
154+
pub(super) pending_checks: gossip_checking::PendingChecks,
153155
}
154156

155157
/// A read-only view of [`NetworkGraph`].
@@ -1180,6 +1182,7 @@ impl<L: Deref> ReadableArgs<L> for NetworkGraph<L> where L::Target: Logger {
11801182
last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
11811183
removed_nodes: Mutex::new(HashMap::new()),
11821184
removed_channels: Mutex::new(HashMap::new()),
1185+
pending_checks: gossip_checking::PendingChecks::new(),
11831186
})
11841187
}
11851188
}
@@ -1219,6 +1222,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
12191222
last_rapid_gossip_sync_timestamp: Mutex::new(None),
12201223
removed_channels: Mutex::new(HashMap::new()),
12211224
removed_nodes: Mutex::new(HashMap::new()),
1225+
pending_checks: gossip_checking::PendingChecks::new(),
12221226
}
12231227
}
12241228

@@ -1477,7 +1481,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
14771481
}
14781482
}
14791483

1480-
let utxo_value = gossip_checking::check_channel_announcement(chain_access, msg)?;
1484+
let utxo_value = self.pending_checks.check_channel_announcement(
1485+
chain_access, msg, full_msg)?;
14811486

14821487
#[allow(unused_mut, unused_assignments)]
14831488
let mut announcement_received_time = 0;

lightning/src/routing/gossip_checking.rs

Lines changed: 135 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ use bitcoin::hashes::hex::ToHex;
1818

1919
use crate::ln::chan_utils::make_funding_redeemscript;
2020
use crate::ln::msgs::{self, LightningError, ErrorAction};
21+
use crate::routing::gossip::{NetworkGraph, NodeId};
22+
use crate::util::logger::{Level, Logger};
2123
use crate::util::ser::Writeable;
2224

2325
use crate::prelude::*;
2426

27+
use alloc::sync::{Arc, Weak};
28+
use crate::sync::Mutex;
2529
use core::ops::Deref;
2630

2731
/// An error when accessing the chain via [`ChainAccess`].
@@ -34,26 +38,118 @@ pub enum ChainAccessError {
3438
UnknownTx,
3539
}
3640

41+
/// The result of a [`ChainAccess::get_utxo`] call. A call may resolve either synchronously,
42+
/// returning the `Sync` variant, or asynchronously, returning an [`AccessFuture`] in the `Async`
43+
/// variant.
44+
pub enum ChainAccessResult {
45+
/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
46+
/// requested or a [`ChainAccessError`].
47+
Sync(Result<TxOut, ChainAccessError>),
48+
/// A result which will be resolved asynchronously. It includes an [`AccessFuture`], a `clone`
49+
/// of which you must keep locally and call [`AccessFuture::resolve`] on once the lookup
50+
/// completes.
51+
///
52+
/// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
53+
/// but only fairly loosely. Because a pending checks block all message processing, leaving
54+
/// checks pending for an extended time may cause DoS of other functions. It is recommended you
55+
/// keep a tight timeout on lookups, on the order of a few seconds.
56+
Async(AccessFuture),
57+
}
58+
3759
/// The `ChainAccess` trait defines behavior for accessing on-chain UTXOs.
3860
pub trait ChainAccess {
3961
/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
4062
/// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
4163
/// is unknown.
4264
///
4365
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
44-
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, ChainAccessError>;
66+
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> ChainAccessResult;
67+
}
68+
69+
enum ChannelAnnouncement {
70+
Full(msgs::ChannelAnnouncement),
71+
Unsigned(msgs::UnsignedChannelAnnouncement),
72+
}
73+
74+
struct AccessMessages {
75+
complete: Option<Result<TxOut, ChainAccessError>>,
76+
channel_announce: Option<ChannelAnnouncement>,
77+
}
78+
79+
/// Represents a future resolution of a [`ChainAccess::get_utxo`] query resolving async.
80+
///
81+
/// See [`ChainAccessResult::Async`] and [`AccessFuture::resolve`] for more info.
82+
#[derive(Clone)]
83+
pub struct AccessFuture {
84+
state: Arc<Mutex<AccessMessages>>,
85+
}
86+
87+
/// A trivial implementation of [`ChainAccess`] which is used to call back into the network graph
88+
/// once we have a concrete resolution of a request.
89+
struct AccessResolver(Result<TxOut, ChainAccessError>);
90+
impl ChainAccess for AccessResolver {
91+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
92+
ChainAccessResult::Sync(self.0.clone())
93+
}
94+
}
95+
96+
impl AccessFuture {
97+
/// Builds a new future for later resolution.
98+
pub fn new() -> Self {
99+
Self { state: Arc::new(Mutex::new(AccessMessages {
100+
complete: None,
101+
channel_announce: None,
102+
}))}
103+
}
104+
105+
/// Resolves this future against the given `graph` and with the given `result`.
106+
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, ChainAccessError>)
107+
where L::Target: Logger {
108+
let announcement = {
109+
let mut async_messages = self.state.lock().unwrap();
110+
111+
if async_messages.channel_announce.is_none() {
112+
// We raced returning to `check_channel_announcement` which hasn't updated
113+
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
114+
// check once it gets control again.
115+
async_messages.complete = Some(result);
116+
return;
117+
}
118+
119+
async_messages.channel_announce.take().unwrap()
120+
};
121+
122+
// Now that we've updated our internal state, pass the pending messages back through the
123+
// network graph with a different `ChainAccess` which will resolve immediately.
124+
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
125+
// with them.
126+
let resolver = AccessResolver(result);
127+
match announcement {
128+
ChannelAnnouncement::Full(signed_msg) => {
129+
let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
130+
},
131+
ChannelAnnouncement::Unsigned(msg) => {
132+
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
133+
},
134+
}
135+
}
136+
}
137+
138+
/// A set of messages which are pending UTXO lookups for processing.
139+
pub(super) struct PendingChecks {
45140
}
46141

47-
pub(crate) fn check_channel_announcement<A: Deref>(
48-
chain_access: &Option<A>, msg: &msgs::UnsignedChannelAnnouncement
49-
) -> Result<Option<u64>, msgs::LightningError> where A::Target: ChainAccess {
50-
match chain_access {
51-
&None => {
52-
// Tentatively accept, potentially exposing us to DoS attacks
53-
Ok(None)
54-
},
55-
&Some(ref chain_access) => {
56-
match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) {
142+
impl PendingChecks {
143+
pub(super) fn new() -> Self {
144+
PendingChecks {}
145+
}
146+
147+
pub(super) fn check_channel_announcement<A: Deref>(&self,
148+
chain_access: &Option<A>, msg: &msgs::UnsignedChannelAnnouncement,
149+
full_msg: Option<&msgs::ChannelAnnouncement>
150+
) -> Result<Option<u64>, msgs::LightningError> where A::Target: ChainAccess {
151+
let handle_result = |res| {
152+
match res {
57153
Ok(TxOut { value, script_pubkey }) => {
58154
let expected_script =
59155
make_funding_redeemscript(&msg.bitcoin_key_1, &msg.bitcoin_key_2).to_v0_p2wsh();
@@ -80,6 +176,34 @@ pub(crate) fn check_channel_announcement<A: Deref>(
80176
})
81177
},
82178
}
179+
};
180+
181+
match chain_access {
182+
&None => {
183+
// Tentatively accept, potentially exposing us to DoS attacks
184+
Ok(None)
185+
},
186+
&Some(ref chain_access) => {
187+
match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) {
188+
ChainAccessResult::Sync(res) => handle_result(res),
189+
ChainAccessResult::Async(future) => {
190+
let mut async_messages = future.state.lock().unwrap();
191+
if let Some(res) = async_messages.complete.take() {
192+
// In the unlikely event the future resolved before we managed to get it,
193+
// handle the result in-line.
194+
handle_result(res)
195+
} else {
196+
async_messages.channel_announce = Some(
197+
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
198+
else { ChannelAnnouncement::Unsigned(msg.clone()) });
199+
Err(LightningError {
200+
err: "Channel being checked async".to_owned(),
201+
action: ErrorAction::IgnoreAndLog(Level::Gossip),
202+
})
203+
}
204+
},
205+
}
206+
}
83207
}
84208
}
85209
}

lightning/src/util/test_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::ln::{msgs, wire};
2323
use crate::ln::msgs::LightningError;
2424
use crate::ln::script::ShutdownScript;
2525
use crate::routing::gossip::NetworkGraph;
26-
use crate::routing::gossip_checking::{ChainAccess, ChainAccessError};
26+
use crate::routing::gossip_checking::{ChainAccess, ChainAccessError, ChainAccessResult};
2727
use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs};
2828
use crate::routing::scoring::FixedPenaltyScorer;
2929
use crate::util::config::UserConfig;
@@ -856,12 +856,12 @@ impl TestChainSource {
856856
}
857857

858858
impl ChainAccess for TestChainSource {
859-
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, ChainAccessError> {
859+
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> ChainAccessResult {
860860
if self.genesis_hash != *genesis_hash {
861-
return Err(ChainAccessError::UnknownChain);
861+
return ChainAccessResult::Sync(Err(ChainAccessError::UnknownChain));
862862
}
863863

864-
self.utxo_ret.lock().unwrap().clone()
864+
ChainAccessResult::Sync(self.utxo_ret.lock().unwrap().clone())
865865
}
866866
}
867867

0 commit comments

Comments
 (0)