Skip to content

Commit 67f2d66

Browse files
committed
helper ReplicationStream type for replication protocol
This can be optionally used with a CopyBoth stream to decode the replication protocol Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 85f0d50 commit 67f2d66

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

tokio-postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ mod maybe_tls_stream;
164164
mod portal;
165165
mod prepare;
166166
mod query;
167+
pub mod replication;
167168
pub mod row;
168169
mod simple_query;
169170
#[cfg(feature = "runtime")]

tokio-postgres/src/replication.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//! Utilities for working with the PostgreSQL replication copy both format.
2+
3+
use crate::copy_both::CopyBothDuplex;
4+
use crate::Error;
5+
use bytes::{BufMut, Bytes, BytesMut};
6+
use futures::{ready, SinkExt, Stream};
7+
use pin_project_lite::pin_project;
8+
use postgres_protocol::message::backend::ReplicationMessage;
9+
use postgres_types::PgLsn;
10+
use std::pin::Pin;
11+
use std::task::{Context, Poll};
12+
13+
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
14+
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';
15+
16+
pin_project! {
17+
/// A type which deserializes the postgres replication protocol. This type can be used with
18+
/// both physical and logical replication to get access to the byte content of each replication
19+
/// message.
20+
///
21+
/// The replication *must* be explicitly completed via the `finish` method.
22+
pub struct ReplicationStream {
23+
#[pin]
24+
stream: CopyBothDuplex<Bytes>,
25+
}
26+
}
27+
28+
impl ReplicationStream {
29+
/// Creates a new ReplicationStream that will wrap the underlying CopyBoth stream
30+
pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
31+
Self { stream }
32+
}
33+
34+
/// Send standby update to server.
35+
pub async fn standby_status_update(
36+
self: Pin<&mut Self>,
37+
write_lsn: PgLsn,
38+
flush_lsn: PgLsn,
39+
apply_lsn: PgLsn,
40+
ts: i64,
41+
reply: u8,
42+
) -> Result<(), Error> {
43+
let mut this = self.project();
44+
45+
let mut buf = BytesMut::new();
46+
buf.put_u8(STANDBY_STATUS_UPDATE_TAG);
47+
buf.put_u64(write_lsn.into());
48+
buf.put_u64(flush_lsn.into());
49+
buf.put_u64(apply_lsn.into());
50+
buf.put_i64(ts);
51+
buf.put_u8(reply);
52+
53+
this.stream.send(buf.freeze()).await
54+
}
55+
56+
/// Send hot standby feedback message to server.
57+
pub async fn hot_standby_feedback(
58+
self: Pin<&mut Self>,
59+
timestamp: i64,
60+
global_xmin: u32,
61+
global_xmin_epoch: u32,
62+
catalog_xmin: u32,
63+
catalog_xmin_epoch: u32,
64+
) -> Result<(), Error> {
65+
let mut this = self.project();
66+
67+
let mut buf = BytesMut::new();
68+
buf.put_u8(HOT_STANDBY_FEEDBACK_TAG);
69+
buf.put_i64(timestamp);
70+
buf.put_u32(global_xmin);
71+
buf.put_u32(global_xmin_epoch);
72+
buf.put_u32(catalog_xmin);
73+
buf.put_u32(catalog_xmin_epoch);
74+
75+
this.stream.send(buf.freeze()).await
76+
}
77+
}
78+
79+
impl Stream for ReplicationStream {
80+
type Item = Result<ReplicationMessage<Bytes>, Error>;
81+
82+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83+
let this = self.project();
84+
85+
match ready!(this.stream.poll_next(cx)) {
86+
Some(Ok(buf)) => {
87+
Poll::Ready(Some(ReplicationMessage::parse(&buf).map_err(Error::parse)))
88+
}
89+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
90+
None => Poll::Ready(None),
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)