Skip to content

Commit 422f663

Browse files
committed
core::rt: Implement SharedChan
1 parent 1507df8 commit 422f663

File tree

1 file changed

+64
-3
lines changed

1 file changed

+64
-3
lines changed

src/libstd/rt/comm.rs

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ use ops::Drop;
2222
use kinds::Owned;
2323
use rt::sched::{Scheduler, Coroutine};
2424
use rt::local::Local;
25-
use unstable::atomics::{AtomicUint, SeqCst};
25+
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
26+
use unstable::sync::UnsafeAtomicRcBox;
2627
use util::Void;
2728
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
2829
use cell::Cell;
30+
use clone::Clone;
2931

3032
/// A combined refcount / ~Task pointer.
3133
///
@@ -312,16 +314,19 @@ struct StreamPayload<T> {
312314
next: PortOne<StreamPayload<T>>
313315
}
314316

317+
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
318+
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
319+
315320
/// A channel with unbounded size.
316321
pub struct Chan<T> {
317322
// FIXME #5372. Using Cell because we don't take &mut self
318-
next: Cell<ChanOne<StreamPayload<T>>>
323+
next: Cell<StreamChanOne<T>>
319324
}
320325

321326
/// An port with unbounded size.
322327
pub struct Port<T> {
323328
// FIXME #5372. Using Cell because we don't take &mut self
324-
next: Cell<PortOne<StreamPayload<T>>>
329+
next: Cell<StreamPortOne<T>>
325330
}
326331

327332
pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
@@ -374,6 +379,43 @@ impl<T> Peekable<T> for Port<T> {
374379
}
375380
}
376381

382+
pub struct SharedChan<T> {
383+
// Just like Chan, but a shared AtomicOption instead of Cell
384+
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
385+
}
386+
387+
impl<T> SharedChan<T> {
388+
pub fn new(chan: Chan<T>) -> SharedChan<T> {
389+
let next = chan.next.take();
390+
let next = AtomicOption::new(~next);
391+
SharedChan { next: UnsafeAtomicRcBox::new(next) }
392+
}
393+
}
394+
395+
impl<T: Owned> GenericChan<T> for SharedChan<T> {
396+
fn send(&self, val: T) {
397+
self.try_send(val);
398+
}
399+
}
400+
401+
impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
402+
fn try_send(&self, val: T) -> bool {
403+
unsafe {
404+
let (next_pone, next_cone) = oneshot();
405+
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
406+
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
407+
}
408+
}
409+
}
410+
411+
impl<T> Clone for SharedChan<T> {
412+
fn clone(&self) -> SharedChan<T> {
413+
SharedChan {
414+
next: self.next.clone()
415+
}
416+
}
417+
}
418+
377419
#[cfg(test)]
378420
mod test {
379421
use super::*;
@@ -641,5 +683,24 @@ mod test {
641683
for 10000.times { port.recv() }
642684
}
643685
}
686+
687+
#[test]
688+
fn shared_chan_stress() {
689+
do run_in_mt_newsched_task {
690+
let (port, chan) = stream();
691+
let chan = SharedChan::new(chan);
692+
let total = stress_factor() + 100;
693+
for total.times {
694+
let chan_clone = chan.clone();
695+
do spawntask_random {
696+
chan_clone.send(());
697+
}
698+
}
699+
700+
for total.times {
701+
port.recv();
702+
}
703+
}
704+
}
644705
}
645706

0 commit comments

Comments
 (0)