Skip to content

Commit 593f0c9

Browse files
VladLazarconradludgate
authored andcommitted
backend: add interpreted record replication message type (#32)
This is to be used by the safekeeper -> pageserver protocol.
1 parent 691aebb commit 593f0c9

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939
// replication message tags
4040
pub const XLOG_DATA_TAG: u8 = b'w';
4141
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
42+
pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0';
4243

4344
// logical replication message tags
4445
const BEGIN_TAG: u8 = b'B';
@@ -325,6 +326,7 @@ impl Message {
325326
pub enum ReplicationMessage<D> {
326327
XLogData(XLogDataBody<D>),
327328
PrimaryKeepAlive(PrimaryKeepAliveBody),
329+
RawInterpretedWalRecords(RawInterpretedWalRecordsBody<D>),
328330
}
329331

330332
impl ReplicationMessage<Bytes> {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372
reply,
371373
})
372374
}
375+
INTERPRETED_WAL_RECORD_TAG => {
376+
let streaming_lsn = buf.read_u64::<BigEndian>()?;
377+
let commit_lsn = buf.read_u64::<BigEndian>()?;
378+
let next_record_lsn = match buf.read_u64::<BigEndian>()? {
379+
0 => None,
380+
lsn => Some(lsn),
381+
};
382+
383+
ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody {
384+
streaming_lsn,
385+
commit_lsn,
386+
next_record_lsn,
387+
data: buf.read_all(),
388+
})
389+
}
373390
tag => {
374391
return Err(io::Error::new(
375392
io::ErrorKind::InvalidInput,
@@ -961,6 +978,36 @@ impl<D> XLogDataBody<D> {
961978
}
962979
}
963980

981+
#[derive(Debug)]
982+
pub struct RawInterpretedWalRecordsBody<D> {
983+
streaming_lsn: u64,
984+
commit_lsn: u64,
985+
next_record_lsn: Option<u64>,
986+
data: D,
987+
}
988+
989+
impl<D> RawInterpretedWalRecordsBody<D> {
990+
#[inline]
991+
pub fn streaming_lsn(&self) -> u64 {
992+
self.streaming_lsn
993+
}
994+
995+
#[inline]
996+
pub fn commit_lsn(&self) -> u64 {
997+
self.commit_lsn
998+
}
999+
1000+
#[inline]
1001+
pub fn next_record_lsn(&self) -> Option<u64> {
1002+
self.next_record_lsn
1003+
}
1004+
1005+
#[inline]
1006+
pub fn data(&self) -> &D {
1007+
&self.data
1008+
}
1009+
}
1010+
9641011
#[derive(Debug)]
9651012
pub struct PrimaryKeepAliveBody {
9661013
wal_end: u64,

0 commit comments

Comments
 (0)