Skip to content

Commit e66235a

Browse files
VladLazararpad-m
authored andcommitted
backend: add interpreted record replication message type (#32)
This is to be used by the safekeeper -> pageserver protocol.
1 parent 6ddf673 commit e66235a

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939
// replication message tags
4040
pub const XLOG_DATA_TAG: u8 = b'w';
4141
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
42+
pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0';
4243

4344
// logical replication message tags
4445
const BEGIN_TAG: u8 = b'B';
@@ -325,6 +326,7 @@ impl Message {
325326
pub enum ReplicationMessage<D> {
326327
XLogData(XLogDataBody<D>),
327328
PrimaryKeepAlive(PrimaryKeepAliveBody),
329+
RawInterpretedWalRecords(RawInterpretedWalRecordsBody<D>),
328330
}
329331

330332
impl ReplicationMessage<Bytes> {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372
reply,
371373
})
372374
}
375+
INTERPRETED_WAL_RECORD_TAG => {
376+
let streaming_lsn = buf.read_u64::<BigEndian>()?;
377+
let commit_lsn = buf.read_u64::<BigEndian>()?;
378+
let next_record_lsn = match buf.read_u64::<BigEndian>()? {
379+
0 => None,
380+
lsn => Some(lsn),
381+
};
382+
383+
ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody {
384+
streaming_lsn,
385+
commit_lsn,
386+
next_record_lsn,
387+
data: buf.read_all(),
388+
})
389+
}
373390
tag => {
374391
return Err(io::Error::new(
375392
io::ErrorKind::InvalidInput,
@@ -962,6 +979,36 @@ impl<D> XLogDataBody<D> {
962979
}
963980
}
964981

982+
#[derive(Debug)]
983+
pub struct RawInterpretedWalRecordsBody<D> {
984+
streaming_lsn: u64,
985+
commit_lsn: u64,
986+
next_record_lsn: Option<u64>,
987+
data: D,
988+
}
989+
990+
impl<D> RawInterpretedWalRecordsBody<D> {
991+
#[inline]
992+
pub fn streaming_lsn(&self) -> u64 {
993+
self.streaming_lsn
994+
}
995+
996+
#[inline]
997+
pub fn commit_lsn(&self) -> u64 {
998+
self.commit_lsn
999+
}
1000+
1001+
#[inline]
1002+
pub fn next_record_lsn(&self) -> Option<u64> {
1003+
self.next_record_lsn
1004+
}
1005+
1006+
#[inline]
1007+
pub fn data(&self) -> &D {
1008+
&self.data
1009+
}
1010+
}
1011+
9651012
#[derive(Debug)]
9661013
pub struct PrimaryKeepAliveBody {
9671014
wal_end: u64,

0 commit comments

Comments
 (0)