@@ -88,9 +88,16 @@ impl Notifier {
88
88
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
89
89
pub ( crate ) fn notify ( & self ) {
90
90
let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
91
- lock . 0 = true ;
91
+ let mut future_probably_generated_calls = false ;
92
92
if let Some ( future_state) = lock. 1 . take ( ) {
93
- future_state. lock ( ) . unwrap ( ) . complete ( ) ;
93
+ future_probably_generated_calls |= future_state. lock ( ) . unwrap ( ) . complete ( ) ;
94
+ future_probably_generated_calls |= Arc :: strong_count ( & future_state) > 1 ;
95
+ }
96
+ if !future_probably_generated_calls {
97
+ // If a future made some callbacks or still exists (i.e. the state has more than the
98
+ // one reference we hold), assume the user was notified and skip the "classic"
99
+ // waiters.
100
+ lock. 0 = true ;
94
101
}
95
102
mem:: drop ( lock) ;
96
103
self . condvar . notify_all ( ) ;
@@ -147,11 +154,14 @@ pub(crate) struct FutureState {
147
154
}
148
155
149
156
impl FutureState {
150
- fn complete ( & mut self ) {
157
+ fn complete ( & mut self ) -> bool {
158
+ let mut made_calls = false ;
151
159
for callback in self . callbacks . drain ( ..) {
152
160
callback. call ( ) ;
161
+ made_calls = true ;
153
162
}
154
163
self . complete = true ;
164
+ made_calls
155
165
}
156
166
}
157
167
@@ -231,6 +241,48 @@ mod tests {
231
241
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
232
242
}
233
243
244
+ #[ test]
245
+ fn notifier_future_completes_wake ( ) {
246
+ // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
247
+ // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
248
+ // `lightning-background-processor` to persist in a tight loop.
249
+ let notifier = Notifier :: new ( ) ;
250
+
251
+ // First check the simple case, ensuring if we get notified a new future isn't woken until
252
+ // a second `notify`.
253
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
254
+ let callback_ref = Arc :: clone ( & callback) ;
255
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
256
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
257
+
258
+ notifier. notify ( ) ;
259
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
260
+
261
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
262
+ let callback_ref = Arc :: clone ( & callback) ;
263
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
264
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
265
+
266
+ notifier. notify ( ) ;
267
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
268
+
269
+ // Then check the case where the future is fetched before the notification, but a callback
270
+ // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
271
+ // don't get an instant-wake when we get a new future.
272
+ let future = notifier. get_future ( ) ;
273
+ notifier. notify ( ) ;
274
+
275
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
276
+ let callback_ref = Arc :: clone ( & callback) ;
277
+ future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
278
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
279
+
280
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
281
+ let callback_ref = Arc :: clone ( & callback) ;
282
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
283
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
284
+ }
285
+
234
286
#[ cfg( feature = "std" ) ]
235
287
#[ test]
236
288
fn test_wait_timeout ( ) {
0 commit comments