
Commit f34fadd

Implement select() for new runtime pipes.
1 parent 7326bc8

File tree

5 files changed (+310 -48 lines)
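Note: only three of the five changed files appear in this excerpt; the other two, including the new src/libstd/rt/select.rs module (declared in rt/mod.rs at the bottom of this diff), are not shown. The shape of the trait that comm.rs implements can still be inferred from the impl blocks below. The following sketch is that inference, not the actual contents of select.rs; in particular, whether SelectPort extends Select is an assumption:

    // Inferred sketch of rt::select (Scheduler is rt::sched::Scheduler,
    // BlockedTask is rt::kill::BlockedTask).
    pub trait Select {
        // True if data is already available, so blocking can be skipped.
        fn optimistic_check(&mut self) -> bool;
        // Register a blocked-task handle with this port. Returns true if
        // data raced in during registration and the task was re-enqueued.
        fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool;
        // Deregister after wakeup. Returns true if this port has the data.
        fn unblock_from(&mut self) -> bool;
    }

    pub trait SelectPort<T> : Select {
        // Consume the port and take the payload once known to be ready.
        fn recv_ready(self) -> Option<T>;
    }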

src/libstd/rt/comm.rs

Lines changed: 134 additions & 32 deletions
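For reading this diff: a oneshot Packet's state field is an AtomicUint holding either one of two sentinel values or a parked receiver's task pointer. The declarations live earlier in comm.rs, outside the hunks shown; this sketch of them is an assumption (in particular the exact sentinel values), kept here only as a legend for STATE_BOTH and STATE_ONE below:

    // Assumed declarations, not part of this diff. Task pointers are
    // word-aligned, so they can never collide with the two sentinels.
    static STATE_BOTH: uint = 0; // both endpoints alive, no data yet
    static STATE_ONE: uint = 1;  // one endpoint acted: data sent or consumed
    struct Packet<T> {
        state: AtomicUint,       // sentinel, or a BlockedTask pointer
        payload: Option<T>,
    }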
@@ -12,13 +12,13 @@
 
 use option::*;
 use cast;
-use util;
 use ops::Drop;
 use rt::kill::BlockedTask;
 use kinds::Send;
 use rt::sched::Scheduler;
 use rt::local::Local;
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, SeqCst};
+use rt::select::{Select, SelectPort};
+use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
 use unstable::sync::UnsafeAtomicRcBox;
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -76,6 +76,7 @@ pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
 }
 
 impl<T> ChanOne<T> {
+    #[inline]
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
@@ -141,7 +142,6 @@ impl<T> ChanOne<T> {
     }
 }
 
-
 impl<T> PortOne<T> {
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
@@ -162,46 +162,115 @@ impl<T> PortOne<T> {
 
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
-        let packet = this.packet();
 
         // Optimistic check. If data was sent already, we don't even need to block.
         // No release barrier needed here; we're not handing off our task pointer yet.
-        if unsafe { (*packet).state.load(Acquire) } != STATE_ONE {
+        if !this.optimistic_check() {
             // No data available yet.
             // Switch to the scheduler to put the ~Task into the Packet state.
             let sched = Local::take::<Scheduler>();
             do sched.deschedule_running_task_and_then |sched, task| {
-                unsafe {
-                    // Atomically swap the task pointer into the Packet state, issuing
-                    // an acquire barrier to prevent reordering of the subsequent read
-                    // of the payload. Also issues a release barrier to prevent
-                    // reordering of any previous writes to the task structure.
-                    let task_as_state = task.cast_to_uint();
-                    let oldstate = (*packet).state.swap(task_as_state, SeqCst);
-                    match oldstate {
-                        STATE_BOTH => {
-                            // Data has not been sent. Now we're blocked.
-                            rtdebug!("non-rendezvous recv");
-                            sched.metrics.non_rendezvous_recvs += 1;
-                        }
-                        STATE_ONE => {
-                            rtdebug!("rendezvous recv");
-                            sched.metrics.rendezvous_recvs += 1;
-
-                            // Channel is closed. Switch back and check the data.
-                            // NB: We have to drop back into the scheduler event loop here
-                            // instead of switching immediately back or we could end up
-                            // triggering infinite recursion on the scheduler's stack.
-                            let recvr = BlockedTask::cast_from_uint(task_as_state);
-                            sched.enqueue_blocked_task(recvr);
+                this.block_on(sched, task);
+            }
+        }
+
+        // Task resumes.
+        this.recv_ready()
+    }
+}
+
+impl<T> Select for PortOne<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
+    }
+
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        unsafe {
+            // Atomically swap the task pointer into the Packet state, issuing
+            // an acquire barrier to prevent reordering of the subsequent read
+            // of the payload. Also issues a release barrier to prevent
+            // reordering of any previous writes to the task structure.
+            let task_as_state = task.cast_to_uint();
+            let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
+            match oldstate {
+                STATE_BOTH => {
+                    // Data has not been sent. Now we're blocked.
+                    rtdebug!("non-rendezvous recv");
+                    sched.metrics.non_rendezvous_recvs += 1;
+                    false
+                }
+                STATE_ONE => {
+                    // Re-record that we are the only owner of the packet.
+                    // Release barrier needed in case the task gets reawoken
+                    // on a different core (this is analogous to writing a
+                    // payload; a barrier in enqueueing the task protects it).
+                    // NB(#8132). This *must* occur before the enqueue below.
+                    // FIXME(#6842, #8130) This is usually only needed for the
+                    // assertion in recv_ready, except in the case of select().
+                    // This won't actually ever have cacheline contention, but
+                    // maybe should be optimized out with a cfg(test) anyway?
+                    (*self.packet()).state.store(STATE_ONE, Release);
+
+                    rtdebug!("rendezvous recv");
+                    sched.metrics.rendezvous_recvs += 1;
+
+                    // Channel is closed. Switch back and check the data.
+                    // NB: We have to drop back into the scheduler event loop here
+                    // instead of switching immediately back or we could end up
+                    // triggering infinite recursion on the scheduler's stack.
+                    let recvr = BlockedTask::cast_from_uint(task_as_state);
+                    sched.enqueue_blocked_task(recvr);
+                    true
+                }
+                _ => rtabort!("can't block_on; a task is already blocked")
+            }
+        }
+    }
+
+    // This is the only select trait function that's not also used in recv.
+    fn unblock_from(&mut self) -> bool {
+        let packet = self.packet();
+        unsafe {
+            // In case the data is available, the acquire barrier here matches
+            // the release barrier the sender used to release the payload.
+            match (*packet).state.load(Acquire) {
+                // Impossible. We removed STATE_BOTH when blocking on it, and
+                // no self-respecting sender would put it back.
+                STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
+                // Here, a sender already tried to wake us up. Perhaps they
+                // even succeeded! Data is available.
+                STATE_ONE => true,
+                // Still registered as blocked. Need to "unblock" the pointer.
+                task_as_state => {
+                    // In the window between the load and the CAS, a sender
+                    // might take the pointer and set the refcount to ONE. If
+                    // that happens, we shouldn't clobber that with BOTH!
+                    // Acquire barrier again for the same reason as above.
+                    match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
+                                                           Acquire) {
+                        STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
+                        STATE_ONE => true, // Lost the race. Data available.
+                        same_ptr => {
+                            // We successfully unblocked our task pointer.
+                            assert!(task_as_state == same_ptr);
+                            let handle = BlockedTask::cast_from_uint(task_as_state);
+                            // Because we are already awake, the handle we
+                            // gave to this port shall already be empty.
+                            handle.assert_already_awake();
+                            false
                         }
-                        _ => util::unreachable()
                     }
                 }
             }
         }
+    }
+}
 
-        // Task resumes.
+impl<T> SelectPort<T> for PortOne<T> {
+    fn recv_ready(self) -> Option<T> {
+        let mut this = self;
+        let packet = this.packet();
 
         // No further memory barrier is needed here to access the
         // payload. Some scenarios:
@@ -213,8 +282,11 @@
         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
         //    is pinned to some other scheduler, so the sending task had to give us to
         //    a different scheduler for resuming. That send synchronized memory.
-
         unsafe {
+            // See corresponding store() above in block_on for rationale.
+            // FIXME(#8130) This can happen only in test builds.
+            assert!((*packet).state.load(Acquire) == STATE_ONE);
+
             let payload = (*packet).payload.take();
 
             // The sender has closed up shop. Drop the packet.
@@ -234,7 +306,7 @@ impl<T> Peekable<T> for PortOne<T> {
         match oldstate {
             STATE_BOTH => false,
             STATE_ONE => (*packet).payload.is_some(),
-            _ => util::unreachable()
+            _ => rtabort!("peeked on a blocked task")
         }
     }
 }
@@ -368,6 +440,36 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
+impl<T> Select for Port<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.optimistic_check() }
+    }
+
+    #[inline]
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        let task = Cell::new(task);
+        do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
+    }
+
+    #[inline]
+    fn unblock_from(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.unblock_from() }
+    }
+}
+
+impl<T> SelectPort<(T, Port<T>)> for Port<T> {
+    fn recv_ready(self) -> Option<(T, Port<T>)> {
+        match self.next.take().recv_ready() {
+            Some(StreamPayload { val, next }) => {
+                self.next.put_back(next);
+                Some((val, self))
+            }
+            None => None
+        }
+    }
+}
+
 pub struct SharedChan<T> {
     // Just like Chan, but a shared AtomicOption instead of Cell
     priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
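Taken together, the three Select methods decompose the old monolithic try_recv into phases that a multi-port wait can drive across many ports at once: check optimistically, register a blocked-task handle with every port, and after wakeup deregister each handle and find the winner. A minimal sketch of such a driver follows; the real entry point lives in the unshown select.rs, so the name and signature here are assumptions:

    // Sketch only; phases 2 and 3 are elided. The handle fan-out uses
    // BlockedTask::make_selectable from the kill.rs changes below.
    pub fn select<A: Select>(ports: &mut [A]) -> uint {
        // Phase 1: optimistic_check. If some port already has data,
        // return its index without descheduling at all.
        let mut i = 0;
        while i < ports.len() {
            if ports[i].optimistic_check() { return i; }
            i += 1;
        }
        // Phase 2: deschedule, split the BlockedTask into one handle per
        // port, and register each handle with block_on. A true return
        // means that port's data raced in and the task was re-enqueued.
        // Phase 3: on wakeup, call unblock_from on every port; the single
        // port that reports true is ready, and its index is the result.
        fail!("sketch only; see src/libstd/rt/select.rs");
    }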

src/libstd/rt/kill.rs

Lines changed: 30 additions & 3 deletions
@@ -106,8 +106,14 @@ impl Drop for KillFlag {
 // blocked task handle. So unblocking a task must restore that spare.
 unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option<KillFlagHandle>) -> ~Task {
     let mut task: ~Task = cast::transmute(task_ptr);
-    rtassert!(task.death.spare_kill_flag.is_none());
-    task.death.spare_kill_flag = spare_flag;
+    if task.death.spare_kill_flag.is_none() {
+        task.death.spare_kill_flag = spare_flag;
+    } else {
+        // A task's spare kill flag is not used for blocking in one case:
+        // when an unkillable task blocks on select. In this case, a separate
+        // one was created, which we now discard.
+        rtassert!(task.death.unkillable > 0);
+    }
     task
 }
 
@@ -119,7 +125,7 @@ impl BlockedTask {
             Killable(flag_arc) => {
                 let flag = unsafe { &mut **flag_arc.get() };
                 match flag.swap(KILL_RUNNING, SeqCst) {
-                    KILL_RUNNING => rtabort!("tried to wake an already-running task"),
+                    KILL_RUNNING => None, // woken from select(), perhaps
                    KILL_KILLED => None, // a killer stole it already
                    task_ptr =>
                        Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) })
@@ -162,6 +168,27 @@ impl BlockedTask {
         }
     }
 
+    /// Converts one blocked task handle to a list of many handles to the same.
+    pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
+        let handles = match self {
+            Unkillable(task) => {
+                let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) };
+                UnsafeAtomicRcBox::newN(flag, num_handles)
+            }
+            Killable(flag_arc) => flag_arc.cloneN(num_handles),
+        };
+        // Even if the task was unkillable before, we use 'Killable' because
+        // multiple pipes will have handles. It does not really mean killable.
+        handles.consume_iter().transform(|x| Killable(x)).collect()
+    }
+
+    // This assertion has two flavours because the wake involves an atomic op.
+    // In the faster version, destructors will fail dramatically instead.
+    #[inline] #[cfg(not(test))]
+    pub fn assert_already_awake(self) { }
+    #[inline] #[cfg(test)]
+    pub fn assert_already_awake(self) { assert!(self.wake().is_none()); }
+
     /// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
     #[inline]
     pub unsafe fn cast_to_uint(self) -> uint {
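make_selectable is what lets a single parked task be claimed by several pipes at once: all of the returned handles share one atomic flag word holding the task pointer, so whichever sender wakes the task first swaps the pointer out, and every later wake finds KILL_RUNNING, which is why that case in wake() now returns None instead of aborting. A rough sketch of the registration pattern this enables (assumed; the actual caller is in the unshown select.rs):

    // Sketch: while descheduled, hand one shared handle to each port.
    do sched.deschedule_running_task_and_then |sched, task| {
        let handles = task.make_selectable(ports.len());
        let mut i = 0;
        for handles.consume_iter().advance |handle| {
            // A true return means this port's data arrived during the
            // swap and the task was already re-enqueued; the remaining
            // handles' eventual wake() calls then harmlessly yield None.
            ports[i].block_on(sched, handle);
            i += 1;
        }
    }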

src/libstd/rt/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -142,6 +142,9 @@ pub mod tube;
 /// Simple reimplementation of core::comm
 pub mod comm;
 
+/// Routines for select()ing on pipes.
+pub mod select;
+
 // FIXME #5248 shouldn't be pub
 /// The runtime needs to be able to put a pointer into thread-local storage.
 pub mod local_ptr;
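Putting the pieces together, caller-side code enabled by this commit could look like the following. The select entry point, its path, and ChanOne's send method are assumptions here, since select.rs is not part of this excerpt:

    // Hypothetical end-to-end use of select() over two oneshot pipes.
    use rt::comm::oneshot;
    use rt::select::select; // assumed path and signature

    let (port_a, chan_a) = oneshot::<int>();
    let (port_b, chan_b) = oneshot::<int>();
    chan_b.send(10); // assumed send method; only port_b will have data
    // (chan_a stays in scope so port_a is not closed and made spuriously ready)

    let mut ports = ~[port_a, port_b];
    let ready = select(ports); // returns as soon as some port is ready
    assert!(ready == 1);
    let value = ports.swap_remove(ready).recv_ready();
    assert!(value == Some(10));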
