Skip to content

Commit e27f27c

Browse files
committed
std: Add I/O timeouts to networking objects
These timeouts all follow the same pattern as established by the timeouts on acceptors. There are three methods: set_timeout, set_read_timeout, and set_write_timeout. Each of these sets a point in the future after which operations will time out. Timeouts with cloned objects are a little trickier. Each object is viewed as having its own timeout, unaffected by other objects' timeouts. Additionally, timeouts do not propagate when a stream is cloned or when a cloned stream has its timeouts modified. This commit is just the public interface which will be exposed for timeouts, the implementation will come in later commits.
1 parent e0fcb4e commit e27f27c

File tree

6 files changed

+419
-15
lines changed

6 files changed

+419
-15
lines changed

src/libstd/io/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,17 @@ pub enum IoErrorKind {
434434
InvalidInput,
435435
/// The I/O operation's timeout expired, causing it to be canceled.
436436
TimedOut,
437+
/// This write operation failed to write all of its data.
438+
///
439+
/// Normally the write() method on a Writer guarantees that all of its data
440+
/// has been written, but some operations may be terminated after only
441+
/// partially writing some data. An example of this is a timed out write
442+
/// which successfully wrote a known number of bytes, but bailed out after
443+
/// doing so.
444+
///
445+
/// The payload contained as part of this variant is the number of bytes
446+
/// which are known to have been successfully written.
447+
ShortWrite(uint),
437448
}
438449

439450
/// A trait for objects which are byte-oriented streams. Readers are defined by
@@ -1429,7 +1440,8 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
14291440
PathDoesntExist => "no such file",
14301441
MismatchedFileTypeForOperation => "mismatched file type",
14311442
ResourceUnavailable => "resource unavailable",
1432-
TimedOut => "operation timed out"
1443+
TimedOut => "operation timed out",
1444+
ShortWrite(..) => "short write",
14331445
};
14341446
IoError {
14351447
kind: kind,

src/libstd/io/net/tcp.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,69 @@ impl TcpStream {
151151
/// Note that this method affects all cloned handles associated with this
152152
/// stream, not just this one handle.
153153
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
154+
155+
/// Sets a timeout, in milliseconds, for blocking operations on this stream.
156+
///
157+
/// This function will set a timeout for all blocking operations (including
158+
/// reads and writes) on this stream. The timeout specified is a relative
159+
/// time, in milliseconds, into the future after which point operations will
160+
/// time out. This means that the timeout must be reset periodically to keep
161+
/// it from expiring. Specifying a value of `None` will clear the timeout
162+
/// for this stream.
163+
///
164+
/// The timeout on this stream is local to this stream only. Setting a
165+
/// timeout does not affect any other cloned instances of this stream, nor
166+
/// does the timeout propagated to cloned handles of this stream. Setting
167+
/// this timeout will override any specific read or write timeouts
168+
/// previously set for this stream.
169+
///
170+
/// For clarification on the semantics of interrupting a read and a write,
171+
/// take a look at `set_read_timeout` and `set_write_timeout`.
172+
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
173+
self.obj.set_timeout(timeout_ms)
174+
}
175+
176+
/// Sets the timeout for read operations on this stream.
177+
///
178+
/// See documentation in `set_timeout` for the semantics of this read time.
179+
/// This will overwrite any previous read timeout set through either this
180+
/// function or `set_timeout`.
181+
///
182+
/// # Errors
183+
///
184+
/// When this timeout expires, if there is no pending read operation, no
185+
/// action is taken. Otherwise, the read operation will be scheduled to
186+
/// promptly return. If a timeout error is returned, then no data was read
187+
/// during the timeout period.
188+
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
189+
self.obj.set_read_timeout(timeout_ms)
190+
}
191+
192+
/// Sets the timeout for write operations on this stream.
193+
///
194+
/// See documentation in `set_timeout` for the semantics of this write time.
195+
/// This will overwrite any previous write timeout set through either this
196+
/// function or `set_timeout`.
197+
///
198+
/// # Errors
199+
///
200+
/// When this timeout expires, if there is no pending write operation, no
201+
/// action is taken. Otherwise, the pending write operation will be
202+
/// scheduled to promptly return. The actual state of the underlying stream
203+
/// is not specified.
204+
///
205+
/// The write operation may return an error of type `ShortWrite` which
206+
/// indicates that the object is known to have written an exact number of
207+
/// bytes successfully during the timeout period, and the remaining bytes
208+
/// were never written.
209+
///
210+
/// If the write operation returns `TimedOut`, then it the timeout primitive
211+
/// does not know how many bytes were written as part of the timeout
212+
/// operation. It may be the case that bytes continue to be written in an
213+
/// asynchronous fashion after the call to write returns.
214+
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
215+
self.obj.set_write_timeout(timeout_ms)
216+
}
154217
}
155218

156219
impl Clone for TcpStream {
@@ -892,6 +955,7 @@ mod test {
892955
Err(ref e) if e.kind == TimedOut => {}
893956
Err(e) => fail!("error: {}", e),
894957
}
958+
::task::deschedule();
895959
if i == 1000 { fail!("should have a pending connection") }
896960
}
897961
drop(l);
@@ -964,4 +1028,118 @@ mod test {
9641028
// this test will never finish if the child doesn't wake up
9651029
rx.recv();
9661030
})
1031+
1032+
iotest!(fn readwrite_timeouts() {
1033+
let addr = next_test_ip6();
1034+
let mut a = TcpListener::bind(addr).listen().unwrap();
1035+
let (tx, rx) = channel::<()>();
1036+
spawn(proc() {
1037+
let mut s = TcpStream::connect(addr).unwrap();
1038+
rx.recv();
1039+
assert!(s.write([0]).is_ok());
1040+
let _ = rx.recv_opt();
1041+
});
1042+
1043+
let mut s = a.accept().unwrap();
1044+
s.set_timeout(Some(20));
1045+
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1046+
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1047+
1048+
s.set_timeout(Some(20));
1049+
for i in range(0, 1001) {
1050+
match s.write([0, .. 128 * 1024]) {
1051+
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1052+
Err(IoError { kind: TimedOut, .. }) => break,
1053+
Err(e) => fail!("{}", e),
1054+
}
1055+
if i == 1000 { fail!("should have filled up?!"); }
1056+
}
1057+
assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1058+
1059+
tx.send(());
1060+
s.set_timeout(None);
1061+
assert_eq!(s.read([0, 0]), Ok(1));
1062+
})
1063+
1064+
iotest!(fn read_timeouts() {
1065+
let addr = next_test_ip6();
1066+
let mut a = TcpListener::bind(addr).listen().unwrap();
1067+
let (tx, rx) = channel::<()>();
1068+
spawn(proc() {
1069+
let mut s = TcpStream::connect(addr).unwrap();
1070+
rx.recv();
1071+
let mut amt = 0;
1072+
while amt < 100 * 128 * 1024 {
1073+
match s.read([0, ..128 * 1024]) {
1074+
Ok(n) => { amt += n; }
1075+
Err(e) => fail!("{}", e),
1076+
}
1077+
}
1078+
let _ = rx.recv_opt();
1079+
});
1080+
1081+
let mut s = a.accept().unwrap();
1082+
s.set_read_timeout(Some(20));
1083+
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1084+
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1085+
1086+
tx.send(());
1087+
for _ in range(0, 100) {
1088+
assert!(s.write([0, ..128 * 1024]).is_ok());
1089+
}
1090+
})
1091+
1092+
iotest!(fn write_timeouts() {
1093+
let addr = next_test_ip6();
1094+
let mut a = TcpListener::bind(addr).listen().unwrap();
1095+
let (tx, rx) = channel::<()>();
1096+
spawn(proc() {
1097+
let mut s = TcpStream::connect(addr).unwrap();
1098+
rx.recv();
1099+
assert!(s.write([0]).is_ok());
1100+
let _ = rx.recv_opt();
1101+
});
1102+
1103+
let mut s = a.accept().unwrap();
1104+
s.set_write_timeout(Some(20));
1105+
for i in range(0, 1001) {
1106+
match s.write([0, .. 128 * 1024]) {
1107+
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1108+
Err(IoError { kind: TimedOut, .. }) => break,
1109+
Err(e) => fail!("{}", e),
1110+
}
1111+
if i == 1000 { fail!("should have filled up?!"); }
1112+
}
1113+
assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1114+
1115+
tx.send(());
1116+
assert!(s.read([0]).is_ok());
1117+
})
1118+
1119+
iotest!(fn timeout_concurrent_read() {
1120+
let addr = next_test_ip6();
1121+
let mut a = TcpListener::bind(addr).listen().unwrap();
1122+
let (tx, rx) = channel::<()>();
1123+
spawn(proc() {
1124+
let mut s = TcpStream::connect(addr).unwrap();
1125+
rx.recv();
1126+
assert_eq!(s.write([0]), Ok(()));
1127+
let _ = rx.recv_opt();
1128+
});
1129+
1130+
let mut s = a.accept().unwrap();
1131+
let s2 = s.clone();
1132+
let (tx2, rx2) = channel();
1133+
spawn(proc() {
1134+
let mut s2 = s2;
1135+
assert_eq!(s2.read([0]), Ok(1));
1136+
tx2.send(());
1137+
});
1138+
1139+
s.set_read_timeout(Some(20));
1140+
assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1141+
tx.send(());
1142+
1143+
rx2.recv();
1144+
})
9671145
}

src/libstd/io/net/udp.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use io::net::ip::{SocketAddr, IpAddr};
2020
use io::{Reader, Writer, IoResult};
2121
use kinds::Send;
2222
use owned::Box;
23+
use option::Option;
2324
use result::{Ok, Err};
2425
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
2526

@@ -142,6 +143,27 @@ impl UdpSocket {
142143
self.obj.ignore_broadcasts()
143144
}
144145
}
146+
147+
/// Sets the read/write timeout for this socket.
148+
///
149+
/// For more information, see `TcpStream::set_timeout`
150+
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
151+
self.obj.set_timeout(timeout_ms)
152+
}
153+
154+
/// Sets the read timeout for this socket.
155+
///
156+
/// For more information, see `TcpStream::set_timeout`
157+
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
158+
self.obj.set_read_timeout(timeout_ms)
159+
}
160+
161+
/// Sets the write timeout for this socket.
162+
///
163+
/// For more information, see `TcpStream::set_timeout`
164+
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
165+
self.obj.set_write_timeout(timeout_ms)
166+
}
145167
}
146168

147169
impl Clone for UdpSocket {
@@ -485,4 +507,56 @@ mod test {
485507
rx.recv();
486508
serv_rx.recv();
487509
})
510+
511+
iotest!(fn recvfrom_timeout() {
512+
let addr1 = next_test_ip4();
513+
let addr2 = next_test_ip4();
514+
let mut a = UdpSocket::bind(addr1).unwrap();
515+
516+
let (tx, rx) = channel();
517+
let (tx2, rx2) = channel();
518+
spawn(proc() {
519+
let mut a = UdpSocket::bind(addr2).unwrap();
520+
assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
521+
assert_eq!(a.sendto([0], addr1), Ok(()));
522+
rx.recv();
523+
assert_eq!(a.sendto([0], addr1), Ok(()));
524+
525+
tx2.send(());
526+
});
527+
528+
// Make sure that reads time out, but writes can continue
529+
a.set_read_timeout(Some(20));
530+
assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
531+
assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
532+
assert_eq!(a.sendto([0], addr2), Ok(()));
533+
534+
// Cloned handles should be able to block
535+
let mut a2 = a.clone();
536+
assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
537+
538+
// Clearing the timeout should allow for receiving
539+
a.set_timeout(None);
540+
tx.send(());
541+
assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
542+
543+
// Make sure the child didn't die
544+
rx2.recv();
545+
})
546+
547+
iotest!(fn sendto_timeout() {
548+
let addr1 = next_test_ip4();
549+
let addr2 = next_test_ip4();
550+
let mut a = UdpSocket::bind(addr1).unwrap();
551+
let _b = UdpSocket::bind(addr2).unwrap();
552+
553+
a.set_write_timeout(Some(1000));
554+
for _ in range(0, 100) {
555+
match a.sendto([0, ..4*1024], addr2) {
556+
Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
557+
Err(IoError { kind: TimedOut, .. }) => break,
558+
Err(e) => fail!("other error: {}", e),
559+
}
560+
}
561+
})
488562
}

0 commit comments

Comments
 (0)