 
 use std::fmt;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::thread;
+use std::time::Duration;
 
-use crossbeam_channel::{unbounded, Receiver, Sender};
+use crossbeam_channel::{bounded, Receiver, Sender};
 use lazy_static::lazy_static;
 
 use crate::future::Future;
 use crate::task::{Context, Poll};
 use crate::utils::abort_on_panic;
 
+const MAX_THREADS: u64 = 10_000;
+
+static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
+
 struct Pool {
     sender: Sender<async_task::Task<()>>,
     receiver: Receiver<async_task::Task<()>>,
@@ -29,11 +35,69 @@ lazy_static! {
                 .expect("cannot start a thread driving blocking tasks");
         }
 
-        let (sender, receiver) = unbounded();
+        // We want to use an unbuffered channel here to help
+        // us drive our dynamic control. In effect, the
+        // kernel's scheduler becomes the queue, reducing
+        // the number of buffers that work must flow through
+        // before being acted on by a core. This helps keep
+        // latency snappy in the overall async system by
+        // reducing bufferbloat.
+        let (sender, receiver) = bounded(0);
         Pool { sender, receiver }
     };
 }
 
+// Create up to MAX_THREADS dynamic blocking task worker threads.
+// Dynamic threads will terminate themselves if they don't
+// receive any work after one second.
+fn maybe_create_another_blocking_thread() {
+    // We use a `Relaxed` atomic operation because
+    // it's just a heuristic, and would not lose correctness
+    // even if it's random.
+    let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
+    if workers >= MAX_THREADS {
+        return;
+    }
+
+    thread::Builder::new()
+        .name("async-blocking-driver-dynamic".to_string())
+        .spawn(|| {
+            let wait_limit = Duration::from_secs(1);
+
+            DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
+            while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
+                abort_on_panic(|| task.run());
+            }
+            DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
+        })
+        .expect("cannot start a dynamic thread driving blocking tasks");
+}
+
+// Enqueues work, attempting to send to the threadpool in a
+// nonblocking way and spinning up another worker thread if
+// there is not a thread ready to accept the work.
+fn schedule(t: async_task::Task<()>) {
+    let first_try_result = POOL.sender.try_send(t);
+    match first_try_result {
+        Ok(()) => {
+            // A thread was immediately available to accept the work.
+        }
+        Err(crossbeam_channel::TrySendError::Full(t)) => {
+            // We were not able to send to the channel without
+            // blocking. Try to spin up another thread and then
+            // retry sending while blocking.
+            maybe_create_another_blocking_thread();
+            POOL.sender.send(t).unwrap()
+        }
+        Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
+            panic!(
+                "unable to send to blocking threadpool \
+                 due to receiver disconnection"
+            );
+        }
+    }
+}
+
 /// Spawns a blocking task.
 ///
 /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
@@ -42,7 +106,6 @@
     F: Future<Output = R> + Send + 'static,
     R: Send + 'static,
 {
-    let schedule = |t| POOL.sender.send(t).unwrap();
     let (task, handle) = async_task::spawn(future, schedule, ());
     task.schedule();
     JoinHandle(handle)
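
The scheduling change above leans on two crossbeam-channel behaviors: on a zero-capacity (rendezvous) channel, try_send succeeds only when a receiver is already blocked in recv, and recv_timeout lets an idle worker retire itself. The standalone sketch below (not part of this patch; the worker function and message type are made up for illustration) walks through the same try-send, spawn-a-thread, blocking-send sequence outside of async-std.

use std::thread;
use std::time::Duration;

use crossbeam_channel::{bounded, Receiver, TrySendError};

fn run_worker(receiver: Receiver<&'static str>) {
    // Like the dynamic threads in the patch, exit after one idle second.
    while let Ok(job) = receiver.recv_timeout(Duration::from_secs(1)) {
        println!("worker ran: {}", job);
    }
    println!("worker idle too long, retiring");
}

fn main() {
    // Zero capacity: every send must rendezvous with a waiting receiver.
    let (sender, receiver) = bounded::<&'static str>(0);

    match sender.try_send("blocking job") {
        // Only possible if a worker were already parked in `recv`.
        Ok(()) => println!("handed off immediately"),
        Err(TrySendError::Full(job)) => {
            // Nobody was ready: start a worker, then hand the job over
            // with a blocking send, mirroring `schedule` above.
            let r = receiver.clone();
            thread::spawn(move || run_worker(r));
            sender.send(job).unwrap();
        }
        // Cannot happen here because `receiver` is still alive in `main`.
        Err(TrySendError::Disconnected(_)) => unreachable!(),
    }

    // Give the worker time to run the job and then time itself out.
    thread::sleep(Duration::from_millis(1500));
}

Running this prints the worker's output for the job, then its retirement message roughly a second later.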