Skip to content

Commit e48f73e

Browse files
committed
helper ReplicationStream type for replication protocol
This can be optionally used with a CopyBoth stream to decode the replication protocol Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 3c64e0a commit e48f73e

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

tokio-postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ mod maybe_tls_stream;
164164
mod portal;
165165
mod prepare;
166166
mod query;
167+
pub mod replication;
167168
pub mod row;
168169
mod simple_query;
169170
#[cfg(feature = "runtime")]

tokio-postgres/src/replication.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
//! Utilities for working with the PostgreSQL replication copy both format.
2+
3+
use crate::copy_both::CopyBothDuplex;
4+
use crate::Error;
5+
use bytes::{Bytes, BytesMut};
6+
use futures::{ready, SinkExt, Stream};
7+
use pin_project_lite::pin_project;
8+
use postgres_protocol::message::backend::ReplicationMessage;
9+
use postgres_protocol::message::frontend;
10+
use postgres_types::PgLsn;
11+
use std::pin::Pin;
12+
use std::task::{Context, Poll};
13+
14+
pin_project! {
15+
/// A type which deserializes the postgres replication protocol. This type can be used with
16+
/// both physical and logical replication to get access to the byte content of each replication
17+
/// message.
18+
///
19+
/// The replication *must* be explicitly completed via the `finish` method.
20+
pub struct ReplicationStream {
21+
#[pin]
22+
stream: CopyBothDuplex<Bytes>,
23+
}
24+
}
25+
26+
impl ReplicationStream {
27+
/// Creates a new ReplicationStream that will wrap the underlying CopyBoth stream
28+
pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
29+
Self { stream }
30+
}
31+
32+
/// Send standby update to server.
33+
pub async fn standby_status_update(
34+
self: Pin<&mut Self>,
35+
write_lsn: PgLsn,
36+
flush_lsn: PgLsn,
37+
apply_lsn: PgLsn,
38+
ts: i64,
39+
reply: u8,
40+
) -> Result<(), Error> {
41+
let mut this = self.project();
42+
43+
let mut buf = BytesMut::new();
44+
frontend::standby_status_update(
45+
write_lsn.into(),
46+
flush_lsn.into(),
47+
apply_lsn.into(),
48+
ts as i64,
49+
reply,
50+
&mut buf,
51+
)
52+
.expect("writes to memory never fail");
53+
this.stream.send(buf.freeze()).await
54+
}
55+
56+
/// Send hot standby feedback message to server.
57+
pub async fn hot_standby_feedback(
58+
self: Pin<&mut Self>,
59+
timestamp: i64,
60+
global_xmin: u32,
61+
global_xmin_epoch: u32,
62+
catalog_xmin: u32,
63+
catalog_xmin_epoch: u32,
64+
) -> Result<(), Error> {
65+
let mut this = self.project();
66+
67+
let mut buf = BytesMut::new();
68+
frontend::hot_standby_feedback(
69+
timestamp,
70+
global_xmin,
71+
global_xmin_epoch,
72+
catalog_xmin,
73+
catalog_xmin_epoch,
74+
&mut buf,
75+
)
76+
.expect("writes to memory never fail");
77+
this.stream.send(buf.freeze()).await
78+
}
79+
}
80+
81+
impl Stream for ReplicationStream {
82+
type Item = Result<ReplicationMessage<Bytes>, Error>;
83+
84+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85+
let this = self.project();
86+
87+
match ready!(this.stream.poll_next(cx)) {
88+
Some(Ok(buf)) => {
89+
Poll::Ready(Some(ReplicationMessage::parse(&buf).map_err(Error::parse)))
90+
}
91+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
92+
None => Poll::Ready(None),
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)