Skip to content

Commit 99123cd

Browse files
authored
Merge pull request #1604 from valentinewallace/2022-07-OMs-followup
Onion messages: add some initial rate limiting
2 parents 2358b0b + f4834de commit 99123cd

File tree

10 files changed

+145
-49
lines changed

10 files changed

+145
-49
lines changed

fuzz/src/full_stack.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ type ChannelMan = ChannelManager<
166166
EnforcingSigner,
167167
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
168168
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
169-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
169+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
170170

171171
struct MoneyLossDetector<'a> {
172172
manager: Arc<ChannelMan>,
@@ -414,6 +414,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
414414
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
415415
chan_handler: channelmanager.clone(),
416416
route_handler: gossip_sync.clone(),
417+
onion_message_handler: IgnoringMessageHandler {},
417418
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
418419

419420
let mut should_forward = false;

fuzz/src/onion_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
66
use bitcoin::secp256k1::ecdsa::RecoverableSignature;
77

88
use lightning::chain::keysinterface::{Recipient, KeyMaterial, KeysInterface};
9-
use lightning::ln::msgs::{self, DecodeError};
9+
use lightning::ln::msgs::{self, DecodeError, OnionMessageHandler};
1010
use lightning::ln::script::ShutdownScript;
1111
use lightning::util::enforcing_trait_impls::EnforcingSigner;
1212
use lightning::util::logger::Logger;

lightning-background-processor/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1919
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
2020
use lightning::chain::keysinterface::{Sign, KeysInterface};
2121
use lightning::ln::channelmanager::ChannelManager;
22-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
22+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2323
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2424
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2525
use lightning::routing::scoring::WriteableScore;
@@ -281,6 +281,7 @@ impl BackgroundProcessor {
281281
P: 'static + Deref + Send + Sync,
282282
Descriptor: 'static + SocketDescriptor + Send + Sync,
283283
CMH: 'static + Deref + Send + Sync,
284+
OMH: 'static + Deref + Send + Sync,
284285
RMH: 'static + Deref + Send + Sync,
285286
EH: 'static + EventHandler + Send,
286287
PS: 'static + Deref + Send,
@@ -289,7 +290,7 @@ impl BackgroundProcessor {
289290
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
290291
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
291292
UMH: 'static + Deref + Send + Sync,
292-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
293+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
293294
S: 'static + Deref<Target = SC> + Send + Sync,
294295
SC: WriteableScore<'a>,
295296
>(
@@ -306,6 +307,7 @@ impl BackgroundProcessor {
306307
L::Target: 'static + Logger,
307308
P::Target: 'static + Persist<Signer>,
308309
CMH::Target: 'static + ChannelMessageHandler,
310+
OMH::Target: 'static + OnionMessageHandler,
309311
RMH::Target: 'static + RoutingMessageHandler,
310312
UMH::Target: 'static + CustomMessageHandler,
311313
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -544,7 +546,7 @@ mod tests {
544546
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
545547
p2p_gossip_sync: PGS,
546548
rapid_gossip_sync: RGS,
547-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
549+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
548550
chain_monitor: Arc<ChainMonitor>,
549551
persister: Arc<FilesystemPersister>,
550552
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -663,7 +665,7 @@ mod tests {
663665
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
664666
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
665667
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
666-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
668+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
667669
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
668670
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
669671
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8383
use lightning::ln::peer_handler;
8484
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8585
use lightning::ln::peer_handler::CustomMessageHandler;
86-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
86+
use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
8787
use lightning::util::logger::Logger;
8888

8989
use std::ops::Deref;
@@ -123,13 +123,15 @@ struct Connection {
123123
id: u64,
124124
}
125125
impl Connection {
126-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
126+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
127127
CMH: Deref + 'static + Send + Sync,
128128
RMH: Deref + 'static + Send + Sync,
129+
OMH: Deref + 'static + Send + Sync,
129130
L: Deref + 'static + Send + Sync,
130131
UMH: Deref + 'static + Send + Sync,
131132
CMH::Target: ChannelMessageHandler + Send + Sync,
132133
RMH::Target: RoutingMessageHandler + Send + Sync,
134+
OMH::Target: OnionMessageHandler + Send + Sync,
133135
L::Target: Logger + Send + Sync,
134136
UMH::Target: CustomMessageHandler + Send + Sync,
135137
{
@@ -141,13 +143,15 @@ impl Connection {
141143
}
142144
}
143145

144-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
146+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
145147
CMH: Deref + 'static + Send + Sync,
146148
RMH: Deref + 'static + Send + Sync,
149+
OMH: Deref + 'static + Send + Sync,
147150
L: Deref + 'static + Send + Sync,
148151
UMH: Deref + 'static + Send + Sync,
149152
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
150153
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
154+
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
151155
L::Target: Logger + 'static + Send + Sync,
152156
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
153157
{
@@ -268,13 +272,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
268272
/// The returned future will complete when the peer is disconnected and associated handling
269273
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
270274
/// not need to poll the provided future in order to make progress.
271-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
275+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
272276
CMH: Deref + 'static + Send + Sync,
273277
RMH: Deref + 'static + Send + Sync,
278+
OMH: Deref + 'static + Send + Sync,
274279
L: Deref + 'static + Send + Sync,
275280
UMH: Deref + 'static + Send + Sync,
276281
CMH::Target: ChannelMessageHandler + Send + Sync,
277282
RMH::Target: RoutingMessageHandler + Send + Sync,
283+
OMH::Target: OnionMessageHandler + Send + Sync,
278284
L::Target: Logger + Send + Sync,
279285
UMH::Target: CustomMessageHandler + Send + Sync,
280286
{
@@ -315,13 +321,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
315321
/// The returned future will complete when the peer is disconnected and associated handling
316322
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
317323
/// not need to poll the provided future in order to make progress.
318-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
324+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
319325
CMH: Deref + 'static + Send + Sync,
320326
RMH: Deref + 'static + Send + Sync,
327+
OMH: Deref + 'static + Send + Sync,
321328
L: Deref + 'static + Send + Sync,
322329
UMH: Deref + 'static + Send + Sync,
323330
CMH::Target: ChannelMessageHandler + Send + Sync,
324331
RMH::Target: RoutingMessageHandler + Send + Sync,
332+
OMH::Target: OnionMessageHandler + Send + Sync,
325333
L::Target: Logger + Send + Sync,
326334
UMH::Target: CustomMessageHandler + Send + Sync,
327335
{
@@ -391,13 +399,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
391399
/// disconnected and associated handling futures are freed, though, because all processing in said
392400
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
393401
/// make progress.
394-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
402+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
395403
CMH: Deref + 'static + Send + Sync,
396404
RMH: Deref + 'static + Send + Sync,
405+
OMH: Deref + 'static + Send + Sync,
397406
L: Deref + 'static + Send + Sync,
398407
UMH: Deref + 'static + Send + Sync,
399408
CMH::Target: ChannelMessageHandler + Send + Sync,
400409
RMH::Target: RoutingMessageHandler + Send + Sync,
410+
OMH::Target: OnionMessageHandler + Send + Sync,
401411
L::Target: Logger + Send + Sync,
402412
UMH::Target: CustomMessageHandler + Send + Sync,
403413
{
@@ -646,6 +656,7 @@ mod tests {
646656
let a_manager = Arc::new(PeerManager::new(MessageHandler {
647657
chan_handler: Arc::clone(&a_handler),
648658
route_handler: Arc::clone(&a_handler),
659+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
649660
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
650661

651662
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -660,6 +671,7 @@ mod tests {
660671
let b_manager = Arc::new(PeerManager::new(MessageHandler {
661672
chan_handler: Arc::clone(&b_handler),
662673
route_handler: Arc::clone(&b_handler),
674+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
663675
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
664676

665677
// We bind on localhost, hoping the environment is properly configured with a local
@@ -711,6 +723,7 @@ mod tests {
711723

712724
let a_manager = Arc::new(PeerManager::new(MessageHandler {
713725
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
726+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
714727
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
715728
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
716729

lightning/src/ln/msgs.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use core::fmt::Debug;
4040
use io::{self, Read};
4141
use io_extras::read_to_end;
4242

43-
use util::events::MessageSendEventsProvider;
43+
use util::events::{MessageSendEventsProvider, OnionMessageProvider};
4444
use util::logger;
4545
use util::ser::{BigSize, LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname};
4646

@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
945945
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
946946
}
947947

948+
/// A trait to describe an object that can receive onion messages.
949+
pub trait OnionMessageHandler : OnionMessageProvider {
950+
/// Handle an incoming onion_message message from the given peer.
951+
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
952+
}
953+
948954
mod fuzzy_internal_msgs {
949955
use prelude::*;
950956
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)