Skip to content

Commit f513799

Browse files
committed
add postgres replication integration test
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent a38781a commit f513799

File tree

2 files changed

+133
-0
lines changed

2 files changed

+133
-0
lines changed

tokio-postgres/tests/test/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio_postgres::{
1818

1919
mod binary_copy;
2020
mod parse;
21+
mod replication;
2122
#[cfg(feature = "runtime")]
2223
mod runtime;
2324
mod types;
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
#![allow(unused_imports)]
2+
use futures::StreamExt;
3+
4+
use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert};
5+
use postgres_protocol::message::backend::ReplicationMessage::*;
6+
use postgres_protocol::message::backend::TupleData;
7+
use tokio_postgres::replication::LogicalReplicationStream;
8+
use tokio_postgres::NoTls;
9+
use tokio_postgres::SimpleQueryMessage::Row;
10+
11+
#[tokio::test]
12+
async fn test_replication() {
13+
// form SQL connection
14+
let conninfo = "host=127.0.0.1 port=5433 user=postgres replication=database";
15+
let (client, connection) = tokio_postgres::connect(conninfo, NoTls).await.unwrap();
16+
tokio::spawn(async move {
17+
if let Err(e) = connection.await {
18+
eprintln!("connection error: {}", e);
19+
}
20+
});
21+
22+
client
23+
.simple_query("DROP TABLE IF EXISTS test_logical_replication")
24+
.await
25+
.unwrap();
26+
client
27+
.simple_query("CREATE TABLE test_logical_replication(i int)")
28+
.await
29+
.unwrap();
30+
let res = client
31+
.simple_query("SELECT 'test_logical_replication'::regclass::oid")
32+
.await
33+
.unwrap();
34+
let rel_id: u32 = if let Row(row) = &res[0] {
35+
row.get("oid").unwrap().parse().unwrap()
36+
} else {
37+
panic!("unexpeced query message");
38+
};
39+
40+
client
41+
.simple_query("DROP PUBLICATION IF EXISTS test_pub")
42+
.await
43+
.unwrap();
44+
client
45+
.simple_query("CREATE PUBLICATION test_pub FOR ALL TABLES")
46+
.await
47+
.unwrap();
48+
49+
let slot = "test_logical_slot";
50+
51+
let system = client.simple_query("IDENTIFY_SYSTEM").await.unwrap();
52+
let lsn = if let Row(row) = &system[0] {
53+
row.get("xlogpos").unwrap()
54+
} else {
55+
panic!("unexpeced query message");
56+
};
57+
58+
let query = format!(
59+
r#"CREATE_REPLICATION_SLOT {:?} TEMPORARY LOGICAL "pgoutput""#,
60+
slot
61+
);
62+
client.simple_query(&query).await.unwrap();
63+
64+
// issue a query that will appear in the slot's stream since it happened after its creation
65+
client
66+
.simple_query("INSERT INTO test_logical_replication VALUES (42)")
67+
.await
68+
.unwrap();
69+
70+
let options = r#"("proto_version" '1', "publication_names" 'test_pub')"#;
71+
let query = format!(
72+
r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#,
73+
slot, lsn, options
74+
);
75+
let copy_stream = client
76+
.copy_both_simple::<bytes::Bytes>(&query)
77+
.await
78+
.unwrap();
79+
80+
let stream = LogicalReplicationStream::new(copy_stream);
81+
tokio::pin!(stream);
82+
83+
// verify that we can observe the transaction in the replication stream
84+
let begin = loop {
85+
match stream.next().await {
86+
Some(Ok(XLogData(body))) => {
87+
if let Begin(begin) = body.into_data() {
88+
break begin;
89+
}
90+
}
91+
Some(Ok(_)) => (),
92+
Some(Err(_)) => panic!("unexpected replication stream error"),
93+
None => panic!("unexpected replication stream end"),
94+
}
95+
};
96+
97+
let insert = loop {
98+
match stream.next().await {
99+
Some(Ok(XLogData(body))) => {
100+
if let Insert(insert) = body.into_data() {
101+
break insert;
102+
}
103+
}
104+
Some(Ok(_)) => (),
105+
Some(Err(_)) => panic!("unexpected replication stream error"),
106+
None => panic!("unexpected replication stream end"),
107+
}
108+
};
109+
110+
let commit = loop {
111+
match stream.next().await {
112+
Some(Ok(XLogData(body))) => {
113+
if let Commit(commit) = body.into_data() {
114+
break commit;
115+
}
116+
}
117+
Some(Ok(_)) => (),
118+
Some(Err(_)) => panic!("unexpected replication stream error"),
119+
None => panic!("unexpected replication stream end"),
120+
}
121+
};
122+
123+
assert_eq!(begin.final_lsn(), commit.commit_lsn());
124+
assert_eq!(insert.rel_id(), rel_id);
125+
126+
let tuple_data = insert.tuple().tuple_data();
127+
assert_eq!(tuple_data.len(), 1);
128+
assert!(matches!(tuple_data[0], TupleData::Text(_)));
129+
if let TupleData::Text(data) = &tuple_data[0] {
130+
assert_eq!(data, &b"42"[..]);
131+
}
132+
}

0 commit comments

Comments
 (0)