Skip to content

Commit a86b0cc

Browse files
committed
replication: fix documentation tests
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 83e0067 commit a86b0cc

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
//!
1515
//! # Physical Replication Client Example
1616
//! ```no_run
17-
//! extern crate tokio;
18-
//!
1917
//! use postgres_protocol::message::backend::ReplicationMessage;
20-
//! use tokio::stream::StreamExt;
18+
//! use futures::StreamExt;
2119
//! use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
2220
//!
2321
//! #[tokio::main]
@@ -61,11 +59,10 @@
6159
//! extension](https://github.com/eulerto/wal2json).
6260
//!
6361
//! ```no_run
64-
//! extern crate tokio;
65-
//!
6662
//! use postgres_protocol::message::backend::ReplicationMessage;
67-
//! use tokio::stream::StreamExt;
63+
//! use futures::StreamExt;
6864
//! use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
65+
//! use postgres_protocol::replication::pgoutput::{PgOutput, LogicalReplicationMessage};
6966
//!
7067
//! #[tokio::main]
7168
//! async fn main() -> Result<(), Error> {
@@ -85,23 +82,27 @@
8582
//! let identify_system = rclient.identify_system().await?;
8683
//!
8784
//! let slot = "my_slot";
88-
//! let plugin = "wal2json";
89-
//! let options = &vec![("pretty-print", "1")];
85+
//! let plugin = PgOutput::new(vec!["publication_name".to_string()]);
9086
//!
9187
//! let _slotdesc = rclient
92-
//! .create_logical_replication_slot(slot, false, plugin, None)
88+
//! .create_logical_replication_slot(slot, false, &plugin, None)
9389
//! .await?;
9490
//!
95-
//! let mut physical_stream = rclient
96-
//! .start_logical_replication(slot, identify_system.xlogpos(), options)
91+
//! let mut logical_stream = rclient
92+
//! .start_logical_replication(slot, identify_system.xlogpos(), &plugin)
9793
//! .await?;
9894
//!
99-
//! while let Some(replication_message) = physical_stream.next().await {
95+
//! while let Some(replication_message) = logical_stream.next().await {
10096
//! match replication_message? {
10197
//! ReplicationMessage::XLogData(xlog_data) => {
98+
//! use LogicalReplicationMessage::*;
10299
//! eprintln!("received XLogData: {:#?}", xlog_data);
103-
//! let json = std::str::from_utf8(xlog_data.data()).unwrap();
104-
//! eprintln!("JSON text: {}", json);
100+
//! match xlog_data.data() {
101+
//! Insert(insert) => eprintln!("insert event: {:?}", insert),
102+
//! Update(update) => eprintln!("update event: {:?}", update),
103+
//! Delete(delete) => eprintln!("delete event: {:?}", delete),
104+
//! event => eprintln!("other {:?}", event),
105+
//! }
105106
//! }
106107
//! ReplicationMessage::PrimaryKeepAlive(keepalive) => {
107108
//! eprintln!("received PrimaryKeepAlive: {:#?}", keepalive);
@@ -739,7 +740,7 @@ impl ReplicationClient {
739740
/// and data messages will be in
740741
/// [CopyData](postgres_protocol::message::backend::Message::CopyData).
741742
///
742-
/// Intended to be used with the [next()](tokio::stream::StreamExt::next) method.
743+
/// Intended to be used with the [next()](futures::StreamExt::next) method.
743744
///
744745
/// If the timeline specified with
745746
/// [start_physical_replication()](ReplicationClient::start_physical_replication)

0 commit comments

Comments
 (0)