@@ -90,6 +90,7 @@ impl AccessFuture {
90
90
pub fn resolve < L : Deref > ( & self , graph : & NetworkGraph < L > , result : Result < TxOut , ChainAccessError > )
91
91
where L :: Target : Logger {
92
92
let announcement = {
93
+ let mut pending_checks = graph. pending_checks . internal . lock ( ) . unwrap ( ) ;
93
94
let mut async_messages = self . state . lock ( ) . unwrap ( ) ;
94
95
95
96
if async_messages. channel_announce . is_none ( ) {
@@ -99,6 +100,19 @@ impl AccessFuture {
99
100
async_messages. complete = Some ( result) ;
100
101
return ;
101
102
}
103
+ let announcement_msg = match async_messages. channel_announce . as_ref ( ) . unwrap ( ) {
104
+ ChannelAnnouncement :: Full ( signed_msg) => & signed_msg. contents ,
105
+ ChannelAnnouncement :: Unsigned ( msg) => & msg,
106
+ } ;
107
+
108
+ // First remove the pending entries for this lookup...
109
+ if let hash_map:: Entry :: Occupied ( e) =
110
+ pending_checks. channels . entry ( announcement_msg. short_channel_id )
111
+ {
112
+ if core:: ptr:: eq ( Weak :: as_ptr ( e. get ( ) ) , Arc :: as_ptr ( & self . state ) ) {
113
+ e. remove ( ) ;
114
+ }
115
+ }
102
116
103
117
async_messages. channel_announce . take ( ) . unwrap ( )
104
118
} ;
@@ -119,13 +133,20 @@ impl AccessFuture {
119
133
}
120
134
}
121
135
136
+ struct PendingChecksContext {
137
+ channels : HashMap < u64 , Weak < Mutex < AccessMessages > > > ,
138
+ }
139
+
122
140
/// A set of messages which are pending UTXO lookups for processing.
123
141
pub ( super ) struct PendingChecks {
142
+ internal : Mutex < PendingChecksContext > ,
124
143
}
125
144
126
145
impl PendingChecks {
127
146
pub ( super ) fn new ( ) -> Self {
128
- PendingChecks { }
147
+ PendingChecks { internal : Mutex :: new ( PendingChecksContext {
148
+ channels : HashMap :: new ( ) ,
149
+ } ) }
129
150
}
130
151
131
152
pub ( super ) fn check_channel_announcement < A : Deref > ( & self ,
@@ -162,6 +183,60 @@ impl PendingChecks {
162
183
}
163
184
} ;
164
185
186
+ macro_rules! check_previous_entry {
187
+ ( $pending_channels: expr, $item_for_replace: expr) => {
188
+ match $pending_channels. entry( msg. short_channel_id) {
189
+ hash_map:: Entry :: Occupied ( mut e) => {
190
+ // There's already a pending lookup for the given SCID. Check if the messages
191
+ // are the same and, if so, return immediately (don't bother spawning another
192
+ // lookup if we haven't gotten that far yet).
193
+ match Weak :: upgrade( & e. get( ) ) {
194
+ Some ( pending_msgs) => {
195
+ let pending_matches = match & pending_msgs. lock( ) . unwrap( ) . channel_announce {
196
+ Some ( ChannelAnnouncement :: Full ( pending_msg) ) => Some ( pending_msg) == full_msg,
197
+ Some ( ChannelAnnouncement :: Unsigned ( pending_msg) ) => pending_msg == msg,
198
+ None => {
199
+ // This shouldn't actually be reachable. We set the
200
+ // `channel_announce` field under the same lock as setting the
201
+ // channel map entry. Still, we can just treat it as
202
+ // non-matching and let the new request fly.
203
+ false
204
+ } ,
205
+ } ;
206
+ if pending_matches {
207
+ return Err ( LightningError {
208
+ err: "Channel announcement is already being checked" . to_owned( ) ,
209
+ action: ErrorAction :: IgnoreError ,
210
+ } ) ;
211
+ } else {
212
+ // The earlier lookup is a different message. If we have another
213
+ // request in-flight now replace the original.
214
+ // Note that in the replace case whether to replace is somewhat
215
+ // arbitrary - both results will be handled, we're just updating the
216
+ // value that will be compared to future lookups with the same SCID.
217
+ if let Some ( item) = $item_for_replace {
218
+ * e. get_mut( ) = item;
219
+ }
220
+ }
221
+ } ,
222
+ None => {
223
+ // The earlier lookup already resolved. We can't be sure its the same
224
+ // so just remove/replace it and move on.
225
+ if let Some ( item) = $item_for_replace {
226
+ * e. get_mut( ) = item;
227
+ } else { e. remove( ) ; }
228
+ } ,
229
+ }
230
+ } ,
231
+ hash_map:: Entry :: Vacant ( v) => {
232
+ if let Some ( item) = $item_for_replace { v. insert( item) ; }
233
+ } ,
234
+ }
235
+ }
236
+ }
237
+
238
+ check_previous_entry ! ( self . internal. lock( ) . unwrap( ) . channels, None ) ;
239
+
165
240
match chain_access {
166
241
& None => {
167
242
// Tentatively accept, potentially exposing us to DoS attacks
@@ -171,12 +246,14 @@ impl PendingChecks {
171
246
match chain_access. get_utxo ( & msg. chain_hash , msg. short_channel_id ) {
172
247
ChainAccessResult :: Sync ( res) => handle_result ( res) ,
173
248
ChainAccessResult :: Async ( future) => {
249
+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
174
250
let mut async_messages = future. state . lock ( ) . unwrap ( ) ;
175
251
if let Some ( res) = async_messages. complete . take ( ) {
176
252
// In the unlikely event the future resolved before we managed to get it,
177
253
// handle the result in-line.
178
254
handle_result ( res)
179
255
} else {
256
+ check_previous_entry ! ( pending_checks. channels, Some ( Arc :: downgrade( & future. state) ) ) ;
180
257
async_messages. channel_announce = Some (
181
258
if let Some ( msg) = full_msg { ChannelAnnouncement :: Full ( msg. clone ( ) ) }
182
259
else { ChannelAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
0 commit comments