Skip to content

Commit e5a174a

Browse files
author
Stjepan Glavina
authored
Merge pull request async-rs#5 from Keruspe/optimistic-rw
skip optimistic read/write when there already are wakers registered
2 parents d3b5cd4 + b07a426 commit e5a174a

File tree

2 files changed

+56
-13
lines changed

2 files changed

+56
-13
lines changed

src/lib.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,14 @@ impl<T> Async<T> {
518518
pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
519519
let mut op = op;
520520
loop {
521-
match op(self.get_ref()) {
522-
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
523-
res => return res,
521+
// If there are no blocked readers, attempt the read operation.
522+
if !self.source.readers_registered() {
523+
match op(self.get_ref()) {
524+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
525+
res => return res,
526+
}
524527
}
528+
// Wait until the I/O handle becomes readable.
525529
optimistic(self.readable()).await?;
526530
}
527531
}
@@ -554,10 +558,14 @@ impl<T> Async<T> {
554558
) -> io::Result<R> {
555559
let mut op = op;
556560
loop {
557-
match op(self.get_mut()) {
558-
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
559-
res => return res,
561+
// If there are no blocked readers, attempt the read operation.
562+
if !self.source.readers_registered() {
563+
match op(self.get_mut()) {
564+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
565+
res => return res,
566+
}
560567
}
568+
// Wait until the I/O handle becomes readable.
561569
optimistic(self.readable()).await?;
562570
}
563571
}
@@ -588,10 +596,14 @@ impl<T> Async<T> {
588596
pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
589597
let mut op = op;
590598
loop {
591-
match op(self.get_ref()) {
592-
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
593-
res => return res,
599+
// If there are no blocked readers, attempt the write operation.
600+
if !self.source.writers_registered() {
601+
match op(self.get_ref()) {
602+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
603+
res => return res,
604+
}
594605
}
606+
// Wait until the I/O handle becomes writable.
595607
optimistic(self.writable()).await?;
596608
}
597609
}
@@ -625,10 +637,14 @@ impl<T> Async<T> {
625637
) -> io::Result<R> {
626638
let mut op = op;
627639
loop {
628-
match op(self.get_mut()) {
629-
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
630-
res => return res,
640+
// If there are no blocked readers, attempt the write operation.
641+
if !self.source.writers_registered() {
642+
match op(self.get_mut()) {
643+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
644+
res => return res,
645+
}
631646
}
647+
// Wait until the I/O handle becomes writable.
632648
optimistic(self.writable()).await?;
633649
}
634650
}

src/reactor.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::os::unix::io::RawFd;
66
#[cfg(windows)]
77
use std::os::windows::io::RawSocket;
88
use std::panic;
9-
use std::sync::atomic::{AtomicUsize, Ordering};
9+
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
1010
use std::sync::{Arc, Mutex, MutexGuard};
1111
use std::task::{Poll, Waker};
1212
use std::thread;
@@ -166,6 +166,7 @@ impl Reactor {
166166
readers: Vec::new(),
167167
writers: Vec::new(),
168168
}),
169+
wakers_registered: AtomicU8::new(0),
169170
});
170171
sources.insert(source.clone());
171172

@@ -338,12 +339,18 @@ impl ReactorLock<'_> {
338339
if ev.readable {
339340
w.tick_readable = tick;
340341
wakers.append(&mut w.readers);
342+
source
343+
.wakers_registered
344+
.fetch_and(!Source::READERS_REGISTERED, Ordering::SeqCst);
341345
}
342346

343347
// Wake writers if a writability event was emitted.
344348
if ev.writable {
345349
w.tick_writable = tick;
346350
wakers.append(&mut w.writers);
351+
source
352+
.wakers_registered
353+
.fetch_and(!Source::WRITERS_REGISTERED, Ordering::SeqCst);
347354
}
348355

349356
// Re-register if there are still writers or
@@ -408,6 +415,9 @@ pub(crate) struct Source {
408415

409416
/// Tasks interested in events on this source.
410417
wakers: Mutex<Wakers>,
418+
419+
/// Whether there are wakers interrested in events on this source.
420+
wakers_registered: AtomicU8,
411421
}
412422

413423
/// Tasks interested in events on a source.
@@ -427,6 +437,9 @@ struct Wakers {
427437
}
428438

429439
impl Source {
440+
const READERS_REGISTERED: u8 = 1 << 0;
441+
const WRITERS_REGISTERED: u8 = 1 << 1;
442+
430443
/// Waits until the I/O source is readable.
431444
pub(crate) async fn readable(&self) -> io::Result<()> {
432445
let mut ticks = None;
@@ -453,6 +466,8 @@ impl Source {
453466
writable: !w.writers.is_empty(),
454467
},
455468
)?;
469+
self.wakers_registered
470+
.fetch_or(Self::READERS_REGISTERED, Ordering::SeqCst);
456471
}
457472

458473
// Register the current task's waker if not present already.
@@ -473,6 +488,11 @@ impl Source {
473488
.await
474489
}
475490

491+
pub(crate) fn readers_registered(&self) -> bool {
492+
self.wakers_registered.load(Ordering::SeqCst) & Self::READERS_REGISTERED
493+
== Self::READERS_REGISTERED
494+
}
495+
476496
/// Waits until the I/O source is writable.
477497
pub(crate) async fn writable(&self) -> io::Result<()> {
478498
let mut ticks = None;
@@ -499,6 +519,8 @@ impl Source {
499519
writable: true,
500520
},
501521
)?;
522+
self.wakers_registered
523+
.fetch_or(Self::WRITERS_REGISTERED, Ordering::SeqCst);
502524
}
503525

504526
// Register the current task's waker if not present already.
@@ -518,4 +540,9 @@ impl Source {
518540
})
519541
.await
520542
}
543+
544+
pub(crate) fn writers_registered(&self) -> bool {
545+
self.wakers_registered.load(Ordering::SeqCst) & Self::WRITERS_REGISTERED
546+
== Self::WRITERS_REGISTERED
547+
}
521548
}

0 commit comments

Comments
 (0)