Skip to content

Commit 7258f94

Browse files
committed
replication: fix integration tests
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent dbf89c4 commit 7258f94

File tree

1 file changed

+12
-21
lines changed

1 file changed

+12
-21
lines changed

tokio-postgres/tests/test/replication.rs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1+
use futures::StreamExt;
12
use postgres_protocol::message::backend::ReplicationMessage;
2-
use tokio::stream::StreamExt;
3+
use postgres_protocol::replication::pgoutput::{LogicalReplicationMessage, PgOutput};
4+
use postgres_protocol::replication::DecodingPlugin;
35
use tokio_postgres::replication_client::ReplicationClient;
46
use tokio_postgres::Client;
57
use tokio_postgres::{connect, connect_replication, NoTls, ReplicationMode};
68

7-
const LOGICAL_BEGIN_TAG: u8 = b'B';
8-
const LOGICAL_COMMIT_TAG: u8 = b'C';
9-
const LOGICAL_INSERT_TAG: u8 = b'I';
10-
119
// Tests missing for timeline_history(). For a timeline history to be
1210
// available, it requires a point-in-time-recovery or a standby
1311
// promotion; neither of which is done in the current test setup.
@@ -89,24 +87,20 @@ async fn logical_replication() {
8987
assert_eq!(identify_system.dbname().unwrap(), "postgres");
9088

9189
let slot = "test_logical_slot";
92-
let plugin = "pgoutput";
90+
let plugin = PgOutput::new(vec!["test_logical_pub".to_string()]);
9391
let _ = rclient.drop_replication_slot(slot, false).await.unwrap();
9492
let slotdesc = rclient
95-
.create_logical_replication_slot(slot, false, plugin, None)
93+
.create_logical_replication_slot(slot, false, &plugin, None)
9694
.await
9795
.unwrap();
9896
assert_eq!(slotdesc.slot_name(), slot);
9997
assert!(slotdesc.snapshot_name().is_some());
100-
assert_eq!(slotdesc.output_plugin(), Some(plugin));
98+
assert_eq!(slotdesc.output_plugin(), Some(plugin.name()));
10199

102100
let xlog_start = identify_system.xlogpos();
103-
let options = &vec![
104-
("proto_version", "1"),
105-
("publication_names", "test_logical_pub"),
106-
];
107101

108102
let mut logical_stream = rclient
109-
.start_logical_replication(slot, xlog_start, options)
103+
.start_logical_replication(slot, xlog_start, &plugin)
110104
.await
111105
.unwrap();
112106

@@ -120,20 +114,21 @@ async fn logical_replication() {
120114
let mut got_commit = false;
121115
while let Some(replication_message) = logical_stream.next().await {
122116
if let ReplicationMessage::XLogData(msg) = replication_message.unwrap() {
123-
match msg.data()[0] {
124-
LOGICAL_BEGIN_TAG => {
117+
use LogicalReplicationMessage::*;
118+
match msg.data() {
119+
Begin(_) => {
125120
assert!(!got_begin);
126121
assert!(!got_insert);
127122
assert!(!got_commit);
128123
got_begin = true;
129124
}
130-
LOGICAL_INSERT_TAG => {
125+
Insert(_) => {
131126
assert!(got_begin);
132127
assert!(!got_insert);
133128
assert!(!got_commit);
134129
got_insert = true;
135130
}
136-
LOGICAL_COMMIT_TAG => {
131+
Commit(_) => {
137132
assert!(got_begin);
138133
assert!(got_insert);
139134
assert!(!got_commit);
@@ -153,10 +148,6 @@ async fn logical_replication() {
153148
simple_exec(&sclient, "drop publication if exists test_logical_pub").await;
154149
}
155150

156-
// test for base backup
157-
#[tokio::test]
158-
async fn base_backup() {}
159-
160151
// Test that a dropped replication stream properly returns to normal
161152
// command processing in the ReplicationClient.
162153
//

0 commit comments

Comments
 (0)