diff --git a/Cargo.toml b/Cargo.toml index 37374f7b6..d4a5ad70c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ crossbeam-deque = { version = "0.7.3", optional = true } crossbeam-utils = { version = "0.7.2", optional = true } futures-core = { version = "0.3.4", optional = true, default-features = false } futures-io = { version = "0.3.4", optional = true } -futures-timer = { version = "2.0.2", optional = true } +futures-timer = { version = "3.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.3.3", optional = true } diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 7a0c1740b..9c6bcd6f7 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -71,9 +71,8 @@ impl Stream for Interval { if Pin::new(&mut self.delay).poll(cx).is_pending() { return Poll::Pending; } - let when = Instant::now(); - let next = next_interval(when, Instant::now(), self.interval); - self.delay.reset(next); + let dur = self.interval; + self.delay.reset(dur); Poll::Ready(Some(())) } } diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index ce8c13b37..554ca306e 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures_timer::Delay; use pin_project_lite::pin_project; @@ -59,7 +59,7 @@ impl Stream for Throttle { Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { *this.blocked = true; - this.delay.reset(Instant::now() + *this.duration); + this.delay.reset(*this.duration); Poll::Ready(Some(v)) } }