@@ -473,6 +473,7 @@ impl Source {
473
473
// Register the current task's waker if not present already.
474
474
if w. readers . iter ( ) . all ( |w| !w. will_wake ( cx. waker ( ) ) ) {
475
475
w. readers . push ( cx. waker ( ) . clone ( ) ) ;
476
+ limit_waker_list ( & mut w. readers ) ;
476
477
}
477
478
478
479
// Remember the current ticks.
@@ -526,6 +527,7 @@ impl Source {
526
527
// Register the current task's waker if not present already.
527
528
if w. writers . iter ( ) . all ( |w| !w. will_wake ( cx. waker ( ) ) ) {
528
529
w. writers . push ( cx. waker ( ) . clone ( ) ) ;
530
+ limit_waker_list ( & mut w. writers ) ;
529
531
}
530
532
531
533
// Remember the current ticks.
@@ -546,3 +548,25 @@ impl Source {
546
548
== Self :: WRITERS_REGISTERED
547
549
}
548
550
}
551
/// Wakes up all wakers in the list if it grew too big.
///
/// The waker list keeps growing in pathological cases where a single async I/O handle has lots of
/// different reader or writer tasks. If the number of interested wakers crosses some threshold, we
/// clear the list and wake all of them at once.
///
/// This strategy prevents memory leaks by bounding the number of stored wakers. However, since all
/// wakers get woken, tasks might simply re-register their interest again, thus creating an infinite
/// loop and burning CPU cycles forever.
///
/// However, we don't worry about such scenarios because it's very unlikely to have more than two
/// actually concurrent tasks operating on a single async I/O handle. If we happen to cross the
/// aforementioned threshold, we have bigger problems to worry about.
fn limit_waker_list(wakers: &mut Vec<Waker>) {
    // Maximum number of wakers the list may hold before it is flushed. Well above any
    // realistic number of concurrent tasks on one handle, so flushing should be rare.
    const WAKER_LIMIT: usize = 50;

    if wakers.len() > WAKER_LIMIT {
        // `drain(..)` empties the list but keeps its allocation for reuse on re-registration.
        for waker in wakers.drain(..) {
            // Don't let a panicking waker blow everything up.
            let _ = panic::catch_unwind(|| waker.wake());
        }
    }
}
0 commit comments