Skip to content

Commit 44db213

Browse files
jeff-davispetrosagg
andcommitted
Connection string config for replication.
Co-authored-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 5bd1d32 commit 44db213

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

tokio-postgres/src/config.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,21 @@ pub enum ChannelBinding {
5757
Require,
5858
}
5959

60+
/// Replication mode configuration.
61+
///
62+
/// It is recommended that you use a PostgreSQL server patch version
63+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
64+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
65+
/// handle pipelined requests after streaming has stopped.
66+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
67+
#[non_exhaustive]
68+
pub enum ReplicationMode {
69+
/// Physical replication.
70+
Physical,
71+
/// Logical replication.
72+
Logical,
73+
}
74+
6075
/// A host specification.
6176
#[derive(Debug, Clone, PartialEq, Eq)]
6277
pub enum Host {
@@ -164,6 +179,7 @@ pub struct Config {
164179
pub(crate) keepalive_config: KeepaliveConfig,
165180
pub(crate) target_session_attrs: TargetSessionAttrs,
166181
pub(crate) channel_binding: ChannelBinding,
182+
pub(crate) replication_mode: Option<ReplicationMode>,
167183
}
168184

169185
impl Default for Config {
@@ -194,6 +210,7 @@ impl Config {
194210
keepalive_config,
195211
target_session_attrs: TargetSessionAttrs::Any,
196212
channel_binding: ChannelBinding::Prefer,
213+
replication_mode: None,
197214
}
198215
}
199216

@@ -424,6 +441,22 @@ impl Config {
424441
self.channel_binding
425442
}
426443

444+
/// Set replication mode.
445+
///
446+
/// It is recommended that you use a PostgreSQL server patch version
447+
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
448+
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
449+
/// handle pipelined requests after streaming has stopped.
450+
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
451+
self.replication_mode = Some(replication_mode);
452+
self
453+
}
454+
455+
/// Get replication mode.
456+
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
457+
self.replication_mode
458+
}
459+
427460
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
428461
match key {
429462
"user" => {
@@ -527,6 +560,17 @@ impl Config {
527560
};
528561
self.channel_binding(channel_binding);
529562
}
563+
"replication" => {
564+
let mode = match value {
565+
"off" => None,
566+
"true" => Some(ReplicationMode::Physical),
567+
"database" => Some(ReplicationMode::Logical),
568+
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
569+
};
570+
if let Some(mode) = mode {
571+
self.replication_mode(mode);
572+
}
573+
}
530574
key => {
531575
return Err(Error::config_parse(Box::new(UnknownOption(
532576
key.to_string(),
@@ -601,6 +645,7 @@ impl fmt::Debug for Config {
601645
.field("keepalives_retries", &self.keepalive_config.retries)
602646
.field("target_session_attrs", &self.target_session_attrs)
603647
.field("channel_binding", &self.channel_binding)
648+
.field("replication", &self.replication_mode)
604649
.finish()
605650
}
606651
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2-
use crate::config::{self, Config};
2+
use crate::config::{self, Config, ReplicationMode};
33
use crate::connect_tls::connect_tls;
44
use crate::maybe_tls_stream::MaybeTlsStream;
55
use crate::tls::{TlsConnect, TlsStream};
@@ -124,6 +124,12 @@ where
124124
if let Some(application_name) = &config.application_name {
125125
params.push(("application_name", &**application_name));
126126
}
127+
if let Some(replication_mode) = &config.replication_mode {
128+
match replication_mode {
129+
ReplicationMode::Physical => params.push(("replication", "true")),
130+
ReplicationMode::Logical => params.push(("replication", "database")),
131+
}
132+
}
127133

128134
let mut buf = BytesMut::new();
129135
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

0 commit comments

Comments
 (0)