diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs index fa219009d41d5..0d42098d3e4f5 100644 --- a/src/libsync/lib.rs +++ b/src/libsync/lib.rs @@ -45,3 +45,4 @@ mod task_pool; pub mod raw; pub mod mutex; pub mod one; +pub mod workqueue; diff --git a/src/libsync/workqueue.rs b/src/libsync/workqueue.rs new file mode 100644 index 0000000000000..8888e9bcec108 --- /dev/null +++ b/src/libsync/workqueue.rs @@ -0,0 +1,278 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + + +/*! + +A simple worker queue. + +# Example + +```rust +use std::task; + +// Create the queue +let (queue, dispatcher) = WorkQueue::(); + +// Spawn a dispatcher thread +task::spawn(proc() { dispatcher.run(); }); + +// Spawn 10 workers +for _ in range(0, 10) { + let worker = queue.worker(); + task::spawn(proc() { worker.run(|arg| arg + 2); }); +} + +// Execute jobs on the queue +let answers: Vec = + range(0, 1000) + .map(|n| queue.push(n)) + .map(|rv| rv.recv()) + .collect(); +assert_eq!(answers, range(2, 1002).collect()); + +// The dispatcher and workers are shut down when `queue` goes out ouf scope +``` + +Jobs can be queued from other tasks: + +```rust +let proxy = queue.proxy(); +assert!(task::try(proc() { + assert_eq!(proxy.push(5).recv(), 7); +}).is_ok()); +``` + +*/ + +use std::comm::channel; + +type WorkUnit = (ARG, Sender); + +enum MessageToWorker { + Work(WorkUnit), + Halt, +} + +enum MessageToDispatcher { + Dispatch(WorkUnit), + HaltAll, + RegisterWorker(Sender>>>), +} + +/// A queue that distributes work items to worker tasks. +pub struct WorkQueue { + priv dispatcher: Sender>, +} + +/// A proxy to a `WorkQueue`. It can be freely cloned to use from multiple tasks. +pub struct Dispatcher { + priv inbox: Receiver>, +} + +/// A proxy to a `WorkQueue`. It can be freely cloned to use from multiple tasks. +pub struct WorkQueueProxy { + priv dispatcher: Sender>, +} + +/// A worker that executes tasks from its parent queue. +pub struct Worker { + priv ask_for_work: Sender>>, +} + +/// Create a new work queue. +pub fn WorkQueue() -> (WorkQueue, Dispatcher) { + let (dispatcher, dispatcher_inbox) = channel::>(); + return ( + WorkQueue{dispatcher: dispatcher}, + Dispatcher{inbox: dispatcher_inbox}, + ); +} + +impl WorkQueue { + /// Create a copyable proxy that can be used to push work units. + pub fn proxy(&self) -> WorkQueueProxy { + return WorkQueueProxy{dispatcher: self.dispatcher.clone()}; + } + + /// Create a new worker. + pub fn worker(&self) -> Worker { + let (reg_s, reg_r) = channel::>>>(); + self.dispatcher.send(RegisterWorker(reg_s)); + return Worker{ask_for_work: reg_r.recv()}; + } + + /// Push a work item to this queue. + pub fn push(&self, arg: ARG) -> Receiver { + let (rv, wait_for_rv) = channel::(); + self.dispatcher.send(Dispatch((arg, rv))); + return wait_for_rv; + } +} + + +// rustc complais "cannot implement a destructor on a structure with +// type parameters", but our destruction is safe, we only send +// a simple message on a channel. +#[unsafe_destructor] +impl Drop for WorkQueue { + fn drop(&mut self) { + self.dispatcher.send(HaltAll); + } +} + +impl Dispatcher { + /// Run the dispatcher loop. It will stop when the parent WorkQueue + /// object is dropped. + pub fn run(&self) { + let (want_work, idle_worker) = channel::>>(); + let mut worker_count = 0; + let idle_worker = idle_worker; + loop { + match self.inbox.recv() { + Dispatch(work_item) => { + idle_worker.recv().send(Work(work_item)); + }, + RegisterWorker(want_idle_sender) => { + worker_count += 1; + want_idle_sender.send(want_work.clone()); + } + HaltAll => { + while worker_count > 0 { + idle_worker.recv().send(Halt); + worker_count -= 1; + } + return; + }, + }; + } + } +} + +impl WorkQueueProxy { + /// Push a work item to this queue. + pub fn push(&self, arg: ARG) -> Receiver { + let (rv, wait_for_rv) = channel::(); + self.dispatcher.send(Dispatch((arg, rv))); + return wait_for_rv; + } +} + +impl Clone for WorkQueueProxy { + fn clone(&self) -> WorkQueueProxy { + return WorkQueueProxy{dispatcher: self.dispatcher.clone()}; + } +} + +impl Worker { + /// Run the worker loop. It will stop when the parente WorkQueue + /// object is dropped. + pub fn run(&self, fun: |arg: ARG| -> RV) { + loop { + let (idle, work_unit) = channel::>(); + self.ask_for_work.send(idle); + match work_unit.recv() { + Work((arg, rv)) => rv.send((fun)(arg)), + Halt => return + }; + } + } +} + +#[cfg(test)] +mod test { + use std::task::spawn; + use super::WorkQueue; + + #[test] + fn test_queue() { + let (queue, dispatcher) = WorkQueue::(); + spawn(proc() { dispatcher.run() }); + for _ in range(0, 3) { + let worker = queue.worker(); + spawn(proc() { worker.run(|arg| arg * 2); }); + } + + let return_list: Vec = + range(0, 10) + .map(|c| queue.push(c)) + .map(|rv| rv.recv()) + .collect(); + + assert_eq!(return_list, vec!(0, 2, 4, 6, 8, 10, 12, 14, 16, 18)); + } + + #[test] + fn test_enqueue_from_tasks() { + let (queue, dispatcher) = WorkQueue::(); + spawn(proc() { dispatcher.run() }); + for _ in range(0, 3) { + let worker = queue.worker(); + spawn(proc() { worker.run(|arg| arg * 2); }); + } + let mut promise_list: Vec> = Vec::new(); + let queue_proxy = queue.proxy(); + for c in range(0, 10) { + let queue_proxy_clone = queue_proxy.clone(); + let (done, promise) = channel::(); + promise_list.push(promise); + spawn(proc() { + let done = done; + let queue = queue_proxy_clone; + let rv = queue.push(c); + done.send(rv.recv()); + }); + } + + let return_list: Vec = + promise_list + .iter() + .map(|promise| promise.recv()) + .collect(); + assert_eq!(return_list, vec!(0, 2, 4, 6, 8, 10, 12, 14, 16, 18)); + } +} + +#[cfg(test)] +mod bench { + extern crate test; + + use self::test::BenchHarness; + use super::WorkQueue; + + #[bench] + fn bench_50_tasks_4_threads(b: &mut BenchHarness) { + let (queue, dispatcher) = WorkQueue::(); + spawn(proc() { dispatcher.run() }); + for _ in range(0, 4) { + let worker = queue.worker(); + spawn(proc() { worker.run(|arg| arg * 2); }); + } + b.iter(|| { + let _: Vec = + range(0, 50) + .map(|_| queue.push(1)) + .map(|rv| rv.recv()) + .collect(); + }); + } + + #[bench] + fn bench_spawn_5_workers(b: &mut BenchHarness) { + b.iter(|| { + let (queue, dispatcher) = WorkQueue::(); + spawn(proc() { dispatcher.run() }); + for _ in range(0, 5) { + let worker = queue.worker(); + spawn(proc() { worker.run(|arg| arg * 2); }); + } + }); + } +}