Skip to content

Commit d234cf7

Browse files
committed
core::rt: Make TCP servers work
1 parent 414f3c7 commit d234cf7

File tree

3 files changed

+161
-34
lines changed

3 files changed

+161
-34
lines changed

src/libcore/rt/io/net/tcp.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl Writer for TcpStream {
7979
}
8080

8181
pub struct TcpListener {
82-
rtlistener: ~RtioTcpListenerObject
82+
rtlistener: ~RtioTcpListenerObject,
8383
}
8484

8585
impl TcpListener {
@@ -116,6 +116,8 @@ impl Listener<TcpStream> for TcpListener {
116116
#[cfg(test)]
117117
mod test {
118118
use super::*;
119+
use int;
120+
use cell::Cell;
119121
use rt::test::*;
120122
use rt::io::net::ip::Ipv4;
121123
use rt::io::*;
@@ -172,11 +174,11 @@ mod test {
172174
}
173175
}
174176

175-
#[test] #[ignore]
177+
#[test]
176178
fn multiple_connect_serial() {
177179
do run_in_newsched_task {
178180
let addr = next_test_ip4();
179-
let max = 100;
181+
let max = 10;
180182

181183
do spawntask_immediately {
182184
let mut listener = TcpListener::bind(addr);
@@ -197,4 +199,82 @@ mod test {
197199
}
198200
}
199201

202+
#[test]
203+
fn multiple_connect_interleaved_greedy_schedule() {
204+
do run_in_newsched_task {
205+
let addr = next_test_ip4();
206+
static MAX: int = 10;
207+
208+
do spawntask_immediately {
209+
let mut listener = TcpListener::bind(addr);
210+
for int::range(0, MAX) |i| {
211+
let stream = Cell(listener.accept());
212+
rtdebug!("accepted");
213+
// Start another task to handle the connection
214+
do spawntask_immediately {
215+
let mut stream = stream.take();
216+
let mut buf = [0];
217+
stream.read(buf);
218+
assert!(buf[0] == i as u8);
219+
rtdebug!("read");
220+
}
221+
}
222+
}
223+
224+
connect(0, addr);
225+
226+
fn connect(i: int, addr: IpAddr) {
227+
if i == MAX { return }
228+
229+
do spawntask_immediately {
230+
rtdebug!("connecting");
231+
let mut stream = TcpStream::connect(addr);
232+
// Connect again before writing
233+
connect(i + 1, addr);
234+
rtdebug!("writing");
235+
stream.write([i as u8]);
236+
}
237+
}
238+
}
239+
}
240+
241+
#[test]
242+
fn multiple_connect_interleaved_lazy_schedule() {
243+
do run_in_newsched_task {
244+
let addr = next_test_ip4();
245+
static MAX: int = 10;
246+
247+
do spawntask_immediately {
248+
let mut listener = TcpListener::bind(addr);
249+
for int::range(0, MAX) |_| {
250+
let stream = Cell(listener.accept());
251+
rtdebug!("accepted");
252+
// Start another task to handle the connection
253+
do spawntask_later {
254+
let mut stream = stream.take();
255+
let mut buf = [0];
256+
stream.read(buf);
257+
assert!(buf[0] == 99);
258+
rtdebug!("read");
259+
}
260+
}
261+
}
262+
263+
connect(0, addr);
264+
265+
fn connect(i: int, addr: IpAddr) {
266+
if i == MAX { return }
267+
268+
do spawntask_later {
269+
rtdebug!("connecting");
270+
let mut stream = TcpStream::connect(addr);
271+
// Connect again before writing
272+
connect(i + 1, addr);
273+
rtdebug!("writing");
274+
stream.write([99]);
275+
}
276+
}
277+
}
278+
}
279+
200280
}

src/libcore/rt/test.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,46 @@ pub fn spawntask_immediately(f: ~fn()) {
6464
}
6565
}
6666

67+
/// Create a new task and run it right now. Aborts on failure
68+
pub fn spawntask_later(f: ~fn()) {
69+
use super::sched::*;
70+
71+
let mut sched = local_sched::take();
72+
let task = ~Task::with_local(&mut sched.stack_pool,
73+
LocalServices::without_unwinding(),
74+
f);
75+
76+
sched.task_queue.push_front(task);
77+
local_sched::put(sched);
78+
}
79+
80+
/// Spawn a task and either run it immediately or run it later
81+
pub fn spawntask_random(f: ~fn()) {
82+
use super::sched::*;
83+
use rand::{Rand, rng};
84+
85+
let mut rng = rng();
86+
let run_now: bool = Rand::rand(&mut rng);
87+
88+
let mut sched = local_sched::take();
89+
let task = ~Task::with_local(&mut sched.stack_pool,
90+
LocalServices::without_unwinding(),
91+
f);
92+
93+
if run_now {
94+
do sched.switch_running_tasks_and_then(task) |task| {
95+
let task = Cell(task);
96+
do local_sched::borrow |sched| {
97+
sched.task_queue.push_front(task.take());
98+
}
99+
}
100+
} else {
101+
sched.task_queue.push_front(task);
102+
local_sched::put(sched);
103+
}
104+
}
105+
106+
67107
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
68108
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
69109
use cell::Cell;

src/libcore/rt/uv/uvio.rs

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rt::uv::idle::IdleWatcher;
2121
use rt::rtio::*;
2222
use rt::sched::{Scheduler, local_sched};
2323
use rt::io::{standard_error, OtherIoError};
24+
use rt::tube::Tube;
2425

2526
#[cfg(test)] use uint;
2627
#[cfg(test)] use unstable::run_in_bare_thread;
@@ -149,7 +150,7 @@ impl IoFactory for UvIoFactory {
149150
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
150151
let mut watcher = TcpWatcher::new(self.uv_loop());
151152
match watcher.bind(addr) {
152-
Ok(_) => Ok(~UvTcpListener { watcher: watcher }),
153+
Ok(_) => Ok(~UvTcpListener::new(watcher)),
153154
Err(uverr) => {
154155
// XXX: Should we wait until close completes?
155156
watcher.as_stream().close(||());
@@ -161,10 +162,20 @@ impl IoFactory for UvIoFactory {
161162

162163
// FIXME #6090: Prefer newtype structs but Drop doesn't work
163164
pub struct UvTcpListener {
164-
watcher: TcpWatcher
165+
watcher: TcpWatcher,
166+
listening: bool,
167+
incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
165168
}
166169

167170
impl UvTcpListener {
171+
fn new(watcher: TcpWatcher) -> UvTcpListener {
172+
UvTcpListener {
173+
watcher: watcher,
174+
listening: false,
175+
incoming_streams: Tube::new()
176+
}
177+
}
178+
168179
fn watcher(&self) -> TcpWatcher { self.watcher }
169180
}
170181

@@ -179,41 +190,37 @@ impl RtioTcpListener for UvTcpListener {
179190

180191
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
181192
rtdebug!("entering listen");
182-
let result_cell = empty_cell();
183-
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
184193

185-
let server_tcp_watcher = self.watcher();
194+
if self.listening {
195+
return self.incoming_streams.recv();
196+
}
186197

187-
let scheduler = local_sched::take();
188-
assert!(scheduler.in_task_context());
198+
self.listening = true;
189199

190-
do scheduler.deschedule_running_task_and_then |task| {
191-
let task_cell = Cell(task);
192-
let mut server_tcp_watcher = server_tcp_watcher;
193-
do server_tcp_watcher.listen |server_stream_watcher, status| {
194-
let maybe_stream = if status.is_none() {
195-
let mut server_stream_watcher = server_stream_watcher;
196-
let mut loop_ = server_stream_watcher.event_loop();
197-
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
198-
let client_tcp_watcher = client_tcp_watcher.as_stream();
199-
// XXX: Need's to be surfaced in interface
200-
server_stream_watcher.accept(client_tcp_watcher);
201-
Ok(~UvTcpStream { watcher: client_tcp_watcher })
202-
} else {
203-
Err(standard_error(OtherIoError))
204-
};
205-
206-
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
200+
let server_tcp_watcher = self.watcher();
201+
let incoming_streams_cell = Cell(self.incoming_streams.clone());
202+
203+
let incoming_streams_cell = Cell(incoming_streams_cell.take());
204+
let mut server_tcp_watcher = server_tcp_watcher;
205+
do server_tcp_watcher.listen |server_stream_watcher, status| {
206+
let maybe_stream = if status.is_none() {
207+
let mut server_stream_watcher = server_stream_watcher;
208+
let mut loop_ = server_stream_watcher.event_loop();
209+
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
210+
let client_tcp_watcher = client_tcp_watcher.as_stream();
211+
// XXX: Need's to be surfaced in interface
212+
server_stream_watcher.accept(client_tcp_watcher);
213+
Ok(~UvTcpStream { watcher: client_tcp_watcher })
214+
} else {
215+
Err(standard_error(OtherIoError))
216+
};
207217

208-
rtdebug!("resuming task from listen");
209-
// Context switch
210-
let scheduler = local_sched::take();
211-
scheduler.resume_task_immediately(task_cell.take());
212-
}
218+
let mut incoming_streams = incoming_streams_cell.take();
219+
incoming_streams.send(maybe_stream);
220+
incoming_streams_cell.put_back(incoming_streams);
213221
}
214222

215-
assert!(!result_cell.is_empty());
216-
return result_cell.take();
223+
return self.incoming_streams.recv();
217224
}
218225
}
219226

0 commit comments

Comments
 (0)