Skip to content

Commit a38781a

Browse files
committed
helper LogicalReplicationStream type to decode logical replication
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent ba392a7 commit a38781a

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed

tokio-postgres/src/replication.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::Error;
55
use bytes::{Bytes, BytesMut};
66
use futures::{ready, SinkExt, Stream};
77
use pin_project_lite::pin_project;
8+
use postgres_protocol::message::backend::LogicalReplicationMessage;
89
use postgres_protocol::message::backend::ReplicationMessage;
910
use postgres_protocol::message::frontend;
1011
use postgres_types::PgLsn;
@@ -93,3 +94,82 @@ impl Stream for ReplicationStream {
9394
}
9495
}
9596
}
97+
98+
pin_project! {
99+
/// A type which deserializes the postgres logical replication protocol. This type gives access
100+
/// to a high level representation of the changes in transaction commit order.
101+
///
102+
/// The replication *must* be explicitly completed via the `finish` method.
103+
pub struct LogicalReplicationStream {
104+
#[pin]
105+
stream: ReplicationStream,
106+
}
107+
}
108+
109+
impl LogicalReplicationStream {
110+
/// Creates a new LogicalReplicationStream that will wrap the underlying CopyBoth stream
111+
pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
112+
Self {
113+
stream: ReplicationStream::new(stream),
114+
}
115+
}
116+
117+
/// Send standby update to server.
118+
pub async fn standby_status_update(
119+
self: Pin<&mut Self>,
120+
write_lsn: PgLsn,
121+
flush_lsn: PgLsn,
122+
apply_lsn: PgLsn,
123+
ts: i64,
124+
reply: u8,
125+
) -> Result<(), Error> {
126+
let this = self.project();
127+
this.stream
128+
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
129+
.await
130+
}
131+
132+
/// Send hot standby feedback message to server.
133+
pub async fn hot_standby_feedback(
134+
self: Pin<&mut Self>,
135+
timestamp: i64,
136+
global_xmin: u32,
137+
global_xmin_epoch: u32,
138+
catalog_xmin: u32,
139+
catalog_xmin_epoch: u32,
140+
) -> Result<(), Error> {
141+
let this = self.project();
142+
this.stream
143+
.hot_standby_feedback(
144+
timestamp,
145+
global_xmin,
146+
global_xmin_epoch,
147+
catalog_xmin,
148+
catalog_xmin_epoch,
149+
)
150+
.await
151+
}
152+
}
153+
154+
impl Stream for LogicalReplicationStream {
155+
type Item = Result<ReplicationMessage<LogicalReplicationMessage>, Error>;
156+
157+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
158+
let this = self.project();
159+
160+
match ready!(this.stream.poll_next(cx)) {
161+
Some(Ok(ReplicationMessage::XLogData(body))) => {
162+
let body = body
163+
.map_data(|buf| LogicalReplicationMessage::parse(&buf))
164+
.map_err(Error::parse)?;
165+
Poll::Ready(Some(Ok(ReplicationMessage::XLogData(body))))
166+
}
167+
Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))) => {
168+
Poll::Ready(Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))))
169+
}
170+
Some(Ok(_)) => Poll::Ready(Some(Err(Error::unexpected_message()))),
171+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
172+
None => Poll::Ready(None),
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)