Skip to content

Commit 2322e30

Browse files
committed
add postgres replication integration test
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent f192b59 commit 2322e30

File tree

2 files changed

+151
-0
lines changed

2 files changed

+151
-0
lines changed

tokio-postgres/tests/test/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio_postgres::{
1818

1919
mod binary_copy;
2020
mod parse;
21+
mod replication;
2122
#[cfg(feature = "runtime")]
2223
mod runtime;
2324
mod types;
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use futures::StreamExt;
2+
use std::time::{Duration, UNIX_EPOCH};
3+
4+
use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert};
5+
use postgres_protocol::message::backend::ReplicationMessage::*;
6+
use postgres_protocol::message::backend::TupleData;
7+
use postgres_types::PgLsn;
8+
use tokio_postgres::replication::LogicalReplicationStream;
9+
use tokio_postgres::NoTls;
10+
use tokio_postgres::SimpleQueryMessage::Row;
11+
12+
#[tokio::test]
13+
async fn test_replication() {
14+
// form SQL connection
15+
let conninfo = "host=127.0.0.1 port=5433 user=postgres replication=database";
16+
let (client, connection) = tokio_postgres::connect(conninfo, NoTls).await.unwrap();
17+
tokio::spawn(async move {
18+
if let Err(e) = connection.await {
19+
eprintln!("connection error: {}", e);
20+
}
21+
});
22+
23+
client
24+
.simple_query("DROP TABLE IF EXISTS test_logical_replication")
25+
.await
26+
.unwrap();
27+
client
28+
.simple_query("CREATE TABLE test_logical_replication(i int)")
29+
.await
30+
.unwrap();
31+
let res = client
32+
.simple_query("SELECT 'test_logical_replication'::regclass::oid")
33+
.await
34+
.unwrap();
35+
let rel_id: u32 = if let Row(row) = &res[0] {
36+
row.get("oid").unwrap().parse().unwrap()
37+
} else {
38+
panic!("unexpeced query message");
39+
};
40+
41+
client
42+
.simple_query("DROP PUBLICATION IF EXISTS test_pub")
43+
.await
44+
.unwrap();
45+
client
46+
.simple_query("CREATE PUBLICATION test_pub FOR ALL TABLES")
47+
.await
48+
.unwrap();
49+
50+
let slot = "test_logical_slot";
51+
52+
let query = format!(
53+
r#"CREATE_REPLICATION_SLOT {:?} TEMPORARY LOGICAL "pgoutput""#,
54+
slot
55+
);
56+
let slot_query = client.simple_query(&query).await.unwrap();
57+
let lsn = if let Row(row) = &slot_query[0] {
58+
row.get("consistent_point").unwrap()
59+
} else {
60+
panic!("unexpeced query message");
61+
};
62+
63+
// issue a query that will appear in the slot's stream since it happened after its creation
64+
client
65+
.simple_query("INSERT INTO test_logical_replication VALUES (42)")
66+
.await
67+
.unwrap();
68+
69+
let options = r#"("proto_version" '1', "publication_names" 'test_pub')"#;
70+
let query = format!(
71+
r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#,
72+
slot, lsn, options
73+
);
74+
let copy_stream = client
75+
.copy_both_simple::<bytes::Bytes>(&query)
76+
.await
77+
.unwrap();
78+
79+
let stream = LogicalReplicationStream::new(copy_stream);
80+
tokio::pin!(stream);
81+
82+
// verify that we can observe the transaction in the replication stream
83+
let begin = loop {
84+
match stream.next().await {
85+
Some(Ok(XLogData(body))) => {
86+
if let Begin(begin) = body.into_data() {
87+
break begin;
88+
}
89+
}
90+
Some(Ok(_)) => (),
91+
Some(Err(_)) => panic!("unexpected replication stream error"),
92+
None => panic!("unexpected replication stream end"),
93+
}
94+
};
95+
let insert = loop {
96+
match stream.next().await {
97+
Some(Ok(XLogData(body))) => {
98+
if let Insert(insert) = body.into_data() {
99+
break insert;
100+
}
101+
}
102+
Some(Ok(_)) => (),
103+
Some(Err(_)) => panic!("unexpected replication stream error"),
104+
None => panic!("unexpected replication stream end"),
105+
}
106+
};
107+
108+
let commit = loop {
109+
match stream.next().await {
110+
Some(Ok(XLogData(body))) => {
111+
if let Commit(commit) = body.into_data() {
112+
break commit;
113+
}
114+
}
115+
Some(Ok(_)) => (),
116+
Some(Err(_)) => panic!("unexpected replication stream error"),
117+
None => panic!("unexpected replication stream end"),
118+
}
119+
};
120+
121+
assert_eq!(begin.final_lsn(), commit.commit_lsn());
122+
assert_eq!(insert.rel_id(), rel_id);
123+
124+
let tuple_data = insert.tuple().tuple_data();
125+
assert_eq!(tuple_data.len(), 1);
126+
assert!(matches!(tuple_data[0], TupleData::Text(_)));
127+
if let TupleData::Text(data) = &tuple_data[0] {
128+
assert_eq!(data, &b"42"[..]);
129+
}
130+
131+
// Send a standby status update and require a keep alive response
132+
let lsn: PgLsn = lsn.parse().unwrap();
133+
134+
// Postgres epoch is 2000-01-01T00:00:00Z
135+
let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800);
136+
let ts = pg_epoch.elapsed().unwrap().as_micros() as i64;
137+
stream
138+
.as_mut()
139+
.standby_status_update(lsn, lsn, lsn, ts, 1)
140+
.await
141+
.unwrap();
142+
loop {
143+
match stream.next().await {
144+
Some(Ok(PrimaryKeepAlive(_))) => break,
145+
Some(Ok(_)) => (),
146+
Some(Err(e)) => panic!("unexpected replication stream error: {}", e),
147+
None => panic!("unexpected replication stream end"),
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)