From 9fe5b8c91559258455dc4b895eada139b70fcb2c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 13 Nov 2024 13:22:50 +0100 Subject: [PATCH 1/3] backend: add interpreted record replication message type This is to be used by the safekeeper -> pageserver protocol. --- postgres-protocol/src/message/backend.rs | 35 ++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs index b6883cc3c..b1069db45 100644 --- a/postgres-protocol/src/message/backend.rs +++ b/postgres-protocol/src/message/backend.rs @@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z'; // replication message tags pub const XLOG_DATA_TAG: u8 = b'w'; pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k'; +pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0'; // logical replication message tags const BEGIN_TAG: u8 = b'B'; @@ -325,6 +326,7 @@ impl Message { pub enum ReplicationMessage { XLogData(XLogDataBody), PrimaryKeepAlive(PrimaryKeepAliveBody), + RawInterpretedWalRecords(RawInterpretedWalRecordsBody), } impl ReplicationMessage { @@ -370,6 +372,15 @@ impl ReplicationMessage { reply, }) } + INTERPRETED_WAL_RECORD_TAG => { + let streaming_lsn = buf.read_u64::()?; + let wal_end = buf.read_u64::()?; + ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody { + streaming_lsn, + wal_end, + data: buf.read_all(), + }) + } tag => { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -950,6 +961,30 @@ impl XLogDataBody { } } +#[derive(Debug)] +pub struct RawInterpretedWalRecordsBody { + streaming_lsn: u64, + wal_end: u64, + data: D, +} + +impl RawInterpretedWalRecordsBody { + #[inline] + pub fn streaming_lsn(&self) -> u64 { + self.streaming_lsn + } + + #[inline] + pub fn wal_end(&self) -> u64 { + self.wal_end + } + + #[inline] + pub fn data(&self) -> &D { + &self.data + } +} + #[derive(Debug)] pub struct PrimaryKeepAliveBody { wal_end: u64, From e6c92ba268ee7d75ed9ee53c10f9a148747f7e1c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 15 Nov 2024 12:03:08 +0100 Subject: [PATCH 2/3] backend: add next record lsn to interpreted batch --- postgres-protocol/src/message/backend.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs index b1069db45..ad689f74f 100644 --- a/postgres-protocol/src/message/backend.rs +++ b/postgres-protocol/src/message/backend.rs @@ -375,9 +375,15 @@ impl ReplicationMessage { INTERPRETED_WAL_RECORD_TAG => { let streaming_lsn = buf.read_u64::()?; let wal_end = buf.read_u64::()?; + let next_record_lsn = match buf.read_u64::()? { + 0 => None, + lsn => Some(lsn), + }; + ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody { streaming_lsn, wal_end, + next_record_lsn, data: buf.read_all(), }) } @@ -965,6 +971,7 @@ impl XLogDataBody { pub struct RawInterpretedWalRecordsBody { streaming_lsn: u64, wal_end: u64, + next_record_lsn: Option, data: D, } @@ -979,6 +986,11 @@ impl RawInterpretedWalRecordsBody { self.wal_end } + #[inline] + pub fn next_record_lsn(&self) -> Option { + self.next_record_lsn + } + #[inline] pub fn data(&self) -> &D { &self.data From e619cf8c2c572e71cbc97f1c7f4cab8219f07d55 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 18 Nov 2024 12:01:54 +0100 Subject: [PATCH 3/3] review: rename wal_end to commit_lsn --- postgres-protocol/src/message/backend.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs index ad689f74f..10dec86ca 100644 --- a/postgres-protocol/src/message/backend.rs +++ b/postgres-protocol/src/message/backend.rs @@ -374,7 +374,7 @@ impl ReplicationMessage { } INTERPRETED_WAL_RECORD_TAG => { let streaming_lsn = buf.read_u64::()?; - let wal_end = buf.read_u64::()?; + let commit_lsn = buf.read_u64::()?; let next_record_lsn = match buf.read_u64::()? { 0 => None, lsn => Some(lsn), @@ -382,7 +382,7 @@ impl ReplicationMessage { ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody { streaming_lsn, - wal_end, + commit_lsn, next_record_lsn, data: buf.read_all(), }) @@ -970,7 +970,7 @@ impl XLogDataBody { #[derive(Debug)] pub struct RawInterpretedWalRecordsBody { streaming_lsn: u64, - wal_end: u64, + commit_lsn: u64, next_record_lsn: Option, data: D, } @@ -982,8 +982,8 @@ impl RawInterpretedWalRecordsBody { } #[inline] - pub fn wal_end(&self) -> u64 { - self.wal_end + pub fn commit_lsn(&self) -> u64 { + self.commit_lsn } #[inline]