@@ -37,6 +37,9 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
37
37
/// indicating that read events on this descriptor should resume. A resume_read of false does
38
38
/// *not* imply that further read events should be paused.
39
39
fn send_data ( & mut self , data : & Vec < u8 > , write_offset : usize , resume_read : bool ) -> usize ;
40
+ ///Indicate to the network provider that we want to disconnect this peer, no more allowing any call
41
+ ///to disconnect_event or read_event
42
+ fn notify_disconnection ( & mut self ) ;
40
43
}
41
44
42
45
/// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
@@ -576,6 +579,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
576
579
/// calls to ChannelManager::process_pending_htlc_forward.
577
580
pub fn process_events ( & self ) {
578
581
let mut upstream_events = Vec :: new ( ) ;
582
+ let mut disconnect_peers = Vec :: new ( ) ;
579
583
{
580
584
// TODO: There are some DoS attacks here where you can flood someone's outbound send
581
585
// buffer by doing things like announcing channels on another node. We should be willing to
@@ -710,12 +714,30 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
710
714
}
711
715
continue ;
712
716
} ,
717
+ Event :: SendErrorMessage { ref node_id, ref msg, ref disconnect, ref hard } => {
718
+ let ( mut descriptor, peer) = get_peer_for_forwarding ! ( node_id, {
719
+ //TODO: Do whatever we're gonna do for handling dropped messages
720
+ } ) ;
721
+ if * disconnect == true {
722
+ disconnect_peers. push ( descriptor. clone ( ) ) ;
723
+ }
724
+ if * hard == false {
725
+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
726
+ Self :: do_attempt_write_data ( & mut descriptor, peer) ;
727
+ }
728
+ continue ;
729
+ } ,
713
730
}
714
731
715
732
upstream_events. push ( event) ;
716
733
}
717
734
}
718
735
736
+ for mut descriptor in disconnect_peers {
737
+ descriptor. notify_disconnection ( ) ;
738
+ self . disconnect_event ( & descriptor) ;
739
+ }
740
+
719
741
let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
720
742
for event in upstream_events. drain ( ..) {
721
743
pending_events. push ( event) ;
@@ -756,3 +778,92 @@ impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
756
778
ret
757
779
}
758
780
}
781
+
782
+ #[ cfg( test) ]
783
+ mod tests {
784
+ use chain:: chaininterface;
785
+ use ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
786
+ use ln:: channelmanager:: { ChannelManager } ;
787
+ use ln:: router:: { Router } ;
788
+ use ln:: msgs;
789
+ use ln:: msgs:: { HandleError } ;
790
+ use util:: test_utils;
791
+ use util:: events;
792
+
793
+ use bitcoin:: network:: constants:: Network ;
794
+ use bitcoin:: util:: misc:: hex_bytes;
795
+
796
+ use secp256k1:: Secp256k1 ;
797
+ use secp256k1:: key:: { SecretKey , PublicKey } ;
798
+
799
+ use rand:: { thread_rng, Rng } ;
800
+
801
+ use std:: sync:: { Arc , Mutex } ;
802
+
803
+ #[ derive( PartialEq , Eq , Clone , Hash ) ]
804
+ struct FileDescriptor {
805
+ fd : u16 ,
806
+ }
807
+
808
+ impl SocketDescriptor for FileDescriptor {
809
+ fn send_data ( & mut self , data : & Vec < u8 > , _write_offset : usize , _resume_read : bool ) -> usize {
810
+ data. len ( )
811
+ }
812
+
813
+ fn notify_disconnection ( & mut self ) { }
814
+ }
815
+
816
+ fn create_network ( peer_count : usize ) -> Vec < PeerManager < FileDescriptor > > {
817
+ let secp_ctx = Secp256k1 :: new ( ) ;
818
+ let mut peers = Vec :: new ( ) ;
819
+ let mut rng = thread_rng ( ) ;
820
+
821
+ for _ in 0 ..peer_count {
822
+ let feeest = Arc :: new ( test_utils:: TestFeeEstimator { sat_per_vbyte : 1 } ) ;
823
+ let chain_monitor = Arc :: new ( chaininterface:: ChainWatchInterfaceUtil :: new ( ) ) ;
824
+ let tx_broadcaster = Arc :: new ( test_utils:: TestBroadcaster { txn_broadcasted : Mutex :: new ( Vec :: new ( ) ) } ) ;
825
+ let chan_monitor = Arc :: new ( test_utils:: TestChannelMonitor :: new ( chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) ) ;
826
+ let node_id = {
827
+ let mut key_slice = [ 0 ; 32 ] ;
828
+ rng. fill_bytes ( & mut key_slice) ;
829
+ SecretKey :: from_slice ( & secp_ctx, & key_slice) . unwrap ( )
830
+ } ;
831
+ let node = ChannelManager :: new ( node_id. clone ( ) , 0 , true , Network :: Testnet , feeest. clone ( ) , chan_monitor. clone ( ) , chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) . unwrap ( ) ;
832
+ let router = Router :: new ( PublicKey :: from_secret_key ( & secp_ctx, & node_id) . unwrap ( ) ) ;
833
+ let msg_handler = MessageHandler { chan_handler : node, route_handler : Arc :: new ( router) } ;
834
+ let peer = PeerManager :: new ( msg_handler, node_id) ;
835
+ peers. push ( peer) ;
836
+ }
837
+
838
+ peers
839
+ }
840
+
841
+ fn establish_connection ( peer_a : & PeerManager < FileDescriptor > , peer_b : & PeerManager < FileDescriptor > ) {
842
+ let secp_ctx = Secp256k1 :: new ( ) ;
843
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_b. our_node_secret ) . unwrap ( ) ;
844
+ let fd = FileDescriptor { fd : 1 } ;
845
+ peer_a. new_inbound_connection ( fd. clone ( ) ) ;
846
+ peer_a. peers . lock ( ) . unwrap ( ) . node_id_to_descriptor . insert ( their_id, fd. clone ( ) ) ;
847
+ }
848
+
849
+ #[ test]
850
+ fn test_disconnect_peer ( ) {
851
+ // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
852
+ // push an DisconnectPeer event to remove the node flagged by id
853
+ let peers = create_network ( 2 ) ;
854
+ establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
855
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
856
+
857
+ let secp_ctx = Secp256k1 :: new ( ) ;
858
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peers[ 1 ] . our_node_secret ) . unwrap ( ) ;
859
+ peers[ 0 ] . message_handler . chan_handler . push_event ( events:: Event :: SendErrorMessage {
860
+ node_id : their_id,
861
+ msg : msgs:: ErrorMessage { err : "" } ,
862
+ disconnect : true ,
863
+ hard : true ,
864
+ } ) ;
865
+
866
+ peers[ 0 ] . process_events ( ) ;
867
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
868
+ }
869
+ }
0 commit comments