Skip to content

Add PgLsn::INVALID; Add PgTimestamp for standby_status_update #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions postgres-types/src/pg_lsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ use crate::{FromSql, IsNull, ToSql, Type};
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub struct PgLsn(u64);

impl PgLsn {
/// An un-specified or unknown LSN
// The C equivalent is InvalidXLogRecPtr
pub const INVALID: PgLsn = PgLsn(0);
}

/// Error parsing LSN.
#[derive(Debug)]
pub struct ParseLsnError(());
Expand Down
46 changes: 43 additions & 3 deletions tokio-postgres/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,46 @@ use std::task::{Context, Poll};
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';

// FIXME: move this somewhere in the postgres-types crate
// FIXME: this is confusingly similar to postgres_types::Timestamp
use std::convert::{TryFrom, TryInto};
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

/// A moment in time, relative to the postgres epoch 2000-01-01T00:00:00Z
#[derive(Copy, Clone, Debug)]
pub struct PgTimestamp(pub(crate) i64);

impl TryFrom<SystemTime> for PgTimestamp {
type Error = SystemTimeError;

/// Attempt to create a `PgTimestamp` from the given `SystemTime`.
///
/// This will fail if the input time is earlier than the Postgres
/// epoch (2000-01-01).
///
/// This will panic if the timestamp won't fit in an `i64`.
fn try_from(t: SystemTime) -> Result<Self, Self::Error> {
// Postgres epoch is 2000-01-01T00:00:00Z
// Maybe in the future this can be const?
let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800);
let delta = t.duration_since(pg_epoch)?;
let micros: i64 = delta.as_micros().try_into().expect("timestamp overflow");
Ok(PgTimestamp(micros))
}
}

impl PgTimestamp {
/// The current time, represented as a `PgTimestamp`
///
/// This will fail if the system time is earlier than the Postgres
/// epoch (2000-01-01).
///
/// This will panic if the timestamp won't fit in an `i64`.
pub fn now() -> Result<Self, SystemTimeError> {
Self::try_from(SystemTime::now())
}
}

pin_project! {
/// A type which deserializes the postgres replication protocol. This type can be used with
/// both physical and logical replication to get access to the byte content of each replication
Expand All @@ -38,7 +78,7 @@ impl ReplicationStream {
write_lsn: PgLsn,
flush_lsn: PgLsn,
apply_lsn: PgLsn,
ts: i64,
ts: PgTimestamp,
reply: u8,
) -> Result<(), Error> {
let mut this = self.project();
Expand All @@ -48,7 +88,7 @@ impl ReplicationStream {
buf.put_u64(write_lsn.into());
buf.put_u64(flush_lsn.into());
buf.put_u64(apply_lsn.into());
buf.put_i64(ts);
buf.put_i64(ts.0);
buf.put_u8(reply);

this.stream.send(buf.freeze()).await
Expand Down Expand Up @@ -118,7 +158,7 @@ impl LogicalReplicationStream {
write_lsn: PgLsn,
flush_lsn: PgLsn,
apply_lsn: PgLsn,
ts: i64,
ts: PgTimestamp,
reply: u8,
) -> Result<(), Error> {
let this = self.project();
Expand Down
7 changes: 2 additions & 5 deletions tokio-postgres/tests/test/replication.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use futures::StreamExt;
use std::time::{Duration, UNIX_EPOCH};

use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert};
use postgres_protocol::message::backend::ReplicationMessage::*;
use postgres_protocol::message::backend::TupleData;
use postgres_types::PgLsn;
use tokio_postgres::replication::LogicalReplicationStream;
use tokio_postgres::replication::{LogicalReplicationStream, PgTimestamp};
use tokio_postgres::NoTls;
use tokio_postgres::SimpleQueryMessage::Row;

Expand Down Expand Up @@ -130,10 +129,8 @@ async fn test_replication() {

// Send a standby status update and require a keep alive response
let lsn: PgLsn = lsn.parse().unwrap();
let ts = PgTimestamp::now().unwrap();

// Postgres epoch is 2000-01-01T00:00:00Z
let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800);
let ts = pg_epoch.elapsed().unwrap().as_micros() as i64;
stream
.as_mut()
.standby_status_update(lsn, lsn, lsn, ts, 1)
Expand Down