@@ -5,6 +5,7 @@ use crate::Error;
5
5
use bytes:: { BufMut , Bytes , BytesMut } ;
6
6
use futures:: { ready, SinkExt , Stream } ;
7
7
use pin_project_lite:: pin_project;
8
+ use postgres_protocol:: message:: backend:: LogicalReplicationMessage ;
8
9
use postgres_protocol:: message:: backend:: ReplicationMessage ;
9
10
use postgres_types:: PgLsn ;
10
11
use std:: pin:: Pin ;
@@ -91,3 +92,82 @@ impl Stream for ReplicationStream {
91
92
}
92
93
}
93
94
}
95
+
96
+ pin_project ! {
97
+ /// A type which deserializes the postgres logical replication protocol. This type gives access
98
+ /// to a high level representation of the changes in transaction commit order.
99
+ ///
100
+ /// The replication *must* be explicitly completed via the `finish` method.
101
+ pub struct LogicalReplicationStream {
102
+ #[ pin]
103
+ stream: ReplicationStream ,
104
+ }
105
+ }
106
+
107
+ impl LogicalReplicationStream {
108
+ /// Creates a new LogicalReplicationStream that will wrap the underlying CopyBoth stream
109
+ pub fn new ( stream : CopyBothDuplex < Bytes > ) -> Self {
110
+ Self {
111
+ stream : ReplicationStream :: new ( stream) ,
112
+ }
113
+ }
114
+
115
+ /// Send standby update to server.
116
+ pub async fn standby_status_update (
117
+ self : Pin < & mut Self > ,
118
+ write_lsn : PgLsn ,
119
+ flush_lsn : PgLsn ,
120
+ apply_lsn : PgLsn ,
121
+ ts : i64 ,
122
+ reply : u8 ,
123
+ ) -> Result < ( ) , Error > {
124
+ let this = self . project ( ) ;
125
+ this. stream
126
+ . standby_status_update ( write_lsn, flush_lsn, apply_lsn, ts, reply)
127
+ . await
128
+ }
129
+
130
+ /// Send hot standby feedback message to server.
131
+ pub async fn hot_standby_feedback (
132
+ self : Pin < & mut Self > ,
133
+ timestamp : i64 ,
134
+ global_xmin : u32 ,
135
+ global_xmin_epoch : u32 ,
136
+ catalog_xmin : u32 ,
137
+ catalog_xmin_epoch : u32 ,
138
+ ) -> Result < ( ) , Error > {
139
+ let this = self . project ( ) ;
140
+ this. stream
141
+ . hot_standby_feedback (
142
+ timestamp,
143
+ global_xmin,
144
+ global_xmin_epoch,
145
+ catalog_xmin,
146
+ catalog_xmin_epoch,
147
+ )
148
+ . await
149
+ }
150
+ }
151
+
152
+ impl Stream for LogicalReplicationStream {
153
+ type Item = Result < ReplicationMessage < LogicalReplicationMessage > , Error > ;
154
+
155
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
156
+ let this = self . project ( ) ;
157
+
158
+ match ready ! ( this. stream. poll_next( cx) ) {
159
+ Some ( Ok ( ReplicationMessage :: XLogData ( body) ) ) => {
160
+ let body = body
161
+ . map_data ( |buf| LogicalReplicationMessage :: parse ( & buf) )
162
+ . map_err ( Error :: parse) ?;
163
+ Poll :: Ready ( Some ( Ok ( ReplicationMessage :: XLogData ( body) ) ) )
164
+ }
165
+ Some ( Ok ( ReplicationMessage :: PrimaryKeepAlive ( body) ) ) => {
166
+ Poll :: Ready ( Some ( Ok ( ReplicationMessage :: PrimaryKeepAlive ( body) ) ) )
167
+ }
168
+ Some ( Ok ( _) ) => Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
169
+ Some ( Err ( err) ) => Poll :: Ready ( Some ( Err ( err) ) ) ,
170
+ None => Poll :: Ready ( None ) ,
171
+ }
172
+ }
173
+ }
0 commit comments