Skip to content

Commit e7c0b58

Browse files
authored
Merge pull request #90 from psqlpy-python/feature/add_synchronous_commit_option
Added synchronous_commit option for transactions
2 parents 969aadc + ff53fa7 commit e7c0b58

File tree

7 files changed

+188
-29
lines changed

7 files changed

+188
-29
lines changed

python/psqlpy/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ReadVariant,
1212
SingleQueryResult,
1313
SslMode,
14+
SynchronousCommit,
1415
TargetSessionAttrs,
1516
Transaction,
1617
connect,
@@ -32,4 +33,5 @@
3233
"SslMode",
3334
"KeepaliveConfig",
3435
"ConnectionPoolBuilder",
36+
"SynchronousCommit",
3537
]

python/psqlpy/_internal/__init__.pyi

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,38 @@ class SingleQueryResult:
152152
Type that return passed function.
153153
"""
154154

155+
class SynchronousCommit(Enum):
156+
"""
157+
Class for synchronous_commit option for transactions.
158+
159+
### Variants:
160+
- `On`: The meaning may change based on whether you have
161+
a synchronous standby or not.
162+
If there is a synchronous standby,
163+
setting the value to on will result in waiting till “remote flush”.
164+
- `Off`: As the name indicates, the commit acknowledgment can come before
165+
flushing the records to disk.
166+
This is generally called as an asynchronous commit.
167+
If the PostgreSQL instance crashes,
168+
the last few asynchronous commits might be lost.
169+
- `Local`: WAL records are written and flushed to local disks.
170+
In this case, the commit will be acknowledged after the
171+
local WAL Write and WAL flush completes.
172+
- `RemoteWrite`: WAL records are successfully handed over to
173+
remote instances which acknowledged back
174+
about the write (not flush).
175+
- `RemoteApply`: This will result in commits waiting until replies from the
176+
current synchronous standby(s) indicate they have received
177+
the commit record of the transaction and applied it so
178+
that it has become visible to queries on the standby(s).
179+
"""
180+
181+
On = 1
182+
Off = 2
183+
Local = 3
184+
RemoteWrite = 4
185+
RemoteApply = 5
186+
155187
class IsolationLevel(Enum):
156188
"""Class for Isolation Level for transactions."""
157189

@@ -1050,13 +1082,15 @@ class Connection:
10501082
isolation_level: IsolationLevel | None = None,
10511083
read_variant: ReadVariant | None = None,
10521084
deferrable: bool | None = None,
1085+
synchronous_commit: SynchronousCommit | None = None,
10531086
) -> Transaction:
10541087
"""Create new transaction.
10551088
10561089
### Parameters:
10571090
- `isolation_level`: configure isolation level of the transaction.
10581091
- `read_variant`: configure read variant of the transaction.
10591092
- `deferrable`: configure deferrable of the transaction.
1093+
- `synchronous_commit`: configure synchronous_commit option for transaction.
10601094
"""
10611095
def cursor(
10621096
self: Self,

python/tests/test_transaction.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,13 @@
99
from pyarrow import parquet
1010
from tests.helpers import count_rows_in_test_table
1111

12-
from psqlpy import ConnectionPool, Cursor, IsolationLevel, ReadVariant
12+
from psqlpy import (
13+
ConnectionPool,
14+
Cursor,
15+
IsolationLevel,
16+
ReadVariant,
17+
SynchronousCommit,
18+
)
1319
from psqlpy.exceptions import (
1420
RustPSQLDriverPyBaseError,
1521
TransactionBeginError,
@@ -401,3 +407,27 @@ async def test_execute_batch_method(psql_pool: ConnectionPool) -> None:
401407
await transaction.execute_batch(querystring=query)
402408
await transaction.execute(querystring="SELECT * FROM execute_batch")
403409
await transaction.execute(querystring="SELECT * FROM execute_batch2")
410+
411+
412+
@pytest.mark.parametrize(
413+
"synchronous_commit",
414+
(
415+
SynchronousCommit.On,
416+
SynchronousCommit.Off,
417+
SynchronousCommit.Local,
418+
SynchronousCommit.RemoteWrite,
419+
SynchronousCommit.RemoteApply,
420+
),
421+
)
422+
async def test_synchronous_commit(
423+
synchronous_commit: SynchronousCommit,
424+
psql_pool: ConnectionPool,
425+
table_name: str,
426+
number_database_records: int,
427+
) -> None:
428+
async with psql_pool.acquire() as conn, conn.transaction(synchronous_commit=synchronous_commit) as trans:
429+
res = await trans.execute(
430+
f"SELECT * FROM {table_name}",
431+
)
432+
433+
assert len(res.result()) == number_database_records

src/driver/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
use super::{
1717
cursor::Cursor,
1818
transaction::Transaction,
19-
transaction_options::{IsolationLevel, ReadVariant},
19+
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2020
};
2121

2222
/// Format OPTS parameter for Postgres COPY command.
@@ -594,13 +594,15 @@ impl Connection {
594594
isolation_level: Option<IsolationLevel>,
595595
read_variant: Option<ReadVariant>,
596596
deferrable: Option<bool>,
597+
synchronous_commit: Option<SynchronousCommit>,
597598
) -> RustPSQLDriverPyResult<Transaction> {
598599
if let Some(db_client) = &self.db_client {
599600
return Ok(Transaction::new(
600601
db_client.clone(),
601602
false,
602603
false,
603604
isolation_level,
605+
synchronous_commit,
604606
read_variant,
605607
deferrable,
606608
HashSet::new(),

src/driver/transaction.rs

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818

1919
use super::{
2020
cursor::Cursor,
21-
transaction_options::{IsolationLevel, ReadVariant},
21+
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2222
};
2323
use crate::common::ObjectQueryTrait;
2424
use std::{collections::HashSet, sync::Arc};
@@ -30,6 +30,7 @@ pub trait TransactionObjectTrait {
3030
isolation_level: Option<IsolationLevel>,
3131
read_variant: Option<ReadVariant>,
3232
defferable: Option<bool>,
33+
synchronous_commit: Option<SynchronousCommit>,
3334
) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
3435
fn commit(&self) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
3536
fn rollback(&self) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
@@ -41,6 +42,7 @@ impl TransactionObjectTrait for Object {
4142
isolation_level: Option<IsolationLevel>,
4243
read_variant: Option<ReadVariant>,
4344
deferrable: Option<bool>,
45+
synchronous_commit: Option<SynchronousCommit>,
4446
) -> RustPSQLDriverPyResult<()> {
4547
let mut querystring = "START TRANSACTION".to_string();
4648

@@ -60,12 +62,28 @@ impl TransactionObjectTrait for Object {
6062
Some(false) => " NOT DEFERRABLE",
6163
None => "",
6264
});
65+
6366
self.batch_execute(&querystring).await.map_err(|err| {
6467
RustPSQLDriverError::TransactionBeginError(format!(
6568
"Cannot execute statement to start transaction, err - {err}"
6669
))
6770
})?;
6871

72+
if let Some(synchronous_commit) = synchronous_commit {
73+
let str_synchronous_commit = synchronous_commit.to_str_level();
74+
75+
let synchronous_commit_query =
76+
format!("SET LOCAL synchronous_commit = '{str_synchronous_commit}'");
77+
78+
self.batch_execute(&synchronous_commit_query)
79+
.await
80+
.map_err(|err| {
81+
RustPSQLDriverError::TransactionBeginError(format!(
82+
"Cannot set synchronous_commit parameter, err - {err}"
83+
))
84+
})?;
85+
}
86+
6987
Ok(())
7088
}
7189
async fn commit(&self) -> RustPSQLDriverPyResult<()> {
@@ -93,6 +111,7 @@ pub struct Transaction {
93111
is_done: bool,
94112

95113
isolation_level: Option<IsolationLevel>,
114+
synchronous_commit: Option<SynchronousCommit>,
96115
read_variant: Option<ReadVariant>,
97116
deferrable: Option<bool>,
98117

@@ -107,6 +126,7 @@ impl Transaction {
107126
is_started: bool,
108127
is_done: bool,
109128
isolation_level: Option<IsolationLevel>,
129+
synchronous_commit: Option<SynchronousCommit>,
110130
read_variant: Option<ReadVariant>,
111131
deferrable: Option<bool>,
112132
savepoints_map: HashSet<String>,
@@ -116,6 +136,7 @@ impl Transaction {
116136
is_started,
117137
is_done,
118138
isolation_level,
139+
synchronous_commit,
119140
read_variant,
120141
deferrable,
121142
savepoints_map,
@@ -149,18 +170,26 @@ impl Transaction {
149170
}
150171

151172
async fn __aenter__<'a>(self_: Py<Self>) -> RustPSQLDriverPyResult<Py<Self>> {
152-
let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) =
153-
pyo3::Python::with_gil(|gil| {
154-
let self_ = self_.borrow(gil);
155-
(
156-
self_.is_started,
157-
self_.is_done,
158-
self_.isolation_level,
159-
self_.read_variant,
160-
self_.deferrable,
161-
self_.db_client.clone(),
162-
)
163-
});
173+
let (
174+
is_started,
175+
is_done,
176+
isolation_level,
177+
synchronous_commit,
178+
read_variant,
179+
deferrable,
180+
db_client,
181+
) = pyo3::Python::with_gil(|gil| {
182+
let self_ = self_.borrow(gil);
183+
(
184+
self_.is_started,
185+
self_.is_done,
186+
self_.isolation_level,
187+
self_.synchronous_commit,
188+
self_.read_variant,
189+
self_.deferrable,
190+
self_.db_client.clone(),
191+
)
192+
});
164193

165194
if is_started {
166195
return Err(RustPSQLDriverError::TransactionBeginError(
@@ -176,7 +205,12 @@ impl Transaction {
176205

177206
if let Some(db_client) = db_client {
178207
db_client
179-
.start_transaction(isolation_level, read_variant, deferrable)
208+
.start_transaction(
209+
isolation_level,
210+
read_variant,
211+
deferrable,
212+
synchronous_commit,
213+
)
180214
.await?;
181215

182216
Python::with_gil(|gil| {
@@ -558,18 +592,26 @@ impl Transaction {
558592
/// 2) Transaction is done.
559593
/// 3) Cannot execute `BEGIN` command.
560594
pub async fn begin(self_: Py<Self>) -> RustPSQLDriverPyResult<()> {
561-
let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) =
562-
pyo3::Python::with_gil(|gil| {
563-
let self_ = self_.borrow(gil);
564-
(
565-
self_.is_started,
566-
self_.is_done,
567-
self_.isolation_level,
568-
self_.read_variant,
569-
self_.deferrable,
570-
self_.db_client.clone(),
571-
)
572-
});
595+
let (
596+
is_started,
597+
is_done,
598+
isolation_level,
599+
synchronous_commit,
600+
read_variant,
601+
deferrable,
602+
db_client,
603+
) = pyo3::Python::with_gil(|gil| {
604+
let self_ = self_.borrow(gil);
605+
(
606+
self_.is_started,
607+
self_.is_done,
608+
self_.isolation_level,
609+
self_.synchronous_commit,
610+
self_.read_variant,
611+
self_.deferrable,
612+
self_.db_client.clone(),
613+
)
614+
});
573615

574616
if let Some(db_client) = db_client {
575617
if is_started {
@@ -584,7 +626,12 @@ impl Transaction {
584626
));
585627
}
586628
db_client
587-
.start_transaction(isolation_level, read_variant, deferrable)
629+
.start_transaction(
630+
isolation_level,
631+
read_variant,
632+
deferrable,
633+
synchronous_commit,
634+
)
588635
.await?;
589636

590637
pyo3::Python::with_gil(|gil| {

src/driver/transaction_options.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,46 @@ pub enum ReadVariant {
2828
ReadOnly,
2929
ReadWrite,
3030
}
31+
32+
#[pyclass]
33+
#[derive(Clone, Copy)]
34+
pub enum SynchronousCommit {
35+
/// As the name indicates, the commit acknowledgment can come before
36+
/// flushing the records to disk.
37+
/// This is generally called as an asynchronous commit.
38+
/// If the PostgreSQL instance crashes,
39+
/// the last few asynchronous commits might be lost.
40+
Off,
41+
/// WAL records are written and flushed to local disks.
42+
/// In this case, the commit will be acknowledged after the
43+
/// local WAL Write and WAL flush completes.
44+
Local,
45+
/// WAL records are successfully handed over to
46+
/// remote instances which acknowledged back
47+
/// about the write (not flush).
48+
RemoteWrite,
49+
/// The meaning may change based on whether you have
50+
/// a synchronous standby or not.
51+
/// If there is a synchronous standby,
52+
/// setting the value to on will result in waiting till “remote flush”.
53+
On,
54+
/// This will result in commits waiting until replies from the
55+
/// current synchronous standby(s) indicate they have received
56+
/// the commit record of the transaction and applied it so
57+
/// that it has become visible to queries on the standby(s).
58+
RemoteApply,
59+
}
60+
61+
impl SynchronousCommit {
62+
/// Return isolation level as String literal.
63+
#[must_use]
64+
pub fn to_str_level(&self) -> String {
65+
match self {
66+
SynchronousCommit::Off => "off".into(),
67+
SynchronousCommit::Local => "local".into(),
68+
SynchronousCommit::RemoteWrite => "remote_write".into(),
69+
SynchronousCommit::On => "on".into(),
70+
SynchronousCommit::RemoteApply => "remote_apply".into(),
71+
}
72+
}
73+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ fn psqlpy(py: Python<'_>, pymod: &Bound<'_, PyModule>) -> PyResult<()> {
2626
pymod.add_class::<driver::transaction::Transaction>()?;
2727
pymod.add_class::<driver::cursor::Cursor>()?;
2828
pymod.add_class::<driver::transaction_options::IsolationLevel>()?;
29+
pymod.add_class::<driver::transaction_options::SynchronousCommit>()?;
2930
pymod.add_class::<driver::transaction_options::ReadVariant>()?;
3031
pymod.add_class::<driver::common_options::ConnRecyclingMethod>()?;
3132
pymod.add_class::<driver::common_options::LoadBalanceHosts>()?;

0 commit comments

Comments
 (0)