@@ -106,6 +106,7 @@ impl AccessFuture {
106
106
pub fn resolve < L : Deref > ( & self , graph : & NetworkGraph < L > , result : Result < TxOut , ChainAccessError > )
107
107
where L :: Target : Logger {
108
108
let announcement = {
109
+ let mut pending_checks = graph. pending_checks . internal . lock ( ) . unwrap ( ) ;
109
110
let mut async_messages = self . state . lock ( ) . unwrap ( ) ;
110
111
111
112
if async_messages. channel_announce . is_none ( ) {
@@ -115,6 +116,12 @@ impl AccessFuture {
115
116
async_messages. complete = Some ( result) ;
116
117
return ;
117
118
}
119
+ let announcement_msg = match async_messages. channel_announce . as_ref ( ) . unwrap ( ) {
120
+ ChannelAnnouncement :: Full ( signed_msg) => & signed_msg. contents ,
121
+ ChannelAnnouncement :: Unsigned ( msg) => & msg,
122
+ } ;
123
+
124
+ pending_checks. lookup_completed ( announcement_msg, & Arc :: downgrade ( & self . state ) ) ;
118
125
119
126
async_messages. channel_announce . take ( ) . unwrap ( )
120
127
} ;
@@ -135,13 +142,87 @@ impl AccessFuture {
135
142
}
136
143
}
137
144
145
+ struct PendingChecksContext {
146
+ channels : HashMap < u64 , Weak < Mutex < AccessMessages > > > ,
147
+ }
148
+
149
+ impl PendingChecksContext {
150
+ fn lookup_completed ( & mut self ,
151
+ msg : & msgs:: UnsignedChannelAnnouncement , completed_state : & Weak < Mutex < AccessMessages > >
152
+ ) {
153
+ if let hash_map:: Entry :: Occupied ( e) = self . channels . entry ( msg. short_channel_id ) {
154
+ if Weak :: ptr_eq ( e. get ( ) , & completed_state) {
155
+ e. remove ( ) ;
156
+ }
157
+ }
158
+ }
159
+ }
160
+
138
161
/// A set of messages which are pending UTXO lookups for processing.
139
162
pub ( super ) struct PendingChecks {
163
+ internal : Mutex < PendingChecksContext > ,
140
164
}
141
165
142
166
impl PendingChecks {
143
167
pub ( super ) fn new ( ) -> Self {
144
- PendingChecks { }
168
+ PendingChecks { internal : Mutex :: new ( PendingChecksContext {
169
+ channels : HashMap :: new ( ) ,
170
+ } ) }
171
+ }
172
+
173
+ fn check_replace_previous_entry ( msg : & msgs:: UnsignedChannelAnnouncement ,
174
+ full_msg : Option < & msgs:: ChannelAnnouncement > , replacement : Option < Weak < Mutex < AccessMessages > > > ,
175
+ pending_channels : & mut HashMap < u64 , Weak < Mutex < AccessMessages > > >
176
+ ) -> Result < ( ) , msgs:: LightningError > {
177
+ match pending_channels. entry ( msg. short_channel_id ) {
178
+ hash_map:: Entry :: Occupied ( mut e) => {
179
+ // There's already a pending lookup for the given SCID. Check if the messages
180
+ // are the same and, if so, return immediately (don't bother spawning another
181
+ // lookup if we haven't gotten that far yet).
182
+ match Weak :: upgrade ( & e. get ( ) ) {
183
+ Some ( pending_msgs) => {
184
+ let pending_matches = match & pending_msgs. lock ( ) . unwrap ( ) . channel_announce {
185
+ Some ( ChannelAnnouncement :: Full ( pending_msg) ) => Some ( pending_msg) == full_msg,
186
+ Some ( ChannelAnnouncement :: Unsigned ( pending_msg) ) => pending_msg == msg,
187
+ None => {
188
+ // This shouldn't actually be reachable. We set the
189
+ // `channel_announce` field under the same lock as setting the
190
+ // channel map entry. Still, we can just treat it as
191
+ // non-matching and let the new request fly.
192
+ debug_assert ! ( false ) ;
193
+ false
194
+ } ,
195
+ } ;
196
+ if pending_matches {
197
+ return Err ( LightningError {
198
+ err : "Channel announcement is already being checked" . to_owned ( ) ,
199
+ action : ErrorAction :: IgnoreDuplicateGossip ,
200
+ } ) ;
201
+ } else {
202
+ // The earlier lookup is a different message. If we have another
203
+ // request in-flight now replace the original.
204
+ // Note that in the replace case whether to replace is somewhat
205
+ // arbitrary - both results will be handled, we're just updating the
206
+ // value that will be compared to future lookups with the same SCID.
207
+ if let Some ( item) = replacement {
208
+ * e. get_mut ( ) = item;
209
+ }
210
+ }
211
+ } ,
212
+ None => {
213
+ // The earlier lookup already resolved. We can't be sure its the same
214
+ // so just remove/replace it and move on.
215
+ if let Some ( item) = replacement {
216
+ * e. get_mut ( ) = item;
217
+ } else { e. remove ( ) ; }
218
+ } ,
219
+ }
220
+ } ,
221
+ hash_map:: Entry :: Vacant ( v) => {
222
+ if let Some ( item) = replacement { v. insert ( item) ; }
223
+ } ,
224
+ }
225
+ Ok ( ( ) )
145
226
}
146
227
147
228
pub ( super ) fn check_channel_announcement < A : Deref > ( & self ,
@@ -178,6 +259,9 @@ impl PendingChecks {
178
259
}
179
260
} ;
180
261
262
+ Self :: check_replace_previous_entry ( msg, full_msg, None ,
263
+ & mut self . internal . lock ( ) . unwrap ( ) . channels ) ?;
264
+
181
265
match chain_access {
182
266
& None => {
183
267
// Tentatively accept, potentially exposing us to DoS attacks
@@ -187,12 +271,15 @@ impl PendingChecks {
187
271
match chain_access. get_utxo ( & msg. chain_hash , msg. short_channel_id ) {
188
272
ChainAccessResult :: Sync ( res) => handle_result ( res) ,
189
273
ChainAccessResult :: Async ( future) => {
274
+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
190
275
let mut async_messages = future. state . lock ( ) . unwrap ( ) ;
191
276
if let Some ( res) = async_messages. complete . take ( ) {
192
277
// In the unlikely event the future resolved before we managed to get it,
193
278
// handle the result in-line.
194
279
handle_result ( res)
195
280
} else {
281
+ Self :: check_replace_previous_entry ( msg, full_msg,
282
+ Some ( Arc :: downgrade ( & future. state ) ) , & mut pending_checks. channels ) ?;
196
283
async_messages. channel_announce = Some (
197
284
if let Some ( msg) = full_msg { ChannelAnnouncement :: Full ( msg. clone ( ) ) }
198
285
else { ChannelAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
0 commit comments