From 2e8826373ea7ec0129dd55f83fb48b3d0c48068b Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Thu, 15 Apr 2021 09:19:56 -0700 Subject: [PATCH 1/2] add a constant to represent the "invalid" LSN value This is the equivalent of InvalidXLogRecPtr in postgres C sources. --- postgres-types/src/pg_lsn.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/postgres-types/src/pg_lsn.rs b/postgres-types/src/pg_lsn.rs index f0bbf4022..ac307582b 100644 --- a/postgres-types/src/pg_lsn.rs +++ b/postgres-types/src/pg_lsn.rs @@ -13,6 +13,12 @@ use crate::{FromSql, IsNull, ToSql, Type}; #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] pub struct PgLsn(u64); +impl PgLsn { + /// An un-specified or unknown LSN + // The C equivalent is InvalidXLogRecPtr + pub const INVALID: PgLsn = PgLsn(0); +} + /// Error parsing LSN. #[derive(Debug)] pub struct ParseLsnError(()); From a0d067b66447951d1276a53fb09886539c3fa094 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Thu, 15 Apr 2021 10:17:37 -0700 Subject: [PATCH 2/2] add PgTimestamp Create a type to represent a timestamp, stored as an i64 in microseconds relative to the postgres epoch. This needs improvement: - It should move to somewhere in the postgres-type crate, unless this timestamp type is specific to replication. - The name is confusingly similar to postgres_types::Timestamp. --- tokio-postgres/src/replication.rs | 46 ++++++++++++++++++++++-- tokio-postgres/tests/test/replication.rs | 7 ++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/tokio-postgres/src/replication.rs b/tokio-postgres/src/replication.rs index f8ddfd546..c6123983b 100644 --- a/tokio-postgres/src/replication.rs +++ b/tokio-postgres/src/replication.rs @@ -14,6 +14,46 @@ use std::task::{Context, Poll}; const STANDBY_STATUS_UPDATE_TAG: u8 = b'r'; const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h'; +// FIXME: move this somewhere in the postgres-types crate +// FIXME: this is confusingly similar to postgres_types::Timestamp +use std::convert::{TryFrom, TryInto}; +use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; + +/// A moment in time, relative to the postgres epoch 2000-01-01T00:00:00Z +#[derive(Copy, Clone, Debug)] +pub struct PgTimestamp(pub(crate) i64); + +impl TryFrom for PgTimestamp { + type Error = SystemTimeError; + + /// Attempt to create a `PgTimestamp` from the given `SystemTime`. + /// + /// This will fail if the input time is earlier than the Postgres + /// epoch (2000-01-01). + /// + /// This will panic if the timestamp won't fit in an `i64`. + fn try_from(t: SystemTime) -> Result { + // Postgres epoch is 2000-01-01T00:00:00Z + // Maybe in the future this can be const? + let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800); + let delta = t.duration_since(pg_epoch)?; + let micros: i64 = delta.as_micros().try_into().expect("timestamp overflow"); + Ok(PgTimestamp(micros)) + } +} + +impl PgTimestamp { + /// The current time, represented as a `PgTimestamp` + /// + /// This will fail if the system time is earlier than the Postgres + /// epoch (2000-01-01). + /// + /// This will panic if the timestamp won't fit in an `i64`. + pub fn now() -> Result { + Self::try_from(SystemTime::now()) + } +} + pin_project! { /// A type which deserializes the postgres replication protocol. This type can be used with /// both physical and logical replication to get access to the byte content of each replication @@ -38,7 +78,7 @@ impl ReplicationStream { write_lsn: PgLsn, flush_lsn: PgLsn, apply_lsn: PgLsn, - ts: i64, + ts: PgTimestamp, reply: u8, ) -> Result<(), Error> { let mut this = self.project(); @@ -48,7 +88,7 @@ impl ReplicationStream { buf.put_u64(write_lsn.into()); buf.put_u64(flush_lsn.into()); buf.put_u64(apply_lsn.into()); - buf.put_i64(ts); + buf.put_i64(ts.0); buf.put_u8(reply); this.stream.send(buf.freeze()).await @@ -118,7 +158,7 @@ impl LogicalReplicationStream { write_lsn: PgLsn, flush_lsn: PgLsn, apply_lsn: PgLsn, - ts: i64, + ts: PgTimestamp, reply: u8, ) -> Result<(), Error> { let this = self.project(); diff --git a/tokio-postgres/tests/test/replication.rs b/tokio-postgres/tests/test/replication.rs index 22d50e833..42d8b4b33 100644 --- a/tokio-postgres/tests/test/replication.rs +++ b/tokio-postgres/tests/test/replication.rs @@ -1,11 +1,10 @@ use futures::StreamExt; -use std::time::{Duration, UNIX_EPOCH}; use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert}; use postgres_protocol::message::backend::ReplicationMessage::*; use postgres_protocol::message::backend::TupleData; use postgres_types::PgLsn; -use tokio_postgres::replication::LogicalReplicationStream; +use tokio_postgres::replication::{LogicalReplicationStream, PgTimestamp}; use tokio_postgres::NoTls; use tokio_postgres::SimpleQueryMessage::Row; @@ -130,10 +129,8 @@ async fn test_replication() { // Send a standby status update and require a keep alive response let lsn: PgLsn = lsn.parse().unwrap(); + let ts = PgTimestamp::now().unwrap(); - // Postgres epoch is 2000-01-01T00:00:00Z - let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800); - let ts = pg_epoch.elapsed().unwrap().as_micros() as i64; stream .as_mut() .standby_status_update(lsn, lsn, lsn, ts, 1)