diff --git a/python/psqlpy/__init__.py b/python/psqlpy/__init__.py index e4ffe06c..3eaa441b 100644 --- a/python/psqlpy/__init__.py +++ b/python/psqlpy/__init__.py @@ -11,6 +11,7 @@ ReadVariant, SingleQueryResult, SslMode, + SynchronousCommit, TargetSessionAttrs, Transaction, connect, @@ -32,4 +33,5 @@ "SslMode", "KeepaliveConfig", "ConnectionPoolBuilder", + "SynchronousCommit", ] diff --git a/python/psqlpy/_internal/__init__.pyi b/python/psqlpy/_internal/__init__.pyi index 353d9364..ce36ef2d 100644 --- a/python/psqlpy/_internal/__init__.pyi +++ b/python/psqlpy/_internal/__init__.pyi @@ -152,6 +152,38 @@ class SingleQueryResult: Type that return passed function. """ +class SynchronousCommit(Enum): + """ + Class for synchronous_commit option for transactions. + + ### Variants: + - `On`: The meaning may change based on whether you have + a synchronous standby or not. + If there is a synchronous standby, + setting the value to on will result in waiting till “remote flush”. + - `Off`: As the name indicates, the commit acknowledgment can come before + flushing the records to disk. + This is generally called as an asynchronous commit. + If the PostgreSQL instance crashes, + the last few asynchronous commits might be lost. + - `Local`: WAL records are written and flushed to local disks. + In this case, the commit will be acknowledged after the + local WAL Write and WAL flush completes. + - `RemoteWrite`: WAL records are successfully handed over to + remote instances which acknowledged back + about the write (not flush). + - `RemoteApply`: This will result in commits waiting until replies from the + current synchronous standby(s) indicate they have received + the commit record of the transaction and applied it so + that it has become visible to queries on the standby(s). + """ + + On = 1 + Off = 2 + Local = 3 + RemoteWrite = 4 + RemoteApply = 5 + class IsolationLevel(Enum): """Class for Isolation Level for transactions.""" @@ -1050,6 +1082,7 @@ class Connection: isolation_level: IsolationLevel | None = None, read_variant: ReadVariant | None = None, deferrable: bool | None = None, + synchronous_commit: SynchronousCommit | None = None, ) -> Transaction: """Create new transaction. @@ -1057,6 +1090,7 @@ class Connection: - `isolation_level`: configure isolation level of the transaction. - `read_variant`: configure read variant of the transaction. - `deferrable`: configure deferrable of the transaction. + - `synchronous_commit`: configure synchronous_commit option for transaction. """ def cursor( self: Self, diff --git a/python/tests/test_transaction.py b/python/tests/test_transaction.py index a9df2d28..b01302e4 100644 --- a/python/tests/test_transaction.py +++ b/python/tests/test_transaction.py @@ -9,7 +9,13 @@ from pyarrow import parquet from tests.helpers import count_rows_in_test_table -from psqlpy import ConnectionPool, Cursor, IsolationLevel, ReadVariant +from psqlpy import ( + ConnectionPool, + Cursor, + IsolationLevel, + ReadVariant, + SynchronousCommit, +) from psqlpy.exceptions import ( RustPSQLDriverPyBaseError, TransactionBeginError, @@ -401,3 +407,27 @@ async def test_execute_batch_method(psql_pool: ConnectionPool) -> None: await transaction.execute_batch(querystring=query) await transaction.execute(querystring="SELECT * FROM execute_batch") await transaction.execute(querystring="SELECT * FROM execute_batch2") + + +@pytest.mark.parametrize( + "synchronous_commit", + ( + SynchronousCommit.On, + SynchronousCommit.Off, + SynchronousCommit.Local, + SynchronousCommit.RemoteWrite, + SynchronousCommit.RemoteApply, + ), +) +async def test_synchronous_commit( + synchronous_commit: SynchronousCommit, + psql_pool: ConnectionPool, + table_name: str, + number_database_records: int, +) -> None: + async with psql_pool.acquire() as conn, conn.transaction(synchronous_commit=synchronous_commit) as trans: + res = await trans.execute( + f"SELECT * FROM {table_name}", + ) + + assert len(res.result()) == number_database_records diff --git a/src/driver/connection.rs b/src/driver/connection.rs index d145a592..e0df0b14 100644 --- a/src/driver/connection.rs +++ b/src/driver/connection.rs @@ -16,7 +16,7 @@ use crate::{ use super::{ cursor::Cursor, transaction::Transaction, - transaction_options::{IsolationLevel, ReadVariant}, + transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, }; /// Format OPTS parameter for Postgres COPY command. @@ -594,6 +594,7 @@ impl Connection { isolation_level: Option, read_variant: Option, deferrable: Option, + synchronous_commit: Option, ) -> RustPSQLDriverPyResult { if let Some(db_client) = &self.db_client { return Ok(Transaction::new( @@ -601,6 +602,7 @@ impl Connection { false, false, isolation_level, + synchronous_commit, read_variant, deferrable, HashSet::new(), diff --git a/src/driver/transaction.rs b/src/driver/transaction.rs index 535edc12..472d623d 100644 --- a/src/driver/transaction.rs +++ b/src/driver/transaction.rs @@ -18,7 +18,7 @@ use crate::{ use super::{ cursor::Cursor, - transaction_options::{IsolationLevel, ReadVariant}, + transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, }; use crate::common::ObjectQueryTrait; use std::{collections::HashSet, sync::Arc}; @@ -30,6 +30,7 @@ pub trait TransactionObjectTrait { isolation_level: Option, read_variant: Option, defferable: Option, + synchronous_commit: Option, ) -> impl std::future::Future> + Send; fn commit(&self) -> impl std::future::Future> + Send; fn rollback(&self) -> impl std::future::Future> + Send; @@ -41,6 +42,7 @@ impl TransactionObjectTrait for Object { isolation_level: Option, read_variant: Option, deferrable: Option, + synchronous_commit: Option, ) -> RustPSQLDriverPyResult<()> { let mut querystring = "START TRANSACTION".to_string(); @@ -60,12 +62,28 @@ impl TransactionObjectTrait for Object { Some(false) => " NOT DEFERRABLE", None => "", }); + self.batch_execute(&querystring).await.map_err(|err| { RustPSQLDriverError::TransactionBeginError(format!( "Cannot execute statement to start transaction, err - {err}" )) })?; + if let Some(synchronous_commit) = synchronous_commit { + let str_synchronous_commit = synchronous_commit.to_str_level(); + + let synchronous_commit_query = + format!("SET LOCAL synchronous_commit = '{str_synchronous_commit}'"); + + self.batch_execute(&synchronous_commit_query) + .await + .map_err(|err| { + RustPSQLDriverError::TransactionBeginError(format!( + "Cannot set synchronous_commit parameter, err - {err}" + )) + })?; + } + Ok(()) } async fn commit(&self) -> RustPSQLDriverPyResult<()> { @@ -93,6 +111,7 @@ pub struct Transaction { is_done: bool, isolation_level: Option, + synchronous_commit: Option, read_variant: Option, deferrable: Option, @@ -107,6 +126,7 @@ impl Transaction { is_started: bool, is_done: bool, isolation_level: Option, + synchronous_commit: Option, read_variant: Option, deferrable: Option, savepoints_map: HashSet, @@ -116,6 +136,7 @@ impl Transaction { is_started, is_done, isolation_level, + synchronous_commit, read_variant, deferrable, savepoints_map, @@ -149,18 +170,26 @@ impl Transaction { } async fn __aenter__<'a>(self_: Py) -> RustPSQLDriverPyResult> { - let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) = - pyo3::Python::with_gil(|gil| { - let self_ = self_.borrow(gil); - ( - self_.is_started, - self_.is_done, - self_.isolation_level, - self_.read_variant, - self_.deferrable, - self_.db_client.clone(), - ) - }); + let ( + is_started, + is_done, + isolation_level, + synchronous_commit, + read_variant, + deferrable, + db_client, + ) = pyo3::Python::with_gil(|gil| { + let self_ = self_.borrow(gil); + ( + self_.is_started, + self_.is_done, + self_.isolation_level, + self_.synchronous_commit, + self_.read_variant, + self_.deferrable, + self_.db_client.clone(), + ) + }); if is_started { return Err(RustPSQLDriverError::TransactionBeginError( @@ -176,7 +205,12 @@ impl Transaction { if let Some(db_client) = db_client { db_client - .start_transaction(isolation_level, read_variant, deferrable) + .start_transaction( + isolation_level, + read_variant, + deferrable, + synchronous_commit, + ) .await?; Python::with_gil(|gil| { @@ -558,18 +592,26 @@ impl Transaction { /// 2) Transaction is done. /// 3) Cannot execute `BEGIN` command. pub async fn begin(self_: Py) -> RustPSQLDriverPyResult<()> { - let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) = - pyo3::Python::with_gil(|gil| { - let self_ = self_.borrow(gil); - ( - self_.is_started, - self_.is_done, - self_.isolation_level, - self_.read_variant, - self_.deferrable, - self_.db_client.clone(), - ) - }); + let ( + is_started, + is_done, + isolation_level, + synchronous_commit, + read_variant, + deferrable, + db_client, + ) = pyo3::Python::with_gil(|gil| { + let self_ = self_.borrow(gil); + ( + self_.is_started, + self_.is_done, + self_.isolation_level, + self_.synchronous_commit, + self_.read_variant, + self_.deferrable, + self_.db_client.clone(), + ) + }); if let Some(db_client) = db_client { if is_started { @@ -584,7 +626,12 @@ impl Transaction { )); } db_client - .start_transaction(isolation_level, read_variant, deferrable) + .start_transaction( + isolation_level, + read_variant, + deferrable, + synchronous_commit, + ) .await?; pyo3::Python::with_gil(|gil| { diff --git a/src/driver/transaction_options.rs b/src/driver/transaction_options.rs index 822b7fe8..cdbfcb34 100644 --- a/src/driver/transaction_options.rs +++ b/src/driver/transaction_options.rs @@ -28,3 +28,46 @@ pub enum ReadVariant { ReadOnly, ReadWrite, } + +#[pyclass] +#[derive(Clone, Copy)] +pub enum SynchronousCommit { + /// As the name indicates, the commit acknowledgment can come before + /// flushing the records to disk. + /// This is generally called as an asynchronous commit. + /// If the PostgreSQL instance crashes, + /// the last few asynchronous commits might be lost. + Off, + /// WAL records are written and flushed to local disks. + /// In this case, the commit will be acknowledged after the + /// local WAL Write and WAL flush completes. + Local, + /// WAL records are successfully handed over to + /// remote instances which acknowledged back + /// about the write (not flush). + RemoteWrite, + /// The meaning may change based on whether you have + /// a synchronous standby or not. + /// If there is a synchronous standby, + /// setting the value to on will result in waiting till “remote flush”. + On, + /// This will result in commits waiting until replies from the + /// current synchronous standby(s) indicate they have received + /// the commit record of the transaction and applied it so + /// that it has become visible to queries on the standby(s). + RemoteApply, +} + +impl SynchronousCommit { + /// Return isolation level as String literal. + #[must_use] + pub fn to_str_level(&self) -> String { + match self { + SynchronousCommit::Off => "off".into(), + SynchronousCommit::Local => "local".into(), + SynchronousCommit::RemoteWrite => "remote_write".into(), + SynchronousCommit::On => "on".into(), + SynchronousCommit::RemoteApply => "remote_apply".into(), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 76a8f46b..6ead48ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ fn psqlpy(py: Python<'_>, pymod: &Bound<'_, PyModule>) -> PyResult<()> { pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; + pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?;