1
+ use futures:: StreamExt ;
1
2
use postgres_protocol:: message:: backend:: ReplicationMessage ;
2
- use tokio:: stream:: StreamExt ;
3
+ use postgres_protocol:: replication:: pgoutput:: { LogicalReplicationMessage , PgOutput } ;
4
+ use postgres_protocol:: replication:: DecodingPlugin ;
3
5
use tokio_postgres:: replication_client:: ReplicationClient ;
4
6
use tokio_postgres:: Client ;
5
7
use tokio_postgres:: { connect, connect_replication, NoTls , ReplicationMode } ;
6
8
7
- const LOGICAL_BEGIN_TAG : u8 = b'B' ;
8
- const LOGICAL_COMMIT_TAG : u8 = b'C' ;
9
- const LOGICAL_INSERT_TAG : u8 = b'I' ;
10
-
11
9
// Tests missing for timeline_history(). For a timeline history to be
12
10
// available, it requires a point-in-time-recovery or a standby
13
11
// promotion; neither of which is done in the current test setup.
@@ -89,24 +87,20 @@ async fn logical_replication() {
89
87
assert_eq ! ( identify_system. dbname( ) . unwrap( ) , "postgres" ) ;
90
88
91
89
let slot = "test_logical_slot" ;
92
- let plugin = "pgoutput" ;
90
+ let plugin = PgOutput :: new ( vec ! [ "test_logical_pub" . to_string ( ) ] ) ;
93
91
let _ = rclient. drop_replication_slot ( slot, false ) . await . unwrap ( ) ;
94
92
let slotdesc = rclient
95
- . create_logical_replication_slot ( slot, false , plugin, None )
93
+ . create_logical_replication_slot ( slot, false , & plugin, None )
96
94
. await
97
95
. unwrap ( ) ;
98
96
assert_eq ! ( slotdesc. slot_name( ) , slot) ;
99
97
assert ! ( slotdesc. snapshot_name( ) . is_some( ) ) ;
100
- assert_eq ! ( slotdesc. output_plugin( ) , Some ( plugin) ) ;
98
+ assert_eq ! ( slotdesc. output_plugin( ) , Some ( plugin. name ( ) ) ) ;
101
99
102
100
let xlog_start = identify_system. xlogpos ( ) ;
103
- let options = & vec ! [
104
- ( "proto_version" , "1" ) ,
105
- ( "publication_names" , "test_logical_pub" ) ,
106
- ] ;
107
101
108
102
let mut logical_stream = rclient
109
- . start_logical_replication ( slot, xlog_start, options )
103
+ . start_logical_replication ( slot, xlog_start, & plugin )
110
104
. await
111
105
. unwrap ( ) ;
112
106
@@ -120,20 +114,21 @@ async fn logical_replication() {
120
114
let mut got_commit = false ;
121
115
while let Some ( replication_message) = logical_stream. next ( ) . await {
122
116
if let ReplicationMessage :: XLogData ( msg) = replication_message. unwrap ( ) {
123
- match msg. data ( ) [ 0 ] {
124
- LOGICAL_BEGIN_TAG => {
117
+ use LogicalReplicationMessage :: * ;
118
+ match msg. data ( ) {
119
+ Begin ( _) => {
125
120
assert ! ( !got_begin) ;
126
121
assert ! ( !got_insert) ;
127
122
assert ! ( !got_commit) ;
128
123
got_begin = true ;
129
124
}
130
- LOGICAL_INSERT_TAG => {
125
+ Insert ( _ ) => {
131
126
assert ! ( got_begin) ;
132
127
assert ! ( !got_insert) ;
133
128
assert ! ( !got_commit) ;
134
129
got_insert = true ;
135
130
}
136
- LOGICAL_COMMIT_TAG => {
131
+ Commit ( _ ) => {
137
132
assert ! ( got_begin) ;
138
133
assert ! ( got_insert) ;
139
134
assert ! ( !got_commit) ;
@@ -153,10 +148,6 @@ async fn logical_replication() {
153
148
simple_exec ( & sclient, "drop publication if exists test_logical_pub" ) . await ;
154
149
}
155
150
156
- // test for base backup
157
- #[ tokio:: test]
158
- async fn base_backup ( ) { }
159
-
160
151
// Test that a dropped replication stream properly returns to normal
161
152
// command processing in the ReplicationClient.
162
153
//
0 commit comments