Skip to content

Commit 51d257f

Browse files
committed
core::rt: Add SharedPort
1 parent 422f663 commit 51d257f

File tree

1 file changed

+132
-0
lines changed

1 file changed

+132
-0
lines changed

src/libstd/rt/comm.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,61 @@ impl<T> Clone for SharedChan<T> {
416416
}
417417
}
418418

419+
pub struct SharedPort<T> {
420+
// The next port on which we will receive the next port on which we will receive T
421+
priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
422+
}
423+
424+
impl<T> SharedPort<T> {
425+
pub fn new(port: Port<T>) -> SharedPort<T> {
426+
// Put the data port into a new link pipe
427+
let next_data_port = port.next.take();
428+
let (next_link_port, next_link_chan) = oneshot();
429+
next_link_chan.send(next_data_port);
430+
let next_link = AtomicOption::new(~next_link_port);
431+
SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
432+
}
433+
}
434+
435+
impl<T: Owned> GenericPort<T> for SharedPort<T> {
436+
fn recv(&self) -> T {
437+
match self.try_recv() {
438+
Some(val) => val,
439+
None => {
440+
fail!("receiving on a closed channel");
441+
}
442+
}
443+
}
444+
445+
fn try_recv(&self) -> Option<T> {
446+
unsafe {
447+
let (next_link_port, next_link_chan) = oneshot();
448+
let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
449+
let link_port = link_port.unwrap();
450+
let data_port = link_port.recv();
451+
let (next_data_port, res) = match data_port.try_recv() {
452+
Some(StreamPayload { val, next }) => {
453+
(next, Some(val))
454+
}
455+
None => {
456+
let (next_data_port, _) = oneshot();
457+
(next_data_port, None)
458+
}
459+
};
460+
next_link_chan.send(next_data_port);
461+
return res;
462+
}
463+
}
464+
}
465+
466+
impl<T> Clone for SharedPort<T> {
467+
fn clone(&self) -> SharedPort<T> {
468+
SharedPort {
469+
next_link: self.next_link.clone()
470+
}
471+
}
472+
}
473+
419474
#[cfg(test)]
420475
mod test {
421476
use super::*;
@@ -702,5 +757,82 @@ mod test {
702757
}
703758
}
704759
}
760+
761+
#[test]
762+
fn shared_port_stress() {
763+
do run_in_mt_newsched_task {
764+
// XXX: Removing these type annotations causes an ICE
765+
let (end_port, end_chan) = stream::<()>();
766+
let (port, chan) = stream::<()>();
767+
let end_chan = SharedChan::new(end_chan);
768+
let port = SharedPort::new(port);
769+
let total = stress_factor() + 100;
770+
for total.times {
771+
let end_chan_clone = end_chan.clone();
772+
let port_clone = port.clone();
773+
do spawntask_random {
774+
port_clone.recv();
775+
end_chan_clone.send(());
776+
}
777+
}
778+
779+
for total.times {
780+
chan.send(());
781+
}
782+
783+
for total.times {
784+
end_port.recv();
785+
}
786+
}
787+
}
788+
789+
#[test]
790+
fn shared_port_close_simple() {
791+
do run_in_mt_newsched_task {
792+
let (port, chan) = stream::<()>();
793+
let port = SharedPort::new(port);
794+
{ let _chan = chan; }
795+
assert!(port.try_recv().is_none());
796+
}
797+
}
798+
799+
#[test]
800+
fn shared_port_close() {
801+
do run_in_mt_newsched_task {
802+
let (end_port, end_chan) = stream::<bool>();
803+
let (port, chan) = stream::<()>();
804+
let end_chan = SharedChan::new(end_chan);
805+
let port = SharedPort::new(port);
806+
let chan = SharedChan::new(chan);
807+
let send_total = 10;
808+
let recv_total = 20;
809+
do spawntask_random {
810+
for send_total.times {
811+
let chan_clone = chan.clone();
812+
do spawntask_random {
813+
chan_clone.send(());
814+
}
815+
}
816+
}
817+
let end_chan_clone = end_chan.clone();
818+
do spawntask_random {
819+
for recv_total.times {
820+
let port_clone = port.clone();
821+
let end_chan_clone = end_chan_clone.clone();
822+
do spawntask_random {
823+
let recvd = port_clone.try_recv().is_some();
824+
end_chan_clone.send(recvd);
825+
}
826+
}
827+
}
828+
829+
let mut recvd = 0;
830+
for recv_total.times {
831+
recvd += if end_port.recv() { 1 } else { 0 };
832+
}
833+
834+
assert!(recvd == send_total);
835+
}
836+
}
705837
}
706838

0 commit comments

Comments
 (0)