Skip to content

Commit bd96437

Browse files
jeff-davispetrosagg
andcommitted
Connection string config for replication.
Co-authored-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 95a3c98 commit bd96437

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

tokio-postgres/src/config.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,21 @@ pub enum LoadBalanceHosts {
7272
Random,
7373
}
7474

75+
/// Replication mode configuration.
76+
///
77+
/// It is recommended that you use a PostgreSQL server patch version
78+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
79+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
80+
/// handle pipelined requests after streaming has stopped.
81+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
82+
#[non_exhaustive]
83+
pub enum ReplicationMode {
84+
/// Physical replication.
85+
Physical,
86+
/// Logical replication.
87+
Logical,
88+
}
89+
7590
/// A host specification.
7691
#[derive(Debug, Clone, PartialEq, Eq)]
7792
pub enum Host {
@@ -209,6 +224,7 @@ pub struct Config {
209224
pub(crate) target_session_attrs: TargetSessionAttrs,
210225
pub(crate) channel_binding: ChannelBinding,
211226
pub(crate) load_balance_hosts: LoadBalanceHosts,
227+
pub(crate) replication_mode: Option<ReplicationMode>,
212228
}
213229

214230
impl Default for Config {
@@ -242,6 +258,7 @@ impl Config {
242258
target_session_attrs: TargetSessionAttrs::Any,
243259
channel_binding: ChannelBinding::Prefer,
244260
load_balance_hosts: LoadBalanceHosts::Disable,
261+
replication_mode: None,
245262
}
246263
}
247264

@@ -524,6 +541,22 @@ impl Config {
524541
self.load_balance_hosts
525542
}
526543

544+
/// Set replication mode.
545+
///
546+
/// It is recommended that you use a PostgreSQL server patch version
547+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
548+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
549+
/// handle pipelined requests after streaming has stopped.
550+
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
551+
self.replication_mode = Some(replication_mode);
552+
self
553+
}
554+
555+
/// Get replication mode.
556+
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
557+
self.replication_mode
558+
}
559+
527560
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
528561
match key {
529562
"user" => {
@@ -660,6 +693,17 @@ impl Config {
660693
};
661694
self.load_balance_hosts(load_balance_hosts);
662695
}
696+
"replication" => {
697+
let mode = match value {
698+
"off" => None,
699+
"true" => Some(ReplicationMode::Physical),
700+
"database" => Some(ReplicationMode::Logical),
701+
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
702+
};
703+
if let Some(mode) = mode {
704+
self.replication_mode(mode);
705+
}
706+
}
663707
key => {
664708
return Err(Error::config_parse(Box::new(UnknownOption(
665709
key.to_string(),
@@ -744,6 +788,7 @@ impl fmt::Debug for Config {
744788
config_dbg
745789
.field("target_session_attrs", &self.target_session_attrs)
746790
.field("channel_binding", &self.channel_binding)
791+
.field("replication", &self.replication_mode)
747792
.finish()
748793
}
749794
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2-
use crate::config::{self, Config};
2+
use crate::config::{self, Config, ReplicationMode};
33
use crate::connect_tls::connect_tls;
44
use crate::maybe_tls_stream::MaybeTlsStream;
55
use crate::tls::{TlsConnect, TlsStream};
@@ -133,6 +133,12 @@ where
133133
if let Some(application_name) = &config.application_name {
134134
params.push(("application_name", &**application_name));
135135
}
136+
if let Some(replication_mode) = &config.replication_mode {
137+
match replication_mode {
138+
ReplicationMode::Physical => params.push(("replication", "true")),
139+
ReplicationMode::Logical => params.push(("replication", "database")),
140+
}
141+
}
136142

137143
let mut buf = BytesMut::new();
138144
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

0 commit comments

Comments
 (0)