@@ -11,6 +11,7 @@ use crossbeam_utils::thread::scope;
11
11
use once_cell:: unsync:: OnceCell ;
12
12
13
13
use crate :: rt:: Reactor ;
14
+ use crate :: sync:: Spinlock ;
14
15
use crate :: task:: Runnable ;
15
16
use crate :: utils:: { abort_on_panic, random} ;
16
17
@@ -58,7 +59,7 @@ impl Runtime {
58
59
59
60
let stealers = machines
60
61
. iter ( )
61
- . map ( |m| m. processor . worker . stealer ( ) )
62
+ . map ( |m| m. processor . lock ( ) . worker . stealer ( ) )
62
63
. collect ( ) ;
63
64
64
65
Runtime {
@@ -138,33 +139,32 @@ impl Runtime {
138
139
/// A thread running a processor.
139
140
struct Machine {
140
141
/// Holds the processor until it gets stolen.
141
- processor : Processor ,
142
+ processor : Spinlock < Processor > ,
142
143
}
143
144
144
- unsafe impl Send for Machine { }
145
- unsafe impl Sync for Machine { }
146
-
147
145
impl Machine {
148
146
/// Creates a new machine running a processor.
149
147
fn new ( p : Processor ) -> Machine {
150
- Machine { processor : p }
148
+ Machine {
149
+ processor : Spinlock :: new ( p) ,
150
+ }
151
151
}
152
152
153
153
/// Schedules a task onto the machine.
154
154
fn schedule ( & self , rt : & Runtime , task : Runnable ) {
155
- self . processor . schedule ( rt, task) ;
155
+ self . processor . lock ( ) . schedule ( rt, task) ;
156
156
}
157
157
158
158
/// Finds the next runnable task.
159
159
fn find_task ( & self , rt : & Runtime ) -> Steal < Runnable > {
160
160
let mut retry = false ;
161
161
162
162
// First try finding a task in the local queue or in the global queue.
163
- if let Some ( task) = self . processor . pop_task ( ) {
163
+ if let Some ( task) = self . processor . lock ( ) . pop_task ( ) {
164
164
return Steal :: Success ( task) ;
165
165
}
166
166
167
- match self . processor . steal_from_global ( rt) {
167
+ match self . processor . lock ( ) . steal_from_global ( rt) {
168
168
Steal :: Empty => { }
169
169
Steal :: Retry => retry = true ,
170
170
Steal :: Success ( task) => return Steal :: Success ( task) ,
@@ -176,12 +176,12 @@ impl Machine {
176
176
// Try finding a task in the local queue, which might hold tasks woken by the reactor. If
177
177
// the local queue is still empty, try stealing from other processors.
178
178
if progress {
179
- if let Some ( task) = self . processor . pop_task ( ) {
179
+ if let Some ( task) = self . processor . lock ( ) . pop_task ( ) {
180
180
return Steal :: Success ( task) ;
181
181
}
182
182
}
183
183
184
- match self . processor . steal_from_others ( rt) {
184
+ match self . processor . lock ( ) . steal_from_others ( rt) {
185
185
Steal :: Empty => { }
186
186
Steal :: Retry => retry = true ,
187
187
Steal :: Success ( task) => return Steal :: Success ( task) ,
@@ -208,7 +208,7 @@ impl Machine {
208
208
// Check if `task::yield_now()` was invoked and flush the slot if so.
209
209
YIELD_NOW . with ( |flag| {
210
210
if flag. replace ( false ) {
211
- self . processor . flush_slot ( rt) ;
211
+ self . processor . lock ( ) . flush_slot ( rt) ;
212
212
}
213
213
} ) ;
214
214
@@ -219,11 +219,12 @@ impl Machine {
219
219
runs = 0 ;
220
220
rt. quick_poll ( ) . unwrap ( ) ;
221
221
222
- if let Steal :: Success ( task) = self . processor . steal_from_global ( rt) {
223
- self . processor . schedule ( rt, task) ;
222
+ let p = self . processor . lock ( ) ;
223
+ if let Steal :: Success ( task) = p. steal_from_global ( rt) {
224
+ p. schedule ( rt, task) ;
224
225
}
225
226
226
- self . processor . flush_slot ( rt) ;
227
+ p . flush_slot ( rt) ;
227
228
}
228
229
229
230
// Try to find a runnable task.
0 commit comments