@@ -576,6 +576,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
576
576
/// calls to ChannelManager::process_pending_htlc_forward.
577
577
pub fn process_events ( & self ) {
578
578
let mut upstream_events = Vec :: new ( ) ;
579
+ let mut disconnect_peers = Vec :: new ( ) ;
579
580
{
580
581
// TODO: There are some DoS attacks here where you can flood someone's outbound send
581
582
// buffer by doing things like announcing channels on another node. We should be willing to
@@ -710,12 +711,26 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
710
711
}
711
712
continue ;
712
713
} ,
714
+ Event :: DisconnectPeer { ref node_id, ref msg } => {
715
+ let ( mut descriptor, peer) = get_peer_for_forwarding ! ( node_id, {
716
+ //TODO: Do whatever we're gonna do for handling dropped messages
717
+ } ) ;
718
+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
719
+ Self :: do_attempt_write_data ( & mut descriptor, peer) ;
720
+ disconnect_peers. push ( descriptor. clone ( ) ) ;
721
+ // TODO: log err to take further action depend on kind of error
722
+ continue ;
723
+ }
713
724
}
714
725
715
726
upstream_events. push ( event) ;
716
727
}
717
728
}
718
729
730
+ for descriptor in disconnect_peers {
731
+ self . disconnect_event ( & descriptor) ;
732
+ }
733
+
719
734
let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
720
735
for event in upstream_events. drain ( ..) {
721
736
pending_events. push ( event) ;
@@ -756,3 +771,103 @@ impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
756
771
ret
757
772
}
758
773
}
774
+
775
+ #[ cfg( test) ]
776
+ mod tests {
777
+ use chain:: chaininterface;
778
+ use ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
779
+ use ln:: channelmanager:: { ChannelManager } ;
780
+ use ln:: router:: { Router } ;
781
+ use ln:: msgs;
782
+ use ln:: peer_channel_encryptor:: { PeerChannelEncryptor } ;
783
+ use ln:: msgs:: { HandleError } ;
784
+ use util:: test_utils;
785
+ use util:: events;
786
+
787
+ use bitcoin:: network:: constants:: Network ;
788
+ use bitcoin:: util:: misc:: hex_bytes;
789
+
790
+ use secp256k1:: Secp256k1 ;
791
+ use secp256k1:: key:: { SecretKey , PublicKey } ;
792
+
793
+ use rand:: { thread_rng, Rng } ;
794
+
795
+ use std:: sync:: { Arc , Mutex } ;
796
+
797
+ #[ derive( PartialEq , Eq , Clone , Hash ) ]
798
+ struct FileDescriptor {
799
+ fd : u16 ,
800
+ }
801
+
802
+ impl SocketDescriptor for FileDescriptor {
803
+ fn send_data ( & mut self , data : & Vec < u8 > , _write_offset : usize , _resume_read : bool ) -> usize {
804
+ data. len ( )
805
+ }
806
+ }
807
+
808
+ fn create_network ( peer_count : usize ) -> Vec < PeerManager < FileDescriptor > > {
809
+ let secp_ctx = Secp256k1 :: new ( ) ;
810
+ let mut peers = Vec :: new ( ) ;
811
+ let mut rng = thread_rng ( ) ;
812
+
813
+ for _ in 0 ..peer_count {
814
+ let feeest = Arc :: new ( test_utils:: TestFeeEstimator { sat_per_vbyte : 1 } ) ;
815
+ let chain_monitor = Arc :: new ( chaininterface:: ChainWatchInterfaceUtil :: new ( ) ) ;
816
+ let tx_broadcaster = Arc :: new ( test_utils:: TestBroadcaster { txn_broadcasted : Mutex :: new ( Vec :: new ( ) ) } ) ;
817
+ let chan_monitor = Arc :: new ( test_utils:: TestChannelMonitor :: new ( chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) ) ;
818
+ let node_id = {
819
+ let mut key_slice = [ 0 ; 32 ] ;
820
+ rng. fill_bytes ( & mut key_slice) ;
821
+ SecretKey :: from_slice ( & secp_ctx, & key_slice) . unwrap ( )
822
+ } ;
823
+ let node = ChannelManager :: new ( node_id. clone ( ) , 0 , true , Network :: Testnet , feeest. clone ( ) , chan_monitor. clone ( ) , chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) . unwrap ( ) ;
824
+ let router = Router :: new ( PublicKey :: from_secret_key ( & secp_ctx, & node_id) . unwrap ( ) ) ;
825
+ let msg_handler = MessageHandler { chan_handler : node, route_handler : Arc :: new ( router) } ;
826
+ let peer = PeerManager :: new ( msg_handler, node_id) ;
827
+ peers. push ( peer) ;
828
+ }
829
+
830
+ peers
831
+ }
832
+
833
+ fn establish_connection ( peer_a : & PeerManager < FileDescriptor > , peer_b : & PeerManager < FileDescriptor > ) {
834
+ let secp_ctx = Secp256k1 :: new ( ) ;
835
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_b. our_node_secret ) . unwrap ( ) ;
836
+ let fd = FileDescriptor { fd : 1 } ;
837
+ peer_a. new_inbound_connection ( fd. clone ( ) ) ;
838
+
839
+ let mut inbound_peer_encryptor;
840
+ {
841
+ let our_node_id = SecretKey :: from_slice ( & secp_ctx, & hex_bytes ( "2121212121212121212121212121212121212121212121212121212121212121" ) . unwrap ( ) [ ..] ) . unwrap ( ) ;
842
+ let our_ephemeral = SecretKey :: from_slice ( & secp_ctx, & hex_bytes ( "2222222222222222222222222222222222222222222222222222222222222222" ) . unwrap ( ) [ ..] ) . unwrap ( ) ;
843
+ inbound_peer_encryptor = PeerChannelEncryptor :: new_inbound ( & our_node_id) ;
844
+ let act_one = hex_bytes ( "00036360e856310ce5d294e8be33fc807077dc56ac80d95d9cd4ddbd21325eff73f70df6086551151f58b8afe6c195782c6a" ) . unwrap ( ) . to_vec ( ) ;
845
+ inbound_peer_encryptor. process_act_one_with_ephemeral_key ( & act_one[ ..] , & our_node_id, our_ephemeral) . unwrap ( ) ;
846
+ let act_three = hex_bytes ( "00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba" ) . unwrap ( ) . to_vec ( ) ;
847
+ inbound_peer_encryptor. process_act_three ( & act_three[ ..] ) . unwrap ( ) ;
848
+ }
849
+ if let Some ( peer) = peer_a. peers . lock ( ) . unwrap ( ) . peers . get_mut ( & fd) {
850
+ peer. channel_encryptor = inbound_peer_encryptor;
851
+ }
852
+ peer_a. peers . lock ( ) . unwrap ( ) . node_id_to_descriptor . insert ( their_id, fd. clone ( ) ) ;
853
+ }
854
+
855
+ #[ test]
856
+ fn test_disconnect_peer ( ) {
857
+ // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
858
+ // push an DisconnectPeer event to remove the node flagged by id
859
+ let peers = create_network ( 2 ) ;
860
+ establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
861
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
862
+
863
+ let secp_ctx = Secp256k1 :: new ( ) ;
864
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peers[ 1 ] . our_node_secret ) . unwrap ( ) ;
865
+ peers[ 0 ] . message_handler . chan_handler . push_event ( events:: Event :: DisconnectPeer {
866
+ node_id : their_id,
867
+ msg : HandleError { err : "test disconnect peer B by peer A" , msg : Some ( msgs:: ErrorAction :: DisconnectPeer ) } ,
868
+ } ) ;
869
+
870
+ peers[ 0 ] . process_events ( ) ;
871
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
872
+ }
873
+ }
0 commit comments