Skip to content

Commit 6990c14

Browse files
committed
Reimplemented throttle to never drop Delay, added boolean flag
1 parent 77a1849 commit 6990c14

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

src/stream/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ extension_trait! {
318318
#[doc = r#"
319319
Limit the amount of items yielded per timeslice in a stream.
320320
321-
This stream does not drop any items, but will only limit the rate at which items pass through.
321+
This stream does not drop any items, but will only limit the rate at which items pass through.
322322
# Examples
323323
```
324324
# fn main() { async_std::task::block_on(async {

src/stream/stream/throttle.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::future::Future;
22
use std::pin::Pin;
3-
use std::time::Duration;
3+
use std::time::{Duration, Instant};
44

55
use futures_timer::Delay;
66
use pin_project_lite::pin_project;
@@ -23,7 +23,9 @@ pin_project! {
2323
stream: S,
2424
duration: Duration,
2525
#[pin]
26-
delay: Option<Delay>,
26+
blocked: bool,
27+
#[pin]
28+
delay: Delay,
2729
}
2830
}
2931

@@ -32,7 +34,8 @@ impl<S: Stream> Throttle<S> {
3234
Throttle {
3335
stream,
3436
duration,
35-
delay: None,
37+
blocked: false,
38+
delay: Delay::new(Duration::default()),
3639
}
3740
}
3841
}
@@ -42,9 +45,10 @@ impl<S: Stream> Stream for Throttle<S> {
4245

4346
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
4447
let mut this = self.project();
45-
if let Some(d) = this.delay.as_mut().as_pin_mut() {
48+
if *this.blocked {
49+
let d = this.delay.as_mut();
4650
if d.poll(cx).is_ready() {
47-
this.delay.set(None);
51+
*this.blocked = false;
4852
} else {
4953
return Poll::Pending;
5054
}
@@ -57,7 +61,8 @@ impl<S: Stream> Stream for Throttle<S> {
5761
}
5862
Poll::Ready(None) => Poll::Ready(None),
5963
Poll::Ready(Some(v)) => {
60-
this.delay.set(Some(Delay::new(*this.duration)));
64+
*this.blocked = true;
65+
this.delay.reset(Instant::now() + *this.duration);
6166
Poll::Ready(Some(v))
6267
}
6368
}

0 commit comments

Comments
 (0)