diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index d4bf1d480ed79..708c8e413d986 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -576,16 +576,12 @@ mod tests { let (p, c) = comm::stream(); do task::spawn() || { - let p = comm::PortSet::new(); - c.send(p.chan()); - let arc_v : Arc<~[int]> = p.recv(); let v = (*arc_v.get()).clone(); assert_eq!(v[3], 4); }; - let c = p.recv(); c.send(arc_v.clone()); assert_eq!(arc_v.get()[2], 3); diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index 9fe6aa5795852..acdf2cee841f4 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -14,14 +14,10 @@ Message passing #[allow(missing_doc)]; -use cast::{transmute, transmute_mut}; -use container::Container; +use cast::transmute; use either::{Either, Left, Right}; use kinds::Send; -use option::{Option, Some, None}; -use uint; -use vec::OwnedVector; -use util::replace; +use option::{Option, Some}; use unstable::sync::Exclusive; use rtcomm = rt::comm; use rt; @@ -143,81 +139,6 @@ impl Selectable for Port { } } -/// Treat many ports as one. -#[unsafe_mut_field(ports)] -pub struct PortSet { - ports: ~[pipesy::Port], -} - -impl PortSet { - pub fn new() -> PortSet { - PortSet { - ports: ~[] - } - } - - pub fn add(&self, port: Port) { - let Port { inner } = port; - let port = match inner { - Left(p) => p, - Right(_) => fail!("PortSet not implemented") - }; - unsafe { - let self_ports = transmute_mut(&self.ports); - self_ports.push(port) - } - } - - pub fn chan(&self) -> Chan { - let (po, ch) = stream(); - self.add(po); - ch - } -} - -impl GenericPort for PortSet { - fn try_recv(&self) -> Option { - unsafe { - let self_ports = transmute_mut(&self.ports); - let mut result = None; - // we have to swap the ports array so we aren't borrowing - // aliasable mutable memory. - let mut ports = replace(self_ports, ~[]); - while result.is_none() && ports.len() > 0 { - let i = wait_many(ports); - match ports[i].try_recv() { - Some(m) => { - result = Some(m); - } - None => { - // Remove this port. - let _ = ports.swap_remove(i); - } - } - } - *self_ports = ports; - result - } - } - fn recv(&self) -> T { - self.try_recv().expect("port_set: endpoints closed") - } -} - -impl Peekable for PortSet { - fn peek(&self) -> bool { - // It'd be nice to use self.port.each, but that version isn't - // pure. - for uint::range(0, self.ports.len()) |i| { - let port: &pipesy::Port = &self.ports[i]; - if port.peek() { - return true; - } - } - false - } -} - /// A channel that can be shared between many senders. pub struct SharedChan { inner: Either>, rtcomm::SharedChan> diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 5ce5e902ed1ef..561730887edbd 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -16,7 +16,7 @@ extern mod extra; -use std::comm::{PortSet, Chan, stream}; +use std::comm::{SharedChan, Chan, stream}; use std::io; use std::os; use std::task; @@ -30,7 +30,7 @@ enum request { stop } -fn server(requests: &PortSet, responses: &Chan) { +fn server(requests: &Port, responses: &Chan) { let mut count: uint = 0; let mut done = false; while !done { @@ -50,9 +50,8 @@ fn server(requests: &PortSet, responses: &Chan) { fn run(args: &[~str]) { let (from_child, to_parent) = stream(); - let (from_parent_, to_child) = stream(); - let from_parent = PortSet::new(); - from_parent.add(from_parent_); + let (from_parent, to_child) = stream(); + let to_child = SharedChan::new(to_child); let size = uint::from_str(args[1]).get(); let workers = uint::from_str(args[2]).get(); @@ -60,8 +59,7 @@ fn run(args: &[~str]) { let start = extra::time::precise_time_s(); let mut worker_results = ~[]; for uint::range(0, workers) |_i| { - let (from_parent_, to_child) = stream(); - from_parent.add(from_parent_); + let to_child = to_child.clone(); let mut builder = task::task(); builder.future_result(|r| worker_results.push(r)); do builder.spawn { diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 6ea22715750c5..e30cd921c4e46 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -33,22 +33,24 @@ use std::u64; use std::uint; fn fib(n: int) -> int { - fn pfib(c: &Chan, n: int) { + fn pfib(c: &SharedChan, n: int) { if n == 0 { c.send(0); } else if n <= 2 { c.send(1); } else { - let p = PortSet::new(); - let ch = p.chan(); + let (pp, cc) = stream(); + let cc = SharedChan::new(cc); + let ch = cc.clone(); task::spawn(|| pfib(&ch, n - 1) ); - let ch = p.chan(); + let ch = cc.clone(); task::spawn(|| pfib(&ch, n - 2) ); - c.send(p.recv() + p.recv()); + c.send(pp.recv() + pp.recv()); } } let (p, ch) = stream(); + let ch = SharedChan::new(ch); let _t = task::spawn(|| pfib(&ch, n) ); p.recv() } diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index 6910d39d495ae..a04e3525d34dd 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -14,14 +14,14 @@ use std::comm; use std::task; pub fn main() { - let po = comm::PortSet::new(); + let (po, ch) = comm::stream(); + let ch = comm::SharedChan::new(ch); // Spawn 10 tasks each sending us back one int. let mut i = 10; while (i > 0) { info!(i); - let (p, ch) = comm::stream(); - po.add(p); + let ch = ch.clone(); task::spawn({let i = i; || child(i, &ch)}); i = i - 1; } @@ -39,7 +39,7 @@ pub fn main() { info!("main thread exiting"); } -fn child(x: int, ch: &comm::Chan) { +fn child(x: int, ch: &comm::SharedChan) { info!(x); ch.send(x); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 6c7405ef44188..ab13f8ef1796b 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -12,13 +12,13 @@ extern mod extra; -use std::comm::Chan; +use std::comm::SharedChan; use std::comm; use std::task; pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); } -fn test00_start(ch: &Chan, message: int, count: int) { +fn test00_start(ch: &SharedChan, message: int, count: int) { info!("Starting test00_start"); let mut i: int = 0; while i < count { @@ -35,14 +35,15 @@ fn test00() { info!("Creating tasks"); - let po = comm::PortSet::new(); + let (po, ch) = comm::stream(); + let ch = comm::SharedChan::new(ch); let mut i: int = 0; // Create and spawn tasks... let mut results = ~[]; while i < number_of_tasks { - let ch = po.chan(); + let ch = ch.clone(); let mut builder = task::task(); builder.future_result(|r| results.push(r)); builder.spawn({ diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index db6234857d62d..0da0d5877229d 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::comm::Chan; +use std::comm::SharedChan; use std::comm; pub fn main() { test00(); } @@ -16,11 +16,12 @@ pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::PortSet::new(); - let c0 = p.chan(); - let c1 = p.chan(); - let c2 = p.chan(); - let c3 = p.chan(); + let (p, ch) = comm::stream(); + let ch = SharedChan::new(ch); + let c0 = ch.clone(); + let c1 = ch.clone(); + let c2 = ch.clone(); + let c3 = ch.clone(); let number_of_messages: int = 1000; let mut i: int = 0; while i < number_of_messages { diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index 2ec3ff2408953..33d721f5f3806 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -17,7 +17,7 @@ use std::task; pub fn main() { test00(); } -fn test00_start(c: &comm::Chan, start: int, number_of_messages: int) { +fn test00_start(c: &comm::SharedChan, start: int, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(start + i); i += 1; } } @@ -25,22 +25,23 @@ fn test00_start(c: &comm::Chan, start: int, number_of_messages: int) { fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::PortSet::new(); + let (p, ch) = comm::stream(); + let ch = comm::SharedChan::new(ch); let number_of_messages: int = 10; - let c = p.chan(); + let c = ch.clone(); do task::spawn || { test00_start(&c, number_of_messages * 0, number_of_messages); } - let c = p.chan(); + let c = ch.clone(); do task::spawn || { test00_start(&c, number_of_messages * 1, number_of_messages); } - let c = p.chan(); + let c = ch.clone(); do task::spawn || { test00_start(&c, number_of_messages * 2, number_of_messages); } - let c = p.chan(); + let c = ch.clone(); do task::spawn || { test00_start(&c, number_of_messages * 3, number_of_messages); } diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index 14c462f410cb2..86e3e24a3ee23 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -25,9 +25,8 @@ fn test00_start(c: &comm::Chan, number_of_messages: int) { fn test00() { let r: int = 0; let mut sum: int = 0; - let p = comm::PortSet::new(); + let (p, ch) = comm::stream(); let number_of_messages: int = 10; - let ch = p.chan(); let mut result = None; let mut builder = task::task();