Skip to content

Commit f3ab8b9

Browse files
jeff-davispetrosagg
andcommitted
Connection string config for replication.
Co-authored-by: Petros Angelatos <petrosagg@gmail.com>
1 parent b5cba2f commit f3ab8b9

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

tokio-postgres/src/config.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,21 @@ pub enum ChannelBinding {
5656
Require,
5757
}
5858

59+
/// Replication mode configuration.
60+
///
61+
/// It is recommended that you use a PostgreSQL server patch version
62+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
63+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
64+
/// handle pipelined requests after streaming has stopped.
65+
#[derive(Debug, Copy, Clone, PartialEq)]
66+
#[non_exhaustive]
67+
pub enum ReplicationMode {
68+
/// Physical replication.
69+
Physical,
70+
/// Logical replication.
71+
Logical,
72+
}
73+
5974
/// A host specification.
6075
#[derive(Debug, Clone, PartialEq)]
6176
pub enum Host {
@@ -159,6 +174,7 @@ pub struct Config {
159174
pub(crate) keepalives_idle: Duration,
160175
pub(crate) target_session_attrs: TargetSessionAttrs,
161176
pub(crate) channel_binding: ChannelBinding,
177+
pub(crate) replication_mode: Option<ReplicationMode>,
162178
}
163179

164180
impl Default for Config {
@@ -184,6 +200,7 @@ impl Config {
184200
keepalives_idle: Duration::from_secs(2 * 60 * 60),
185201
target_session_attrs: TargetSessionAttrs::Any,
186202
channel_binding: ChannelBinding::Prefer,
203+
replication_mode: None,
187204
}
188205
}
189206

@@ -387,6 +404,22 @@ impl Config {
387404
self.channel_binding
388405
}
389406

407+
/// Set replication mode.
408+
///
409+
/// It is recommended that you use a PostgreSQL server patch version
410+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
411+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
412+
/// handle pipelined requests after streaming has stopped.
413+
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
414+
self.replication_mode = Some(replication_mode);
415+
self
416+
}
417+
418+
/// Get replication mode.
419+
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
420+
self.replication_mode
421+
}
422+
390423
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
391424
match key {
392425
"user" => {
@@ -476,6 +509,17 @@ impl Config {
476509
};
477510
self.channel_binding(channel_binding);
478511
}
512+
"replication" => {
513+
let mode = match value {
514+
"off" => None,
515+
"true" => Some(ReplicationMode::Physical),
516+
"database" => Some(ReplicationMode::Logical),
517+
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
518+
};
519+
if let Some(mode) = mode {
520+
self.replication_mode(mode);
521+
}
522+
}
479523
key => {
480524
return Err(Error::config_parse(Box::new(UnknownOption(
481525
key.to_string(),
@@ -548,6 +592,7 @@ impl fmt::Debug for Config {
548592
.field("keepalives_idle", &self.keepalives_idle)
549593
.field("target_session_attrs", &self.target_session_attrs)
550594
.field("channel_binding", &self.channel_binding)
595+
.field("replication", &self.replication_mode)
551596
.finish()
552597
}
553598
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2-
use crate::config::{self, Config};
2+
use crate::config::{self, Config, ReplicationMode};
33
use crate::connect_tls::connect_tls;
44
use crate::maybe_tls_stream::MaybeTlsStream;
55
use crate::tls::{TlsConnect, TlsStream};
@@ -124,6 +124,12 @@ where
124124
if let Some(application_name) = &config.application_name {
125125
params.push(("application_name", &**application_name));
126126
}
127+
if let Some(replication_mode) = &config.replication_mode {
128+
match replication_mode {
129+
ReplicationMode::Physical => params.push(("replication", "true")),
130+
ReplicationMode::Logical => params.push(("replication", "database")),
131+
}
132+
}
127133

128134
let mut buf = BytesMut::new();
129135
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

0 commit comments

Comments
 (0)