@@ -576,6 +576,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
576
576
/// calls to ChannelManager::process_pending_htlc_forward.
577
577
pub fn process_events ( & self ) {
578
578
let mut upstream_events = Vec :: new ( ) ;
579
+ let mut disconnect_peers = Vec :: new ( ) ;
579
580
{
580
581
// TODO: There are some DoS attacks here where you can flood someone's outbound send
581
582
// buffer by doing things like announcing channels on another node. We should be willing to
@@ -710,12 +711,31 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
710
711
}
711
712
continue ;
712
713
} ,
714
+ Event :: SendErrorMessage { ref node_id, ref msg } => {
715
+ let ( mut descriptor, peer) = get_peer_for_forwarding ! ( node_id, {
716
+ //TODO: Do whatever we're gonna do for handling dropped messages
717
+ } ) ;
718
+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
719
+ Self :: do_attempt_write_data ( & mut descriptor, peer) ;
720
+ continue ;
721
+ } ,
722
+ Event :: DisconnectPeer { ref node_id } => {
723
+ let ( mut descriptor, peer) = get_peer_for_forwarding ! ( node_id, {
724
+ //TODO: Do whatever we're gonna do for handling dropped messages
725
+ } ) ;
726
+ disconnect_peers. push ( descriptor. clone ( ) ) ;
727
+ continue ;
728
+ }
713
729
}
714
730
715
731
upstream_events. push ( event) ;
716
732
}
717
733
}
718
734
735
+ for descriptor in disconnect_peers {
736
+ self . disconnect_event ( & descriptor) ;
737
+ }
738
+
719
739
let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
720
740
for event in upstream_events. drain ( ..) {
721
741
pending_events. push ( event) ;
@@ -756,3 +776,87 @@ impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
756
776
ret
757
777
}
758
778
}
779
+
780
+ #[ cfg( test) ]
781
+ mod tests {
782
+ use chain:: chaininterface;
783
+ use ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
784
+ use ln:: channelmanager:: { ChannelManager } ;
785
+ use ln:: router:: { Router } ;
786
+ use ln:: msgs;
787
+ use ln:: msgs:: { HandleError } ;
788
+ use util:: test_utils;
789
+ use util:: events;
790
+
791
+ use bitcoin:: network:: constants:: Network ;
792
+ use bitcoin:: util:: misc:: hex_bytes;
793
+
794
+ use secp256k1:: Secp256k1 ;
795
+ use secp256k1:: key:: { SecretKey , PublicKey } ;
796
+
797
+ use rand:: { thread_rng, Rng } ;
798
+
799
+ use std:: sync:: { Arc , Mutex } ;
800
+
801
+ #[ derive( PartialEq , Eq , Clone , Hash ) ]
802
+ struct FileDescriptor {
803
+ fd : u16 ,
804
+ }
805
+
806
+ impl SocketDescriptor for FileDescriptor {
807
+ fn send_data ( & mut self , data : & Vec < u8 > , _write_offset : usize , _resume_read : bool ) -> usize {
808
+ data. len ( )
809
+ }
810
+ }
811
+
812
+ fn create_network ( peer_count : usize ) -> Vec < PeerManager < FileDescriptor > > {
813
+ let secp_ctx = Secp256k1 :: new ( ) ;
814
+ let mut peers = Vec :: new ( ) ;
815
+ let mut rng = thread_rng ( ) ;
816
+
817
+ for _ in 0 ..peer_count {
818
+ let feeest = Arc :: new ( test_utils:: TestFeeEstimator { sat_per_vbyte : 1 } ) ;
819
+ let chain_monitor = Arc :: new ( chaininterface:: ChainWatchInterfaceUtil :: new ( ) ) ;
820
+ let tx_broadcaster = Arc :: new ( test_utils:: TestBroadcaster { txn_broadcasted : Mutex :: new ( Vec :: new ( ) ) } ) ;
821
+ let chan_monitor = Arc :: new ( test_utils:: TestChannelMonitor :: new ( chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) ) ;
822
+ let node_id = {
823
+ let mut key_slice = [ 0 ; 32 ] ;
824
+ rng. fill_bytes ( & mut key_slice) ;
825
+ SecretKey :: from_slice ( & secp_ctx, & key_slice) . unwrap ( )
826
+ } ;
827
+ let node = ChannelManager :: new ( node_id. clone ( ) , 0 , true , Network :: Testnet , feeest. clone ( ) , chan_monitor. clone ( ) , chain_monitor. clone ( ) , tx_broadcaster. clone ( ) ) . unwrap ( ) ;
828
+ let router = Router :: new ( PublicKey :: from_secret_key ( & secp_ctx, & node_id) . unwrap ( ) ) ;
829
+ let msg_handler = MessageHandler { chan_handler : node, route_handler : Arc :: new ( router) } ;
830
+ let peer = PeerManager :: new ( msg_handler, node_id) ;
831
+ peers. push ( peer) ;
832
+ }
833
+
834
+ peers
835
+ }
836
+
837
+ fn establish_connection ( peer_a : & PeerManager < FileDescriptor > , peer_b : & PeerManager < FileDescriptor > ) {
838
+ let secp_ctx = Secp256k1 :: new ( ) ;
839
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_b. our_node_secret ) . unwrap ( ) ;
840
+ let fd = FileDescriptor { fd : 1 } ;
841
+ peer_a. new_inbound_connection ( fd. clone ( ) ) ;
842
+ peer_a. peers . lock ( ) . unwrap ( ) . node_id_to_descriptor . insert ( their_id, fd. clone ( ) ) ;
843
+ }
844
+
845
+ #[ test]
846
+ fn test_disconnect_peer ( ) {
847
+ // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
848
+ // push an DisconnectPeer event to remove the node flagged by id
849
+ let peers = create_network ( 2 ) ;
850
+ establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
851
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
852
+
853
+ let secp_ctx = Secp256k1 :: new ( ) ;
854
+ let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peers[ 1 ] . our_node_secret ) . unwrap ( ) ;
855
+ peers[ 0 ] . message_handler . chan_handler . push_event ( events:: Event :: DisconnectPeer {
856
+ node_id : their_id,
857
+ } ) ;
858
+
859
+ peers[ 0 ] . process_events ( ) ;
860
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
861
+ }
862
+ }
0 commit comments