Skip to content

Commit 82124f0

Browse files
VladLazararpad-m
authored andcommitted
backend: add interpreted record replication message type (#32)
This is to be used by the safekeeper -> pageserver protocol.
1 parent 62f7889 commit 82124f0

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939
// replication message tags
4040
pub const XLOG_DATA_TAG: u8 = b'w';
4141
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
42+
pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0';
4243

4344
// logical replication message tags
4445
const BEGIN_TAG: u8 = b'B';
@@ -325,6 +326,7 @@ impl Message {
325326
pub enum ReplicationMessage<D> {
326327
XLogData(XLogDataBody<D>),
327328
PrimaryKeepAlive(PrimaryKeepAliveBody),
329+
RawInterpretedWalRecords(RawInterpretedWalRecordsBody<D>),
328330
}
329331

330332
impl ReplicationMessage<Bytes> {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372
reply,
371373
})
372374
}
375+
INTERPRETED_WAL_RECORD_TAG => {
376+
let streaming_lsn = buf.read_u64::<BigEndian>()?;
377+
let commit_lsn = buf.read_u64::<BigEndian>()?;
378+
let next_record_lsn = match buf.read_u64::<BigEndian>()? {
379+
0 => None,
380+
lsn => Some(lsn),
381+
};
382+
383+
ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody {
384+
streaming_lsn,
385+
commit_lsn,
386+
next_record_lsn,
387+
data: buf.read_all(),
388+
})
389+
}
373390
tag => {
374391
return Err(io::Error::new(
375392
io::ErrorKind::InvalidInput,
@@ -956,6 +973,36 @@ impl<D> XLogDataBody<D> {
956973
}
957974
}
958975

976+
#[derive(Debug)]
977+
pub struct RawInterpretedWalRecordsBody<D> {
978+
streaming_lsn: u64,
979+
commit_lsn: u64,
980+
next_record_lsn: Option<u64>,
981+
data: D,
982+
}
983+
984+
impl<D> RawInterpretedWalRecordsBody<D> {
985+
#[inline]
986+
pub fn streaming_lsn(&self) -> u64 {
987+
self.streaming_lsn
988+
}
989+
990+
#[inline]
991+
pub fn commit_lsn(&self) -> u64 {
992+
self.commit_lsn
993+
}
994+
995+
#[inline]
996+
pub fn next_record_lsn(&self) -> Option<u64> {
997+
self.next_record_lsn
998+
}
999+
1000+
#[inline]
1001+
pub fn data(&self) -> &D {
1002+
&self.data
1003+
}
1004+
}
1005+
9591006
#[derive(Debug)]
9601007
pub struct PrimaryKeepAliveBody {
9611008
wal_end: u64,

0 commit comments

Comments
 (0)