Skip to content

Commit 993a812

Browse files
Add empty OnionMessenger, OnionMessageHandler trait, and boilerplate
This fills in the boilerplate needed to hook up the OnionMessenger to send and receive messages through the PeerManager. It also sets up the OnionMessenger and its struct fields.
1 parent 79d6642 commit 993a812

File tree

8 files changed

+191
-35
lines changed

8 files changed

+191
-35
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1515
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
1616
use lightning::chain::keysinterface::{Sign, KeysInterface};
1717
use lightning::ln::channelmanager::ChannelManager;
18-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
18+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
1919
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2020
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
2121
use lightning::routing::scoring::WriteableScore;
@@ -165,14 +165,15 @@ impl BackgroundProcessor {
165165
P: 'static + Deref + Send + Sync,
166166
Descriptor: 'static + SocketDescriptor + Send + Sync,
167167
CMH: 'static + Deref + Send + Sync,
168+
OMH: 'static + Deref + Send + Sync,
168169
RMH: 'static + Deref + Send + Sync,
169170
EH: 'static + EventHandler + Send,
170171
PS: 'static + Deref + Send,
171172
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
172173
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
173174
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
174175
UMH: 'static + Deref + Send + Sync,
175-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
176+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
176177
S: 'static + Deref<Target = SC> + Send + Sync,
177178
SC: WriteableScore<'a>,
178179
>(
@@ -189,6 +190,7 @@ impl BackgroundProcessor {
189190
L::Target: 'static + Logger,
190191
P::Target: 'static + Persist<Signer>,
191192
CMH::Target: 'static + ChannelMessageHandler,
193+
OMH::Target: 'static + OnionMessageHandler,
192194
RMH::Target: 'static + RoutingMessageHandler,
193195
UMH::Target: 'static + CustomMessageHandler,
194196
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -406,7 +408,7 @@ mod tests {
406408
struct Node {
407409
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
408410
net_graph_msg_handler: Option<Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
409-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
411+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
410412
chain_monitor: Arc<ChainMonitor>,
411413
persister: Arc<FilesystemPersister>,
412414
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -501,7 +503,7 @@ mod tests {
501503
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
502504
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
503505
let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
504-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
506+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
505507
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
506508
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
507509
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8383
use lightning::ln::peer_handler::CustomMessageHandler;
84-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
84+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler};
8585
use lightning::util::logger::Logger;
8686

8787
use std::task;
@@ -120,9 +120,10 @@ struct Connection {
120120
id: u64,
121121
}
122122
impl Connection {
123-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
123+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
124124
CMH: ChannelMessageHandler + 'static + Send + Sync,
125125
RMH: RoutingMessageHandler + 'static + Send + Sync,
126+
OMH: OnionMessageHandler + 'static + Send + Sync,
126127
L: Logger + 'static + ?Sized + Send + Sync,
127128
UMH: CustomMessageHandler + 'static + Send + Sync {
128129
loop {
@@ -133,9 +134,10 @@ impl Connection {
133134
}
134135
}
135136

136-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137138
CMH: ChannelMessageHandler + 'static + Send + Sync,
138139
RMH: RoutingMessageHandler + 'static + Send + Sync,
140+
OMH: OnionMessageHandler + 'static + Send + Sync,
139141
L: Logger + 'static + ?Sized + Send + Sync,
140142
UMH: CustomMessageHandler + 'static + Send + Sync {
141143
// Create a waker to wake up poll_event_process, above
@@ -255,9 +257,10 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
255257
/// The returned future will complete when the peer is disconnected and associated handling
256258
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
257259
/// not need to poll the provided future in order to make progress.
258-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
260+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
259261
CMH: ChannelMessageHandler + 'static + Send + Sync,
260262
RMH: RoutingMessageHandler + 'static + Send + Sync,
263+
OMH: OnionMessageHandler + 'static + Send + Sync,
261264
L: Logger + 'static + ?Sized + Send + Sync,
262265
UMH: CustomMessageHandler + 'static + Send + Sync {
263266
let remote_addr = get_addr_from_stream(&stream);
@@ -297,9 +300,10 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
297300
/// The returned future will complete when the peer is disconnected and associated handling
298301
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
299302
/// not need to poll the provided future in order to make progress.
300-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
303+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
301304
CMH: ChannelMessageHandler + 'static + Send + Sync,
302305
RMH: RoutingMessageHandler + 'static + Send + Sync,
306+
OMH: OnionMessageHandler + 'static + Send + Sync,
303307
L: Logger + 'static + ?Sized + Send + Sync,
304308
UMH: CustomMessageHandler + 'static + Send + Sync {
305309
let remote_addr = get_addr_from_stream(&stream);
@@ -368,9 +372,10 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
368372
/// disconnected and associated handling futures are freed, though, because all processing in said
369373
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
370374
/// make progress.
371-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
375+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
372376
CMH: ChannelMessageHandler + 'static + Send + Sync,
373377
RMH: RoutingMessageHandler + 'static + Send + Sync,
378+
OMH: OnionMessageHandler + 'static + Send + Sync,
374379
L: Logger + 'static + ?Sized + Send + Sync,
375380
UMH: CustomMessageHandler + 'static + Send + Sync {
376381
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
@@ -618,6 +623,7 @@ mod tests {
618623
let a_manager = Arc::new(PeerManager::new(MessageHandler {
619624
chan_handler: Arc::clone(&a_handler),
620625
route_handler: Arc::clone(&a_handler),
626+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
621627
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
622628

623629
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -632,6 +638,7 @@ mod tests {
632638
let b_manager = Arc::new(PeerManager::new(MessageHandler {
633639
chan_handler: Arc::clone(&b_handler),
634640
route_handler: Arc::clone(&b_handler),
641+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
635642
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
636643

637644
// We bind on localhost, hoping the environment is properly configured with a local
@@ -683,6 +690,7 @@ mod tests {
683690

684691
let a_manager = Arc::new(PeerManager::new(MessageHandler {
685692
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
693+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
686694
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
687695
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
688696

lightning/src/ln/channelmanager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5733,6 +5733,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
57335733
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
57345734
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
57355735
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
5736+
&events::MessageSendEvent::SendOnionMessage { .. } => false,
57365737
}
57375738
});
57385739
}

lightning/src/ln/msgs.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use bitcoin::blockdata::script::Script;
3131
use bitcoin::hash_types::{Txid, BlockHash};
3232

3333
use ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
34+
use ln::onion_message;
3435

3536
use prelude::*;
3637
use core::fmt;
@@ -40,7 +41,7 @@ use io_extras::read_to_end;
4041

4142
use util::events::MessageSendEventsProvider;
4243
use util::logger;
43-
use util::ser::{Readable, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt};
44+
use util::ser::{Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt};
4445

4546
use ln::{PaymentPreimage, PaymentHash, PaymentSecret};
4647

@@ -304,6 +305,16 @@ pub struct UpdateAddHTLC {
304305
pub(crate) onion_routing_packet: OnionPacket,
305306
}
306307

308+
/// An onion message to be sent or received from a peer
309+
#[derive(Clone, Debug, PartialEq)]
310+
pub struct OnionMessage {
311+
/// This blinding point is used in the shared secret that is used to decrypt the onion message
312+
/// payload's `encrypted_data` field.
313+
pub(crate) blinding_point: PublicKey,
314+
pub(crate) len: u16,
315+
pub(crate) onion_routing_packet: onion_message::Packet,
316+
}
317+
307318
/// An update_fulfill_htlc message to be sent or received from a peer
308319
#[derive(Clone, Debug, PartialEq)]
309320
pub struct UpdateFulfillHTLC {
@@ -912,6 +923,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
912923
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
913924
}
914925

926+
/// A trait to describe an object that can receive onion messages.
927+
pub trait OnionMessageHandler : MessageSendEventsProvider {
928+
/// Handle an incoming onion_message message from the given peer.
929+
fn handle_onion_message(&self, their_node_id: &PublicKey, msg: &OnionMessage);
930+
}
931+
915932
mod fuzzy_internal_msgs {
916933
use prelude::*;
917934
use ln::{PaymentPreimage, PaymentSecret};
@@ -1304,6 +1321,28 @@ impl_writeable_msg!(UpdateAddHTLC, {
13041321
onion_routing_packet
13051322
}, {});
13061323

1324+
impl Readable for OnionMessage {
1325+
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
1326+
let blinding_point: PublicKey = Readable::read(r)?;
1327+
let len: u16 = Readable::read(r)?;
1328+
let onion_routing_packet: onion_message::Packet = <onion_message::Packet as ReadableArgs<u16>>::read(r, len)?;
1329+
Ok(Self {
1330+
blinding_point,
1331+
len,
1332+
onion_routing_packet,
1333+
})
1334+
}
1335+
}
1336+
1337+
impl Writeable for OnionMessage {
1338+
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
1339+
self.blinding_point.write(w)?;
1340+
self.len.write(w)?;
1341+
self.onion_routing_packet.write(w)?;
1342+
Ok(())
1343+
}
1344+
}
1345+
13071346
impl Writeable for FinalOnionHopData {
13081347
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
13091348
self.payment_secret.0.write(w)?;

lightning/src/ln/onion_message.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@ use bitcoin::hashes::sha256::Hash as Sha256;
1414
use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey};
1515
use bitcoin::secp256k1::ecdh::SharedSecret;
1616

17-
use chain::keysinterface::{KeysInterface, Sign};
18-
use ln::msgs::DecodeError;
17+
use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Sign};
18+
use ln::msgs::{self, DecodeError, OnionMessageHandler};
1919
use ln::onion_utils;
2020
use util::chacha20poly1305rfc::{ChaCha20Poly1305RFC, ChaChaPoly1305Writer};
21+
use util::events::{MessageSendEvent, MessageSendEventsProvider};
22+
use util::logger::Logger;
2123
use util::ser::{LengthCalculatingWriter, Readable, ReadableArgs, VecWriter, Writeable, Writer};
2224

25+
use core::mem;
2326
use core::ops::Deref;
2427
use io::{self, Read};
2528
use prelude::*;
29+
use sync::{Arc, Mutex};
2630

2731
#[derive(Clone, Debug, PartialEq)]
2832
pub(crate) struct Packet {
@@ -246,6 +250,58 @@ impl BlindedRoute {
246250
}
247251
}
248252

253+
/// A sender, receiver and forwarder of onion messages. In upcoming releases, this object will be
254+
/// used to retrieve invoices and fulfill invoice requests from offers.
255+
pub struct OnionMessenger<Signer: Sign, K: Deref, L: Deref>
256+
where K::Target: KeysInterface<Signer = Signer>,
257+
L::Target: Logger,
258+
{
259+
keys_manager: K,
260+
logger: L,
261+
pending_msg_events: Mutex<Vec<MessageSendEvent>>,
262+
secp_ctx: Secp256k1<secp256k1::All>,
263+
// Coming soon:
264+
// invoice_handler: InvoiceHandler,
265+
// custom_handler: CustomHandler, // handles custom onion messages
266+
}
267+
268+
impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
269+
where K::Target: KeysInterface<Signer = Signer>,
270+
L::Target: Logger,
271+
{
272+
/// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to
273+
/// their respective handlers.
274+
pub fn new(keys_manager: K, logger: L) -> Self {
275+
let mut secp_ctx = Secp256k1::new();
276+
secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes());
277+
OnionMessenger {
278+
keys_manager,
279+
pending_msg_events: Mutex::new(Vec::new()),
280+
secp_ctx,
281+
logger,
282+
}
283+
}
284+
}
285+
286+
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
287+
where K::Target: KeysInterface<Signer = Signer>,
288+
L::Target: Logger,
289+
{
290+
fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {}
291+
}
292+
293+
impl<Signer: Sign, K: Deref, L: Deref> MessageSendEventsProvider for OnionMessenger<Signer, K, L>
294+
where K::Target: KeysInterface<Signer = Signer>,
295+
L::Target: Logger,
296+
{
297+
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
298+
let mut pending_msg_events = self.pending_msg_events.lock().unwrap();
299+
let mut ret = Vec::new();
300+
mem::swap(&mut ret, &mut *pending_msg_events);
301+
ret
302+
}
303+
}
304+
249305
#[allow(unused_assignments)]
250306
#[inline]
251307
fn construct_keys_callback<T: secp256k1::Signing + secp256k1::Verification, FType: FnMut(PublicKey, SharedSecret, [u8; 32], PublicKey, SharedSecret)> (secp_ctx: &Secp256k1<T>, unblinded_path: &Vec<PublicKey>, session_priv: &SecretKey, mut callback: FType) -> Result<(), secp256k1::Error> {
@@ -317,3 +373,10 @@ fn construct_blinded_route_keys<T: secp256k1::Signing + secp256k1::Verification>
317373

318374
Ok((encrypted_data_keys, blinded_node_pks))
319375
}
376+
377+
/// Useful for simplifying the parameters of [`SimpleArcChannelManager`] and
378+
/// [`SimpleArcPeerManager`]. See their docs for more details.
379+
pub type SimpleArcOnionMessenger<L> = OnionMessenger<InMemorySigner, Arc<KeysManager>, Arc<L>>;
380+
/// Useful for simplifying the parameters of [`SimpleRefChannelManager`] and
381+
/// [`SimpleRefPeerManager`]. See their docs for more details.
382+
pub type SimpleRefOnionMessenger<'a, 'b, L> = OnionMessenger<InMemorySigner, &'a KeysManager, &'b L>;

0 commit comments

Comments
 (0)