diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 641a42722..3426d624b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,16 +73,18 @@ jobs: path: target key: check-wasm32-target-${{ runner.os }}-${{ steps.rust-version.outputs.version }}-${{ hashFiles('Cargo.lock') }} - run: cargo check --target wasm32-unknown-unknown --manifest-path tokio-postgres/Cargo.toml --no-default-features --features js + env: + RUSTFLAGS: --cfg getrandom_backend="wasm_js" test: name: test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: docker compose up -d - uses: sfackler/actions/rustup@master with: - version: 1.74.0 + version: 1.81.0 - run: echo "version=$(rustc --version)" >> $GITHUB_OUTPUT id: rust-version - uses: actions/cache@v3 diff --git a/docker-compose.yml b/docker-compose.yml index 0ed44148d..991df2d01 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '2' services: postgres: - image: postgres:14 + image: docker.io/postgres:17 ports: - 5433:5433 volumes: diff --git a/postgres-derive/CHANGELOG.md b/postgres-derive/CHANGELOG.md index b0075fa8e..1532b307c 100644 --- a/postgres-derive/CHANGELOG.md +++ b/postgres-derive/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## v0.4.6 - 2024-09-15 + +### Changed + +* Upgraded `heck`. + ## v0.4.5 - 2023-08-19 ### Added diff --git a/postgres-derive/Cargo.toml b/postgres-derive/Cargo.toml index 1d8db7fcb..96600f124 100644 --- a/postgres-derive/Cargo.toml +++ b/postgres-derive/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "postgres-derive" -version = "0.4.5" +version = "0.4.6" authors = ["Steven Fackler "] -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" edition = "2018" description = "An internal crate used by postgres-types" repository = "https://github.com/sfackler/rust-postgres" @@ -15,4 +15,4 @@ test = false syn = "2.0" proc-macro2 = "1.0" quote = "1.0" -heck = "0.4" +heck = "0.5" diff --git a/postgres-native-tls/CHANGELOG.md b/postgres-native-tls/CHANGELOG.md index 9eb7ab800..5fe0a9c7a 100644 --- a/postgres-native-tls/CHANGELOG.md +++ b/postgres-native-tls/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## v0.5.1 - 2025-02-02 + +### Added + +* Added `set_postgresql_alpn`. + ## v0.5.0 - 2020-12-25 ### Changed diff --git a/postgres-native-tls/Cargo.toml b/postgres-native-tls/Cargo.toml index 1f2f6385d..f79ae5491 100644 --- a/postgres-native-tls/Cargo.toml +++ b/postgres-native-tls/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "postgres-native-tls" -version = "0.5.0" +version = "0.5.1" authors = ["Steven Fackler "] edition = "2018" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" description = "TLS support for tokio-postgres via native-tls" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" @@ -16,12 +16,12 @@ default = ["runtime"] runtime = ["tokio-postgres/runtime"] [dependencies] -native-tls = "0.2" +native-tls = { version = "0.2", features = ["alpn"] } tokio = "1.0" tokio-native-tls = "0.3" -tokio-postgres = { version = "0.7.0", path = "../tokio-postgres", default-features = false } +tokio-postgres = { version = "0.7.11", path = "../tokio-postgres", default-features = false } [dev-dependencies] futures-util = "0.3" tokio = { version = "1.0", features = ["macros", "net", "rt"] } -postgres = { version = "0.19.0", path = "../postgres" } +postgres = { version = "0.19.8", path = "../postgres" } diff --git a/postgres-native-tls/src/lib.rs b/postgres-native-tls/src/lib.rs index 6d36a30c9..aa750beda 100644 --- a/postgres-native-tls/src/lib.rs +++ b/postgres-native-tls/src/lib.rs @@ -53,6 +53,7 @@ //! ``` #![warn(rust_2018_idioms, clippy::all, missing_docs)] +use native_tls::TlsConnectorBuilder; use std::future::Future; use std::io; use std::pin::Pin; @@ -180,3 +181,10 @@ where } } } + +/// Set ALPN for `TlsConnectorBuilder` +/// +/// This is required when using `sslnegotiation=direct` +pub fn set_postgresql_alpn(builder: &mut TlsConnectorBuilder) { + builder.request_alpns(&["postgresql"]); +} diff --git a/postgres-native-tls/src/test.rs b/postgres-native-tls/src/test.rs index 25cc6fdbd..738c04bd7 100644 --- a/postgres-native-tls/src/test.rs +++ b/postgres-native-tls/src/test.rs @@ -5,7 +5,7 @@ use tokio_postgres::tls::TlsConnect; #[cfg(feature = "runtime")] use crate::MakeTlsConnector; -use crate::TlsConnector; +use crate::{set_postgresql_alpn, TlsConnector}; async fn smoke_test(s: &str, tls: T) where @@ -42,6 +42,21 @@ async fn require() { .await; } +#[tokio::test] +async fn direct() { + let mut builder = native_tls::TlsConnector::builder(); + builder.add_root_certificate( + Certificate::from_pem(include_bytes!("../../test/server.crt")).unwrap(), + ); + set_postgresql_alpn(&mut builder); + let connector = builder.build().unwrap(); + smoke_test( + "user=ssl_user dbname=postgres sslmode=require sslnegotiation=direct", + TlsConnector::new(connector, "localhost"), + ) + .await; +} + #[tokio::test] async fn prefer() { let connector = native_tls::TlsConnector::builder() diff --git a/postgres-openssl/CHANGELOG.md b/postgres-openssl/CHANGELOG.md index 346214ae8..33f5a127a 100644 --- a/postgres-openssl/CHANGELOG.md +++ b/postgres-openssl/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## v0.5.1 - 2025-02-02 + +### Added + +* Added `set_postgresql_alpn`. + ## v0.5.0 - 2020-12-25 ### Changed diff --git a/postgres-openssl/Cargo.toml b/postgres-openssl/Cargo.toml index 8671308af..6ebb86bef 100644 --- a/postgres-openssl/Cargo.toml +++ b/postgres-openssl/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "postgres-openssl" -version = "0.5.0" +version = "0.5.1" authors = ["Steven Fackler "] edition = "2018" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" description = "TLS support for tokio-postgres via openssl" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" @@ -19,9 +19,9 @@ runtime = ["tokio-postgres/runtime"] openssl = "0.10" tokio = "1.0" tokio-openssl = "0.6" -tokio-postgres = { version = "0.7.0", path = "../tokio-postgres", default-features = false } +tokio-postgres = { version = "0.7.11", path = "../tokio-postgres", default-features = false } [dev-dependencies] futures-util = "0.3" tokio = { version = "1.0", features = ["macros", "net", "rt"] } -postgres = { version = "0.19.0", path = "../postgres" } +postgres = { version = "0.19.8", path = "../postgres" } diff --git a/postgres-openssl/src/lib.rs b/postgres-openssl/src/lib.rs index 5f5c01473..a53a5edb1 100644 --- a/postgres-openssl/src/lib.rs +++ b/postgres-openssl/src/lib.rs @@ -53,7 +53,7 @@ use openssl::hash::MessageDigest; use openssl::nid::Nid; #[cfg(feature = "runtime")] use openssl::ssl::SslConnector; -use openssl::ssl::{self, ConnectConfiguration, SslRef}; +use openssl::ssl::{self, ConnectConfiguration, SslConnectorBuilder, SslRef}; use openssl::x509::X509VerifyResult; use std::error::Error; use std::fmt::{self, Debug}; @@ -256,3 +256,10 @@ fn tls_server_end_point(ssl: &SslRef) -> Option> { }; cert.digest(md).ok().map(|b| b.to_vec()) } + +/// Set ALPN for `SslConnectorBuilder` +/// +/// This is required when using `sslnegotiation=direct` +pub fn set_postgresql_alpn(builder: &mut SslConnectorBuilder) -> Result<(), ErrorStack> { + builder.set_alpn_protos(b"\x0apostgresql") +} diff --git a/postgres-openssl/src/test.rs b/postgres-openssl/src/test.rs index b361ee446..66bb22641 100644 --- a/postgres-openssl/src/test.rs +++ b/postgres-openssl/src/test.rs @@ -37,6 +37,19 @@ async fn require() { .await; } +#[tokio::test] +async fn direct() { + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_ca_file("../test/server.crt").unwrap(); + set_postgresql_alpn(&mut builder).unwrap(); + let ctx = builder.build(); + smoke_test( + "user=ssl_user dbname=postgres sslmode=require sslnegotiation=direct", + TlsConnector::new(ctx.configure().unwrap(), "localhost"), + ) + .await; +} + #[tokio::test] async fn prefer() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); diff --git a/postgres-protocol/CHANGELOG.md b/postgres-protocol/CHANGELOG.md index 1c371675c..25e717128 100644 --- a/postgres-protocol/CHANGELOG.md +++ b/postgres-protocol/CHANGELOG.md @@ -1,6 +1,27 @@ # Change Log -## v0.6.6 -2023-08-19 +## v0.6.8 - 2025-02-02 + +### Changed + +* Upgraded `getrandom`. + +## v0.6.7 - 2024-07-21 + +### Deprecated + +* Deprecated `ErrorField::value`. + +### Added + +* Added a `Clone` implementation for `DataRowBody`. +* Added `ErrorField::value_bytes`. + +### Changed + +* Upgraded `base64`. + +## v0.6.6 - 2023-08-19 ### Added diff --git a/postgres-protocol/Cargo.toml b/postgres-protocol/Cargo.toml index bc83fc4e6..9351ea14f 100644 --- a/postgres-protocol/Cargo.toml +++ b/postgres-protocol/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "postgres-protocol" -version = "0.6.6" +version = "0.6.8" authors = ["Steven Fackler "] edition = "2018" description = "Low level Postgres protocol APIs" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" [features] default = [] -js = ["getrandom/js"] +js = ["getrandom/wasm_js"] [dependencies] base64 = "0.22" @@ -20,7 +20,7 @@ fallible-iterator = "0.2" hmac = "0.12" md-5 = "0.10" memchr = "2.0" -rand = "0.8" +rand = "0.9" sha2 = "0.10" stringprep = "0.1" -getrandom = { version = "0.2", optional = true } +getrandom = { version = "0.3", optional = true } diff --git a/postgres-protocol/src/authentication/sasl.rs b/postgres-protocol/src/authentication/sasl.rs index 4a77507e9..ccd40e8d0 100644 --- a/postgres-protocol/src/authentication/sasl.rs +++ b/postgres-protocol/src/authentication/sasl.rs @@ -136,10 +136,10 @@ impl ScramSha256 { /// Constructs a new instance which will use the provided password for authentication. pub fn new(password: &[u8], channel_binding: ChannelBinding) -> ScramSha256 { // rand 0.5's ThreadRng is cryptographically secure - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); let nonce = (0..NONCE_LENGTH) .map(|_| { - let mut v = rng.gen_range(0x21u8..0x7e); + let mut v = rng.random_range(0x21u8..0x7e); if v == 0x2c { v = 0x7e } diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs index 1b5be1098..013bfbb81 100644 --- a/postgres-protocol/src/message/backend.rs +++ b/postgres-protocol/src/message/backend.rs @@ -475,7 +475,7 @@ pub struct ColumnFormats<'a> { remaining: u16, } -impl<'a> FallibleIterator for ColumnFormats<'a> { +impl FallibleIterator for ColumnFormats<'_> { type Item = u16; type Error = io::Error; @@ -524,7 +524,7 @@ impl CopyOutResponseBody { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DataRowBody { storage: Bytes, len: u16, @@ -557,7 +557,7 @@ pub struct DataRowRanges<'a> { remaining: u16, } -impl<'a> FallibleIterator for DataRowRanges<'a> { +impl FallibleIterator for DataRowRanges<'_> { type Item = Option>; type Error = io::Error; @@ -633,7 +633,7 @@ impl<'a> FallibleIterator for ErrorFields<'a> { } let value_end = find_null(self.buf, 0)?; - let value = get_str(&self.buf[..value_end])?; + let value = &self.buf[..value_end]; self.buf = &self.buf[value_end + 1..]; Ok(Some(ErrorField { type_, value })) @@ -642,17 +642,23 @@ impl<'a> FallibleIterator for ErrorFields<'a> { pub struct ErrorField<'a> { type_: u8, - value: &'a str, + value: &'a [u8], } -impl<'a> ErrorField<'a> { +impl ErrorField<'_> { #[inline] pub fn type_(&self) -> u8 { self.type_ } #[inline] + #[deprecated(note = "use value_bytes instead", since = "0.6.7")] pub fn value(&self) -> &str { + str::from_utf8(self.value).expect("error field value contained non-UTF8 bytes") + } + + #[inline] + pub fn value_bytes(&self) -> &[u8] { self.value } } @@ -711,7 +717,7 @@ pub struct Parameters<'a> { remaining: u16, } -impl<'a> FallibleIterator for Parameters<'a> { +impl FallibleIterator for Parameters<'_> { type Item = Oid; type Error = io::Error; diff --git a/postgres-protocol/src/password/mod.rs b/postgres-protocol/src/password/mod.rs index f03bb811d..445fb0c0e 100644 --- a/postgres-protocol/src/password/mod.rs +++ b/postgres-protocol/src/password/mod.rs @@ -28,7 +28,7 @@ const SCRAM_DEFAULT_SALT_LEN: usize = 16; /// special characters that would require escaping in an SQL command. pub fn scram_sha_256(password: &[u8]) -> String { let mut salt: [u8; SCRAM_DEFAULT_SALT_LEN] = [0; SCRAM_DEFAULT_SALT_LEN]; - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); rng.fill_bytes(&mut salt); scram_sha_256_salt(password, salt) } diff --git a/postgres-protocol/src/types/mod.rs b/postgres-protocol/src/types/mod.rs index 69957818b..7f5fbb022 100644 --- a/postgres-protocol/src/types/mod.rs +++ b/postgres-protocol/src/types/mod.rs @@ -582,7 +582,7 @@ impl<'a> Array<'a> { /// An iterator over the dimensions of an array. pub struct ArrayDimensions<'a>(&'a [u8]); -impl<'a> FallibleIterator for ArrayDimensions<'a> { +impl FallibleIterator for ArrayDimensions<'_> { type Item = ArrayDimension; type Error = StdBox; @@ -1048,7 +1048,7 @@ pub struct PathPoints<'a> { buf: &'a [u8], } -impl<'a> FallibleIterator for PathPoints<'a> { +impl FallibleIterator for PathPoints<'_> { type Item = Point; type Error = StdBox; diff --git a/postgres-range/src/lib.rs b/postgres-range/src/lib.rs index 6215c0bb2..572191a9c 100644 --- a/postgres-range/src/lib.rs +++ b/postgres-range/src/lib.rs @@ -424,6 +424,10 @@ where } /// Determines if a value lies within this range. + #[expect( + clippy::unnecessary_map_or, + reason = "is_none_or is not available on 1.81 which is used in CI" + )] pub fn contains(&self, value: &T) -> bool { match self.inner { Empty => false, diff --git a/postgres-types/CHANGELOG.md b/postgres-types/CHANGELOG.md index 702cd5ef6..f7faadcb8 100644 --- a/postgres-types/CHANGELOG.md +++ b/postgres-types/CHANGELOG.md @@ -1,12 +1,39 @@ # Change Log -### Added +## TandemDrive * Upgraded support for `chrono-tz` to 0.10. Use the `with-chrono-tz-0_10` feature to enable it. +* Added support for `chrono-tz` 0.9 via the `with-chrono-tz-0_9` feature. + +## Unreleased + +## v0.2.9 - 2025-02-02 ### Added -* Added support for `chrono-tz` 0.9 via the `with-chrono-tz-0_9` feature. +* Added support for `cidr` 0.3 via the `with-cidr-0_3` feature. + +### Fixed + +* Fixed deserialization of out of bounds inputs to `time` 0.3 types to return an error rather than panic. + +## v0.2.8 - 2024-09-15 + +### Added + +* Added support for `jiff` 0.1 via the `with-jiff-01` feature. + +## v0.2.7 - 2024-07-21 + +### Added + +* Added `Default` implementation for `Json`. +* Added a `js` feature for WASM compatibility. + +### Changed + +* `FromStr` implementation for `PgLsn` no longer allocates a `Vec` when splitting an lsn string on it's `/`. +* The `eui48-1` feature no longer enables default features of the `eui48` library. ## v0.2.6 - 2023-08-19 diff --git a/postgres-types/Cargo.toml b/postgres-types/Cargo.toml index 13aae3d73..363cbcbab 100644 --- a/postgres-types/Cargo.toml +++ b/postgres-types/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "postgres-types" -version = "0.2.6" +version = "0.2.9" authors = ["Steven Fackler "] edition = "2018" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" description = "Conversions between Rust and Postgres values" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" @@ -13,14 +13,18 @@ categories = ["database"] [features] derive = ["postgres-derive"] array-impls = ["array-init"] +js = ["postgres-protocol/js"] with-bit-vec-0_6 = ["bit-vec-06"] with-cidr-0_2 = ["cidr-02"] +with-cidr-0_3 = ["cidr-03"] with-chrono-0_4 = ["chrono-04"] with-chrono-tz-0_10 = ["chrono-tz-010"] with-eui48-0_4 = ["eui48-04"] with-eui48-1 = ["eui48-1"] with-geo-types-0_6 = ["geo-types-06"] with-geo-types-0_7 = ["geo-types-0_7"] +with-jiff-0_1 = ["jiff-01"] +with-jiff-0_2 = ["jiff-02"] with-serde_json-1 = ["serde-1", "serde_json-1"] with-smol_str-01 = ["smol_str-01"] with-uuid-0_8 = ["uuid-08"] @@ -31,20 +35,23 @@ with-time-0_3 = ["time-03"] [dependencies] bytes = "1.0" fallible-iterator = "0.2" -postgres-protocol = { version = "0.6.5", path = "../postgres-protocol" } -postgres-derive = { version = "0.4.5", optional = true, path = "../postgres-derive" } +postgres-protocol = { version = "0.6.8", path = "../postgres-protocol" } +postgres-derive = { version = "0.4.6", optional = true, path = "../postgres-derive" } chrono-tz-010 = { version = "0.10", package = "chrono-tz", optional = true } array-init = { version = "2", optional = true } bit-vec-06 = { version = "0.6", package = "bit-vec", optional = true } chrono-04 = { version = "0.4.16", package = "chrono", default-features = false, features = ["clock"], optional = true } cidr-02 = { version = "0.2", package = "cidr", optional = true } +cidr-03 = { version = "0.3", package = "cidr", optional = true } # eui48-04 will stop compiling and support will be removed # See https://github.com/sfackler/rust-postgres/issues/1073 eui48-04 = { version = "0.4", package = "eui48", optional = true } eui48-1 = { version = "1.0", package = "eui48", optional = true, default-features = false } geo-types-06 = { version = "0.6", package = "geo-types", optional = true } geo-types-0_7 = { version = "0.7", package = "geo-types", optional = true } +jiff-01 = { version = "0.1", package = "jiff", optional = true } +jiff-02 = { version = "0.2", package = "jiff", optional = true } serde-1 = { version = "1.0", package = "serde", optional = true } serde_json-1 = { version = "1.0", package = "serde_json", optional = true, features = ["raw_value"]} uuid-08 = { version = "0.8", package = "uuid", optional = true } diff --git a/postgres-types/src/chrono_04.rs b/postgres-types/src/chrono_04.rs index b6f21ff42..4b4708bca 100644 --- a/postgres-types/src/chrono_04.rs +++ b/postgres-types/src/chrono_04.rs @@ -14,7 +14,7 @@ fn base() -> NaiveDateTime { .unwrap() } -impl<'a> FromSql<'a> for NaiveDateTime { +impl FromSql<'_> for NaiveDateTime { fn from_sql(_: &Type, raw: &[u8]) -> Result> { let t = types::timestamp_from_sql(raw)?; base() @@ -39,7 +39,7 @@ impl ToSql for NaiveDateTime { to_sql_checked!(); } -impl<'a> FromSql<'a> for DateTime { +impl FromSql<'_> for DateTime { fn from_sql(type_: &Type, raw: &[u8]) -> Result, Box> { let naive = NaiveDateTime::from_sql(type_, raw)?; Ok(Utc.from_utc_datetime(&naive)) @@ -61,7 +61,7 @@ impl ToSql for DateTime { to_sql_checked!(); } -impl<'a> FromSql<'a> for DateTime { +impl FromSql<'_> for DateTime { fn from_sql(type_: &Type, raw: &[u8]) -> Result, Box> { let utc = DateTime::::from_sql(type_, raw)?; Ok(utc.with_timezone(&Local)) @@ -83,7 +83,7 @@ impl ToSql for DateTime { to_sql_checked!(); } -impl<'a> FromSql<'a> for DateTime { +impl FromSql<'_> for DateTime { fn from_sql( type_: &Type, raw: &[u8], @@ -108,7 +108,7 @@ impl ToSql for DateTime { to_sql_checked!(); } -impl<'a> FromSql<'a> for NaiveDate { +impl FromSql<'_> for NaiveDate { fn from_sql(_: &Type, raw: &[u8]) -> Result> { let jd = types::date_from_sql(raw)?; base() @@ -135,7 +135,7 @@ impl ToSql for NaiveDate { to_sql_checked!(); } -impl<'a> FromSql<'a> for NaiveTime { +impl FromSql<'_> for NaiveTime { fn from_sql(_: &Type, raw: &[u8]) -> Result> { let usec = types::time_from_sql(raw)?; Ok(NaiveTime::from_hms_opt(0, 0, 0).unwrap() + Duration::microseconds(usec)) diff --git a/postgres-types/src/cidr_03.rs b/postgres-types/src/cidr_03.rs new file mode 100644 index 000000000..6a0178711 --- /dev/null +++ b/postgres-types/src/cidr_03.rs @@ -0,0 +1,44 @@ +use bytes::BytesMut; +use cidr_03::{IpCidr, IpInet}; +use postgres_protocol::types; +use std::error::Error; + +use crate::{FromSql, IsNull, ToSql, Type}; + +impl<'a> FromSql<'a> for IpCidr { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let inet = types::inet_from_sql(raw)?; + Ok(IpCidr::new(inet.addr(), inet.netmask())?) + } + + accepts!(CIDR); +} + +impl ToSql for IpCidr { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + types::inet_to_sql(self.first_address(), self.network_length(), w); + Ok(IsNull::No) + } + + accepts!(CIDR); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for IpInet { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let inet = types::inet_from_sql(raw)?; + Ok(IpInet::new(inet.addr(), inet.netmask())?) + } + + accepts!(INET); +} + +impl ToSql for IpInet { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + types::inet_to_sql(self.address(), self.network_length(), w); + Ok(IsNull::No) + } + + accepts!(INET); + to_sql_checked!(); +} diff --git a/postgres-types/src/jiff_01.rs b/postgres-types/src/jiff_01.rs new file mode 100644 index 000000000..d3215c0e6 --- /dev/null +++ b/postgres-types/src/jiff_01.rs @@ -0,0 +1,141 @@ +use bytes::BytesMut; +use jiff_01::{ + civil::{Date, DateTime, Time}, + Span, SpanRound, Timestamp, Unit, +}; +use postgres_protocol::types; +use std::error::Error; + +use crate::{FromSql, IsNull, ToSql, Type}; + +const fn base() -> DateTime { + DateTime::constant(2000, 1, 1, 0, 0, 0, 0) +} + +/// The number of seconds from the Unix epoch to 2000-01-01 00:00:00 UTC. +const PG_EPOCH: i64 = 946684800; + +fn base_ts() -> Timestamp { + Timestamp::new(PG_EPOCH, 0).unwrap() +} + +fn round_us<'a>() -> SpanRound<'a> { + SpanRound::new().largest(Unit::Microsecond) +} + +fn decode_err(_e: E) -> Box +where + E: Error, +{ + "value too large to decode".into() +} + +fn transmit_err(_e: E) -> Box +where + E: Error, +{ + "value too large to transmit".into() +} + +impl<'a> FromSql<'a> for DateTime { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::timestamp_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| base().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIMESTAMP); +} + +impl ToSql for DateTime { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self + .since(base()) + .and_then(|s| s.round(round_us())) + .map_err(transmit_err)? + .get_microseconds(); + types::timestamp_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIMESTAMP); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Timestamp { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::timestamp_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| base_ts().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIMESTAMPTZ); +} + +impl ToSql for Timestamp { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self + .since(base_ts()) + .and_then(|s| s.round(round_us())) + .map_err(transmit_err)? + .get_microseconds(); + types::timestamp_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIMESTAMPTZ); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Date { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::date_from_sql(raw)?; + Span::new() + .try_days(v) + .and_then(|s| base().date().checked_add(s)) + .map_err(decode_err) + } + accepts!(DATE); +} + +impl ToSql for Date { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self.since(base().date()).map_err(transmit_err)?.get_days(); + types::date_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(DATE); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Time { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::time_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| Time::midnight().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIME); +} + +impl ToSql for Time { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self + .since(Time::midnight()) + .and_then(|s| s.round(round_us())) + .map_err(transmit_err)? + .get_microseconds(); + types::time_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIME); + to_sql_checked!(); +} diff --git a/postgres-types/src/jiff_02.rs b/postgres-types/src/jiff_02.rs new file mode 100644 index 000000000..1db8e7239 --- /dev/null +++ b/postgres-types/src/jiff_02.rs @@ -0,0 +1,139 @@ +use bytes::BytesMut; +use jiff_02::{ + civil::{Date, DateTime, Time}, + Span, SpanRound, Timestamp, Unit, +}; +use postgres_protocol::types; +use std::error::Error; + +use crate::{FromSql, IsNull, ToSql, Type}; + +const fn base() -> DateTime { + DateTime::constant(2000, 1, 1, 0, 0, 0, 0) +} + +/// The number of seconds from the Unix epoch to 2000-01-01 00:00:00 UTC. +const PG_EPOCH: i64 = 946684800; + +fn base_ts() -> Timestamp { + Timestamp::new(PG_EPOCH, 0).unwrap() +} + +fn round_us<'a>() -> SpanRound<'a> { + SpanRound::new().largest(Unit::Microsecond) +} + +fn decode_err(_e: E) -> Box +where + E: Error, +{ + "value too large to decode".into() +} + +fn transmit_err(_e: E) -> Box +where + E: Error, +{ + "value too large to transmit".into() +} + +impl<'a> FromSql<'a> for DateTime { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::timestamp_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| base().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIMESTAMP); +} + +impl ToSql for DateTime { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = dbg!(dbg!(self.since(base())).and_then(|s| s.round(round_us().relative(base())))) + .map_err(transmit_err)? + .get_microseconds(); + types::timestamp_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIMESTAMP); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Timestamp { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::timestamp_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| base_ts().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIMESTAMPTZ); +} + +impl ToSql for Timestamp { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self + .since(base_ts()) + .and_then(|s| s.round(round_us())) + .map_err(transmit_err)? + .get_microseconds(); + types::timestamp_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIMESTAMPTZ); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Date { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::date_from_sql(raw)?; + Span::new() + .try_days(v) + .and_then(|s| base().date().checked_add(s)) + .map_err(decode_err) + } + accepts!(DATE); +} + +impl ToSql for Date { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self.since(base().date()).map_err(transmit_err)?.get_days(); + types::date_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(DATE); + to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Time { + fn from_sql(_: &Type, raw: &[u8]) -> Result> { + let v = types::time_from_sql(raw)?; + Span::new() + .try_microseconds(v) + .and_then(|s| Time::midnight().checked_add(s)) + .map_err(decode_err) + } + + accepts!(TIME); +} + +impl ToSql for Time { + fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { + let v = self + .since(Time::midnight()) + .and_then(|s| s.round(round_us())) + .map_err(transmit_err)? + .get_microseconds(); + types::time_to_sql(v, w); + Ok(IsNull::No) + } + + accepts!(TIME); + to_sql_checked!(); +} diff --git a/postgres-types/src/lib.rs b/postgres-types/src/lib.rs index 6973eb7d8..913cf15c2 100644 --- a/postgres-types/src/lib.rs +++ b/postgres-types/src/lib.rs @@ -270,6 +270,8 @@ mod chrono_04; mod chrono_tz_010; #[cfg(feature = "with-cidr-0_2")] mod cidr_02; +#[cfg(feature = "with-cidr-0_3")] +mod cidr_03; #[cfg(feature = "with-eui48-0_4")] mod eui48_04; #[cfg(feature = "with-eui48-1")] @@ -278,6 +280,10 @@ mod eui48_1; mod geo_types_06; #[cfg(feature = "with-geo-types-0_7")] mod geo_types_07; +#[cfg(feature = "with-jiff-0_1")] +mod jiff_01; +#[cfg(feature = "with-jiff-0_2")] +mod jiff_02; #[cfg(feature = "with-serde_json-1")] mod serde_json_1; #[cfg(feature = "with-smol_str-01")] @@ -489,10 +495,16 @@ impl WrongType { /// | `chrono::DateTime` | TIMESTAMP WITH TIME ZONE | /// | `chrono::NaiveDate` | DATE | /// | `chrono::NaiveTime` | TIME | +/// | `cidr::IpCidr` | CIDR | +/// | `cidr::IpInet` | INET | /// | `time::PrimitiveDateTime` | TIMESTAMP | /// | `time::OffsetDateTime` | TIMESTAMP WITH TIME ZONE | /// | `time::Date` | DATE | /// | `time::Time` | TIME | +/// | `jiff::civil::Date` | DATE | +/// | `jiff::civil::DateTime` | TIMESTAMP | +/// | `jiff::civil::Time` | TIME | +/// | `jiff::Timestamp` | TIMESTAMP WITH TIME ZONE | /// | `eui48::MacAddress` | MACADDR | /// | `geo_types::Point` | POINT | /// | `geo_types::Rect` | BOX | @@ -899,6 +911,8 @@ pub enum IsNull { /// | `chrono::DateTime` | TIMESTAMP WITH TIME ZONE | /// | `chrono::NaiveDate` | DATE | /// | `chrono::NaiveTime` | TIME | +/// | `cidr::IpCidr` | CIDR | +/// | `cidr::IpInet` | INET | /// | `time::PrimitiveDateTime` | TIMESTAMP | /// | `time::OffsetDateTime` | TIMESTAMP WITH TIME ZONE | /// | `time::Date` | DATE | @@ -973,7 +987,7 @@ pub enum Format { Binary, } -impl<'a, T> ToSql for &'a T +impl ToSql for &T where T: ToSql, { @@ -1014,7 +1028,7 @@ impl ToSql for Option { fn encode_format(&self, ty: &Type) -> Format { match self { - Some(ref val) => val.encode_format(ty), + Some(val) => val.encode_format(ty), None => Format::Binary, } } @@ -1022,7 +1036,7 @@ impl ToSql for Option { to_sql_checked!(); } -impl<'a, T: ToSql> ToSql for &'a [T] { +impl ToSql for &[T] { fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result> { let member_type = match *ty.kind() { Kind::Array(ref member) => member, @@ -1063,7 +1077,7 @@ impl<'a, T: ToSql> ToSql for &'a [T] { to_sql_checked!(); } -impl<'a> ToSql for &'a [u8] { +impl ToSql for &[u8] { fn to_sql(&self, _: &Type, w: &mut BytesMut) -> Result> { types::bytea_to_sql(self, w); Ok(IsNull::No) @@ -1123,7 +1137,7 @@ impl ToSql for Box<[T]> { to_sql_checked!(); } -impl<'a> ToSql for Cow<'a, [u8]> { +impl ToSql for Cow<'_, [u8]> { fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result> { <&[u8] as ToSql>::to_sql(&self.as_ref(), ty, w) } @@ -1147,7 +1161,7 @@ impl ToSql for Vec { to_sql_checked!(); } -impl<'a> ToSql for &'a str { +impl ToSql for &str { fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result> { match ty.name() { "ltree" => types::ltree_to_sql(self, w), @@ -1168,7 +1182,7 @@ impl<'a> ToSql for &'a str { to_sql_checked!(); } -impl<'a> ToSql for Cow<'a, str> { +impl ToSql for Cow<'_, str> { fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result> { <&str as ToSql>::to_sql(&self.as_ref(), ty, w) } @@ -1315,17 +1329,17 @@ impl BorrowToSql for &dyn ToSql { } } -impl<'a> sealed::Sealed for Box {} +impl sealed::Sealed for Box {} -impl<'a> BorrowToSql for Box { +impl BorrowToSql for Box { #[inline] fn borrow_to_sql(&self) -> &dyn ToSql { self.as_ref() } } -impl<'a> sealed::Sealed for Box {} -impl<'a> BorrowToSql for Box { +impl sealed::Sealed for Box {} +impl BorrowToSql for Box { #[inline] fn borrow_to_sql(&self) -> &dyn ToSql { self.as_ref() diff --git a/postgres-types/src/pg_lsn.rs b/postgres-types/src/pg_lsn.rs index f0bbf4022..f339f9689 100644 --- a/postgres-types/src/pg_lsn.rs +++ b/postgres-types/src/pg_lsn.rs @@ -33,16 +33,14 @@ impl FromStr for PgLsn { type Err = ParseLsnError; fn from_str(lsn_str: &str) -> Result { - let split: Vec<&str> = lsn_str.split('/').collect(); - if split.len() == 2 { - let (hi, lo) = ( - u64::from_str_radix(split[0], 16).map_err(|_| ParseLsnError(()))?, - u64::from_str_radix(split[1], 16).map_err(|_| ParseLsnError(()))?, - ); - Ok(PgLsn((hi << 32) | lo)) - } else { - Err(ParseLsnError(())) - } + let Some((split_hi, split_lo)) = lsn_str.split_once('/') else { + return Err(ParseLsnError(())); + }; + let (hi, lo) = ( + u64::from_str_radix(split_hi, 16).map_err(|_| ParseLsnError(()))?, + u64::from_str_radix(split_lo, 16).map_err(|_| ParseLsnError(()))?, + ); + Ok(PgLsn((hi << 32) | lo)) } } diff --git a/postgres-types/src/time_03.rs b/postgres-types/src/time_03.rs index adca8e3ab..2736a2d6f 100644 --- a/postgres-types/src/time_03.rs +++ b/postgres-types/src/time_03.rs @@ -13,7 +13,9 @@ fn base() -> PrimitiveDateTime { impl<'a> FromSql<'a> for PrimitiveDateTime { fn from_sql(_: &Type, raw: &[u8]) -> Result> { let t = types::timestamp_from_sql(raw)?; - Ok(base() + Duration::microseconds(t)) + Ok(base() + .checked_add(Duration::microseconds(t)) + .ok_or("value too large to decode")?) } accepts!(TIMESTAMP); @@ -62,7 +64,10 @@ impl ToSql for OffsetDateTime { impl<'a> FromSql<'a> for Date { fn from_sql(_: &Type, raw: &[u8]) -> Result> { let jd = types::date_from_sql(raw)?; - Ok(base().date() + Duration::days(i64::from(jd))) + Ok(base() + .date() + .checked_add(Duration::days(i64::from(jd))) + .ok_or("value too large to decode")?) } accepts!(DATE); diff --git a/postgres/CHANGELOG.md b/postgres/CHANGELOG.md index 7f856b5ac..771e2e779 100644 --- a/postgres/CHANGELOG.md +++ b/postgres/CHANGELOG.md @@ -1,5 +1,26 @@ # Change Log +## Unreleased + +## v0.19.10 - 2025-02-02 + +### Added + +* Added support for direct TLS negotiation. +* Added support for `cidr` 0.3 via the `with-cidr-0_3` feature. + +## v0.19.9 - 2024-09-15 + +### Added + +* Added support for `jiff` 0.1 via the `with-jiff-01` feature. + +## v0.19.8 - 2024-07-21 + +### Added + +* Added `{Client, Transaction, GenericClient}::query_typed`. + ## v0.19.7 - 2023-08-25 ## Fixed diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml index 0c9b81b27..e353f8fca 100644 --- a/postgres/Cargo.toml +++ b/postgres/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "postgres" -version = "0.19.7" +version = "0.19.10" authors = ["Steven Fackler "] edition = "2018" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" description = "A native, synchronous PostgreSQL client" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" @@ -27,10 +27,14 @@ tracing-error = ["tokio-postgres/tracing-error"] array-impls = ["tokio-postgres/array-impls"] with-bit-vec-0_6 = ["tokio-postgres/with-bit-vec-0_6"] with-chrono-0_4 = ["tokio-postgres/with-chrono-0_4"] +with-cidr-0_2 = ["tokio-postgres/with-cidr-0_2"] +with-cidr-0_3 = ["tokio-postgres/with-cidr-0_3"] with-eui48-0_4 = ["tokio-postgres/with-eui48-0_4"] with-eui48-1 = ["tokio-postgres/with-eui48-1"] with-geo-types-0_6 = ["tokio-postgres/with-geo-types-0_6"] with-geo-types-0_7 = ["tokio-postgres/with-geo-types-0_7"] +with-jiff-0_1 = ["tokio-postgres/with-jiff-0_1"] +with-jiff-0_2 = ["tokio-postgres/with-jiff-0_2"] with-serde_json-1 = ["tokio-postgres/with-serde_json-1"] with-smol_str-01 = ["tokio-postgres/with-smol_str-01"] with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"] @@ -43,7 +47,7 @@ bytes = "1.0" fallible-iterator = "0.2" futures-util = { version = "0.3.14", features = ["sink"] } tracing = "0.1" -tokio-postgres = { version = "0.7.10", path = "../tokio-postgres" } +tokio-postgres = { version = "0.7.13", path = "../tokio-postgres" } tokio = { version = "1.0", features = ["rt", "time"] } [dev-dependencies] diff --git a/postgres/src/client.rs b/postgres/src/client.rs index 2e4b9a978..b678a7fce 100644 --- a/postgres/src/client.rs +++ b/postgres/src/client.rs @@ -270,6 +270,71 @@ impl Client { Ok(RowIter::new(self.connection.as_ref(), stream)) } + /// Like `query`, but requires the types of query parameters to be explicitly specified. + /// + /// Compared to `query`, this method allows performing queries without three round trips (for + /// prepare, execute, and close) by requiring the caller to specify parameter values along with + /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't + /// supported (such as Cloudflare Workers with Hyperdrive). + /// + /// A statement may contain parameters, specified by `$n`, where `n` is the index of the + /// parameter of the list provided, 1-indexed. + pub fn query_typed( + &mut self, + query: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.connection + .block_on(self.client.query_typed(query, params)) + } + + /// The maximally flexible version of [`query_typed`]. + /// + /// Compared to `query`, this method allows performing queries without three round trips (for + /// prepare, execute, and close) by requiring the caller to specify parameter values along with + /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't + /// supported (such as Cloudflare Workers with Hyperdrive). + /// + /// A statement may contain parameters, specified by `$n`, where `n` is the index of the + /// parameter of the list provided, 1-indexed. + /// + /// [`query_typed`]: #method.query_typed + /// + /// # Examples + /// ```no_run + /// # use postgres::{Client, NoTls}; + /// use postgres::types::{ToSql, Type}; + /// use fallible_iterator::FallibleIterator; + /// # fn main() -> Result<(), postgres::Error> { + /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?; + /// + /// let params: Vec<(String, Type)> = vec![ + /// ("first param".into(), Type::TEXT), + /// ("second param".into(), Type::TEXT), + /// ]; + /// let mut it = client.query_typed_raw( + /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2", + /// params, + /// )?; + /// + /// while let Some(row) = it.next()? { + /// let foo: i32 = row.get("foo"); + /// println!("foo: {}", foo); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn query_typed_raw(&mut self, query: &str, params: I) -> Result, Error> + where + P: BorrowToSql, + I: IntoIterator, + { + let stream = self + .connection + .block_on(self.client.query_typed_raw(query, params))?; + Ok(RowIter::new(self.connection.as_ref(), stream)) + } + /// Creates a new prepared statement. /// /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc), diff --git a/postgres/src/config.rs b/postgres/src/config.rs index a7777ffe1..bf842ea56 100644 --- a/postgres/src/config.rs +++ b/postgres/src/config.rs @@ -11,7 +11,7 @@ use std::time::Duration; use tokio::runtime; #[doc(inline)] pub use tokio_postgres::config::{ - ChannelBinding, Host, LoadBalanceHosts, SslMode, TargetSessionAttrs, + ChannelBinding, Host, LoadBalanceHosts, SslMode, SslNegotiation, TargetSessionAttrs, }; use tokio_postgres::error::DbError; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; @@ -39,6 +39,9 @@ use tokio_postgres::{Error, Socket}; /// path to the directory containing Unix domain sockets. Otherwise, it is treated as a hostname. Multiple hosts /// can be specified, separated by commas. Each host will be tried in turn when connecting. Required if connecting /// with the `connect` method. +/// * `sslnegotiation` - TLS negotiation method. If set to `direct`, the client will perform direct TLS handshake, this only works for PostgreSQL 17 and newer. +/// Note that you will need to setup ALPN of TLS client configuration to `postgresql` when using direct TLS. +/// If set to `postgres`, the default value, it follows original postgres wire protocol to perform the negotiation. /// * `hostaddr` - Numeric IP address of host to connect to. This should be in the standard IPv4 address format, /// e.g., 172.28.40.9. If your machine supports IPv6, you can also use those addresses. /// If this parameter is not specified, the value of `host` will be looked up to find the corresponding IP address, @@ -229,6 +232,17 @@ impl Config { self.config.get_ssl_mode() } + /// Sets the SSL negotiation method + pub fn ssl_negotiation(&mut self, ssl_negotiation: SslNegotiation) -> &mut Config { + self.config.ssl_negotiation(ssl_negotiation); + self + } + + /// Gets the SSL negotiation method + pub fn get_ssl_negotiation(&self) -> SslNegotiation { + self.config.get_ssl_negotiation() + } + /// Adds a host to the configuration. /// /// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix diff --git a/postgres/src/generic_client.rs b/postgres/src/generic_client.rs index 45bd69332..da027fdef 100644 --- a/postgres/src/generic_client.rs +++ b/postgres/src/generic_client.rs @@ -46,6 +46,19 @@ pub trait GenericClient: private::Sealed { I: IntoIterator, I::IntoIter: ExactSizeIterator; + /// Like [`Client::query_typed`] + fn query_typed( + &mut self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error>; + + /// Like [`Client::query_typed_raw`] + fn query_typed_raw(&mut self, statement: &str, params: I) -> Result, Error> + where + P: BorrowToSql, + I: IntoIterator + Sync + Send; + /// Like `Client::prepare`. fn prepare(&mut self, query: &str) -> Result; @@ -117,6 +130,22 @@ impl GenericClient for Client { self.query_raw(query, params) } + fn query_typed( + &mut self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.query_typed(statement, params) + } + + fn query_typed_raw(&mut self, statement: &str, params: I) -> Result, Error> + where + P: BorrowToSql, + I: IntoIterator + Sync + Send, + { + self.query_typed_raw(statement, params) + } + fn prepare(&mut self, query: &str) -> Result { self.prepare(query) } @@ -197,6 +226,22 @@ impl GenericClient for Transaction<'_> { self.query_raw(query, params) } + fn query_typed( + &mut self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.query_typed(statement, params) + } + + fn query_typed_raw(&mut self, statement: &str, params: I) -> Result, Error> + where + P: BorrowToSql, + I: IntoIterator + Sync + Send, + { + self.query_typed_raw(statement, params) + } + fn prepare(&mut self, query: &str) -> Result { self.prepare(query) } diff --git a/postgres/src/notifications.rs b/postgres/src/notifications.rs index c31d4f631..0c040dedf 100644 --- a/postgres/src/notifications.rs +++ b/postgres/src/notifications.rs @@ -77,7 +77,7 @@ pub struct Iter<'a> { connection: ConnectionRef<'a>, } -impl<'a> FallibleIterator for Iter<'a> { +impl FallibleIterator for Iter<'_> { type Item = Notification; type Error = Error; @@ -100,7 +100,7 @@ pub struct BlockingIter<'a> { connection: ConnectionRef<'a>, } -impl<'a> FallibleIterator for BlockingIter<'a> { +impl FallibleIterator for BlockingIter<'_> { type Item = Notification; type Error = Error; @@ -129,7 +129,7 @@ pub struct TimeoutIter<'a> { timeout: Duration, } -impl<'a> FallibleIterator for TimeoutIter<'a> { +impl FallibleIterator for TimeoutIter<'_> { type Item = Notification; type Error = Error; diff --git a/postgres/src/transaction.rs b/postgres/src/transaction.rs index ed96c5b6e..f2d15be21 100644 --- a/postgres/src/transaction.rs +++ b/postgres/src/transaction.rs @@ -14,13 +14,13 @@ pub struct Transaction<'a> { transaction: Option>, } -impl<'a> fmt::Debug for Transaction<'a> { +impl fmt::Debug for Transaction<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.transaction, f) } } -impl<'a> Drop for Transaction<'a> { +impl Drop for Transaction<'_> { fn drop(&mut self) { if let Some(transaction) = self.transaction.take() { let _ = self.connection.block_on(transaction.rollback()); @@ -132,6 +132,35 @@ impl<'a> Transaction<'a> { Ok(RowIter::new(self.connection.as_ref(), stream)) } + /// Like `Client::query_typed`. + pub fn query_typed( + &mut self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.connection.block_on( + self.transaction + .as_ref() + .unwrap() + .query_typed(statement, params), + ) + } + + /// Like `Client::query_typed_raw`. + pub fn query_typed_raw(&mut self, query: &str, params: I) -> Result, Error> + where + P: BorrowToSql, + I: IntoIterator, + { + let stream = self.connection.block_on( + self.transaction + .as_ref() + .unwrap() + .query_typed_raw(query, params), + )?; + Ok(RowIter::new(self.connection.as_ref(), stream)) + } + /// Binds parameters to a statement, creating a "portal". /// /// Portals can be used with the `query_portal` method to page through the results of a query without being forced diff --git a/tokio-postgres/CHANGELOG.md b/tokio-postgres/CHANGELOG.md index 486ada8c1..5c2c1180c 100644 --- a/tokio-postgres/CHANGELOG.md +++ b/tokio-postgres/CHANGELOG.md @@ -1,14 +1,60 @@ # Change Log +## TandemDrive + +* Added support for `chrono-tz` 0.9 via the `chrono-tz-0_9` feature. +* Pretty print error location. + ## Unreleased -* Disable `rustc-serialize` compatibility of `eui48-1` dependency -* Remove tests for `eui48-04` -* Add `table_oid` and `field_id` fields to `Columns` struct of prepared statements. +## v0.7.13 - 2025-02-02 -## Added +### Added -* Added support for `chrono-tz` 0.9 via the `chrono-tz-0_9` feature. +* Added support for direct TLS negotiation. +* Added support for `cidr` 0.3 via the `with-cidr-0_3` feature. + +### Fixes + +* Added `load_balance_hosts` to `Config`'s `Debug` implementation. + +### Changes + +* Upgraded `rand`. + +## v0.7.12 - 2024-09-15 + +### Fixed + +* Fixed `query_typed` queries that return no rows. + +### Added + +* Added support for `jiff` 0.1 via the `with-jiff-01` feature. +* Added support for TCP keepalive on AIX. + +## v0.7.11 - 2024-07-21 + +### Fixed + +* Fixed handling of non-UTF8 error fields which can be sent after failed handshakes. +* Fixed cancellation handling of `TransactionBuilder::start` futures. + +### Added + +* Added `table_oid` and `field_id` fields to `Columns` struct of prepared statements. +* Added `GenericClient::simple_query`. +* Added `#[track_caller]` to `Row::get` and `SimpleQueryRow::get`. +* Added `TargetSessionAttrs::ReadOnly`. +* Added `Debug` implementation for `Statement`. +* Added `Clone` implementation for `Row`. +* Added `SimpleQueryMessage::RowDescription`. +* Added `{Client, Transaction, GenericClient}::query_typed`. + +### Changed + +* Disable `rustc-serialize` compatibility of `eui48-1` dependency +* Config setters now take `impl Into`. ## v0.7.10 - 2023-08-25 diff --git a/tokio-postgres/Cargo.toml b/tokio-postgres/Cargo.toml index 5f63a7ed1..b33cf0c76 100644 --- a/tokio-postgres/Cargo.toml +++ b/tokio-postgres/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "tokio-postgres" -version = "0.7.10" +version = "0.7.13" authors = ["Steven Fackler "] edition = "2018" -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" description = "A native, asynchronous PostgreSQL client" repository = "https://github.com/sfackler/rust-postgres" readme = "../README.md" @@ -36,17 +36,21 @@ array-impls = ["postgres-types/array-impls"] with-bit-vec-0_6 = ["postgres-types/with-bit-vec-0_6"] with-chrono-0_4 = ["postgres-types/with-chrono-0_4"] with-chrono-tz-0_10 = ["postgres-types/with-chrono-tz-0_10"] +with-cidr-0_2 = ["postgres-types/with-cidr-0_2"] +with-cidr-0_3 = ["postgres-types/with-cidr-0_3"] with-eui48-0_4 = ["postgres-types/with-eui48-0_4"] with-eui48-1 = ["postgres-types/with-eui48-1"] with-geo-types-0_6 = ["postgres-types/with-geo-types-0_6"] with-geo-types-0_7 = ["postgres-types/with-geo-types-0_7"] +with-jiff-0_1 = ["postgres-types/with-jiff-0_1"] +with-jiff-0_2 = ["postgres-types/with-jiff-0_2"] with-serde_json-1 = ["postgres-types/with-serde_json-1"] with-smol_str-01 = ["postgres-types/with-smol_str-01"] with-uuid-0_8 = ["postgres-types/with-uuid-0_8"] with-uuid-1 = ["postgres-types/with-uuid-1"] with-time-0_2 = ["postgres-types/with-time-0_2"] with-time-0_3 = ["postgres-types/with-time-0_3"] -js = ["postgres-protocol/js"] +js = ["postgres-protocol/js", "postgres-types/js"] [dependencies] async-trait = "0.1" @@ -60,14 +64,14 @@ parking_lot = "0.12" percent-encoding = "2.0" pin-project-lite = "0.2" phf = "0.11" -postgres-protocol = { version = "0.6.6", path = "../postgres-protocol" } -postgres-types = { version = "0.2.5", path = "../postgres-types" } +postgres-protocol = { version = "0.6.8", path = "../postgres-protocol" } +postgres-types = { version = "0.2.9", path = "../postgres-types" } tokio = { version = "1.27", features = ["io-util"] } tokio-postgres-derive = { version = "0.0.0", optional = true, path = "../tokio-postgres-derive" } tokio-util = { version = "0.7", features = ["codec"] } tracing = { version = "0.1", optional = true } tracing-error = { version = "0.2.0", optional = true } -rand = "0.8.5" +rand = "0.9.0" whoami = "1.4.1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -76,7 +80,14 @@ socket2 = { version = "0.5", features = ["all"] } [dev-dependencies] futures-executor = "0.3" criterion = "0.5" -tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "time"] } +env_logger = "0.11" +tokio = { version = "1.0", features = [ + "macros", + "net", + "rt", + "rt-multi-thread", + "time", +] } bit-vec-06 = { version = "0.6", package = "bit-vec" } chrono-04 = { version = "0.4", package = "chrono", default-features = false } @@ -84,6 +95,8 @@ chrono-tz-010 = { version = "0.10", package = "chrono-tz" } eui48-1 = { version = "1.0", package = "eui48", default-features = false } geo-types-06 = { version = "0.6", package = "geo-types" } geo-types-07 = { version = "0.7", package = "geo-types" } +jiff-01 = { version = "0.1", package = "jiff" } +jiff-02 = { version = "0.2", package = "jiff" } serde-1 = { version = "1.0", package = "serde" } serde_json-1 = { version = "1.0", package = "serde_json" } smol_str-01 = { version = "0.1", package = "smol_str" } diff --git a/tokio-postgres/src/cancel_query.rs b/tokio-postgres/src/cancel_query.rs index 078d4b8b6..2dfd47c06 100644 --- a/tokio-postgres/src/cancel_query.rs +++ b/tokio-postgres/src/cancel_query.rs @@ -1,5 +1,5 @@ use crate::client::SocketConfig; -use crate::config::SslMode; +use crate::config::{SslMode, SslNegotiation}; use crate::tls::MakeTlsConnect; use crate::{cancel_query_raw, connect_socket, Error, Socket}; use std::io; @@ -7,6 +7,7 @@ use std::io; pub(crate) async fn cancel_query( config: Option, ssl_mode: SslMode, + ssl_negotiation: SslNegotiation, mut tls: T, process_id: i32, secret_key: i32, @@ -38,6 +39,14 @@ where ) .await?; - cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, has_hostname, process_id, secret_key) - .await + cancel_query_raw::cancel_query_raw( + socket, + ssl_mode, + ssl_negotiation, + tls, + has_hostname, + process_id, + secret_key, + ) + .await } diff --git a/tokio-postgres/src/cancel_query_raw.rs b/tokio-postgres/src/cancel_query_raw.rs index 41aafe7d9..886606497 100644 --- a/tokio-postgres/src/cancel_query_raw.rs +++ b/tokio-postgres/src/cancel_query_raw.rs @@ -1,4 +1,4 @@ -use crate::config::SslMode; +use crate::config::{SslMode, SslNegotiation}; use crate::tls::TlsConnect; use crate::{connect_tls, Error}; use bytes::BytesMut; @@ -8,6 +8,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; pub async fn cancel_query_raw( stream: S, mode: SslMode, + negotiation: SslNegotiation, tls: T, has_hostname: bool, process_id: i32, @@ -17,7 +18,7 @@ where S: AsyncRead + AsyncWrite + Unpin, T: TlsConnect, { - let mut stream = connect_tls::connect_tls(stream, mode, tls, has_hostname).await?; + let mut stream = connect_tls::connect_tls(stream, mode, negotiation, tls, has_hostname).await?; let mut buf = BytesMut::new(); frontend::cancel_request(process_id, secret_key, &mut buf); diff --git a/tokio-postgres/src/cancel_token.rs b/tokio-postgres/src/cancel_token.rs index c925ce0ca..1652bec72 100644 --- a/tokio-postgres/src/cancel_token.rs +++ b/tokio-postgres/src/cancel_token.rs @@ -1,4 +1,4 @@ -use crate::config::SslMode; +use crate::config::{SslMode, SslNegotiation}; use crate::tls::TlsConnect; #[cfg(feature = "runtime")] use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect, Socket}; @@ -12,6 +12,7 @@ pub struct CancelToken { #[cfg(feature = "runtime")] pub(crate) socket_config: Option, pub(crate) ssl_mode: SslMode, + pub(crate) ssl_negotiation: SslNegotiation, pub(crate) process_id: i32, pub(crate) secret_key: i32, } @@ -37,6 +38,7 @@ impl CancelToken { cancel_query::cancel_query( self.socket_config.clone(), self.ssl_mode, + self.ssl_negotiation, tls, self.process_id, self.secret_key, @@ -54,6 +56,7 @@ impl CancelToken { cancel_query_raw::cancel_query_raw( stream, self.ssl_mode, + self.ssl_negotiation, tls, true, self.process_id, diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index ae7f616aa..870d64bea 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -1,7 +1,8 @@ -use crate::codec::{BackendMessages, FrontendMessage}; -use crate::config::SslMode; +use crate::codec::BackendMessages; +use crate::config::{SslMode, SslNegotiation}; use crate::connection::{Request, RequestMessages}; use crate::copy_out::CopyOutStream; +use crate::error::RowCountCategory; use crate::from_row::FromRow; #[cfg(feature = "runtime")] use crate::keepalive::KeepaliveConfig; @@ -20,9 +21,9 @@ use crate::{ use bytes::{Buf, BytesMut}; use fallible_iterator::FallibleIterator; use futures_channel::mpsc; -use futures_util::{future, pin_mut, ready, stream::BoxStream, StreamExt, TryStreamExt}; +use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt}; use parking_lot::Mutex; -use postgres_protocol::message::{backend::Message, frontend}; +use postgres_protocol::message::backend::Message; use postgres_types::{BorrowToSql, FromSqlOwned}; use std::collections::HashMap; use std::fmt; @@ -182,6 +183,7 @@ pub struct Client { #[cfg(feature = "runtime")] socket_config: Option, ssl_mode: SslMode, + ssl_negotiation: SslNegotiation, process_id: i32, secret_key: i32, } @@ -190,6 +192,7 @@ impl Client { pub(crate) fn new( sender: mpsc::UnboundedSender, ssl_mode: SslMode, + ssl_negotiation: SslNegotiation, process_id: i32, secret_key: i32, ) -> Client { @@ -202,6 +205,7 @@ impl Client { #[cfg(feature = "runtime")] socket_config: None, ssl_mode, + ssl_negotiation, process_id, secret_key, } @@ -324,7 +328,7 @@ impl Client { self.query_one_intern(statement, params).await } - /// An uninstrumented version of `query_one_intern`. + /// An uninstrumented version of `query_one`. async fn query_one_intern( &self, statement: &T, @@ -333,28 +337,11 @@ impl Client { where T: ?Sized + ToStatement + fmt::Debug, { - let statement = statement.__convert().into_statement(&self.inner).await?; - let stream = query::query(&self.inner, statement, slice_iter(params)).await?; - pin_mut!(stream); - - let row = match stream.try_next().await? { - Some(row) => row, - None => { - return Err(Error::row_count( - crate::error::RowCountCategory::One, - crate::error::RowCountCategory::Zero, - )) - } - }; - - if stream.try_next().await?.is_some() { - return Err(Error::row_count( - crate::error::RowCountCategory::One, - crate::error::RowCountCategory::MoreThanOne, - )); - } - - Ok(row) + self.query_opt_intern(statement, params) + .await + .and_then(|res| { + res.ok_or_else(|| Error::row_count(RowCountCategory::One, RowCountCategory::Zero)) + }) } /// Like [`Client::query_one`] but converts row to `T`. @@ -407,8 +394,8 @@ impl Client { self.query_opt_intern(statement, params).await } - /// An uninstrumented version of `query_opt_intern`. - pub async fn query_opt_intern( + /// An uninstrumented version of `query_opt`. + async fn query_opt_intern( &self, statement: &T, params: &[&(dyn ToSql + Sync)], @@ -420,19 +407,25 @@ impl Client { let stream = query::query(&self.inner, statement, slice_iter(params)).await?; pin_mut!(stream); - let row = match stream.try_next().await? { - Some(row) => row, - None => return Ok(None), - }; + let mut first = None; + + // Originally this was two calls to `try_next().await?`, + // once for the first element, and second to error if more than one. + // + // However, this new form with only one .await in a loop generates + // slightly smaller codegen/stack usage for the resulting future. + while let Some(row) = stream.try_next().await? { + if first.is_some() { + return Err(Error::row_count( + RowCountCategory::ZeroOrOne, + RowCountCategory::MoreThanOne, + )); + } - if stream.try_next().await?.is_some() { - return Err(Error::row_count( - crate::error::RowCountCategory::ZeroOrOne, - crate::error::RowCountCategory::MoreThanOne, - )); + first = Some(row); } - Ok(Some(row)) + Ok(first) } /// Like [`Client::query_opt`] but converts row into `T` @@ -478,7 +471,6 @@ impl Client { /// /// ```no_run /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> { - /// use tokio_postgres::types::ToSql; /// use futures_util::{pin_mut, TryStreamExt}; /// /// let params: Vec = vec![ @@ -510,36 +502,68 @@ impl Client { query::query(&self.inner, statement, params).await } - /// Returns a stream of rows - #[cfg_attr(feature = "tracing", tracing::instrument(skip(params)))] - pub async fn stream( + /// Like `query`, but requires the types of query parameters to be explicitly specified. + /// + /// Compared to `query`, this method allows performing queries without three round trips (for + /// prepare, execute, and close) by requiring the caller to specify parameter values along with + /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't + /// supported (such as Cloudflare Workers with Hyperdrive). + /// + /// A statement may contain parameters, specified by `$n`, where `n` is the index of the + /// parameter of the list provided, 1-indexed. + pub async fn query_typed( &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> Result - where - T: ?Sized + ToStatement + fmt::Debug, - { - let statement = statement.__convert().into_statement(&self.inner).await?; - let stream = query::query(&self.inner, statement, slice_iter(params)).await?; - Ok(stream) + query: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone()))) + .await? + .try_collect() + .await } - /// Returns a stream of `T`s - #[cfg_attr(feature = "tracing", tracing::instrument(skip(params)))] - pub async fn stream_as( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> Result>, Error> + /// The maximally flexible version of [`query_typed`]. + /// + /// Compared to `query`, this method allows performing queries without three round trips (for + /// prepare, execute, and close) by requiring the caller to specify parameter values along with + /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't + /// supported (such as Cloudflare Workers with Hyperdrive). + /// + /// A statement may contain parameters, specified by `$n`, where `n` is the index of the + /// parameter of the list provided, 1-indexed. + /// + /// [`query_typed`]: #method.query_typed + /// + /// # Examples + /// + /// ```no_run + /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> { + /// use futures_util::{pin_mut, TryStreamExt}; + /// use tokio_postgres::types::Type; + /// + /// let params: Vec<(String, Type)> = vec![ + /// ("first param".into(), Type::TEXT), + /// ("second param".into(), Type::TEXT), + /// ]; + /// let mut it = client.query_typed_raw( + /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2", + /// params, + /// ).await?; + /// + /// pin_mut!(it); + /// while let Some(row) = it.try_next().await? { + /// let foo: i32 = row.get("foo"); + /// println!("foo: {}", foo); + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn query_typed_raw(&self, query: &str, params: I) -> Result where - T: ?Sized + ToStatement + fmt::Debug, + P: BorrowToSql, + I: IntoIterator, { - let statement = statement.__convert().into_statement(&self.inner).await?; - let stream = query::query(&self.inner, statement, slice_iter(params)).await?; - Ok(stream - .map(move |x| x.and_then(|x| FromRow::from_row(&x))) - .boxed()) + query::query_typed(&self.inner, query, params).await } /// Executes a statement, returning the number of rows modified. @@ -654,43 +678,7 @@ impl Client { /// /// The transaction will roll back by default - use the `commit` method to commit it. pub async fn transaction(&mut self) -> Result, Error> { - struct RollbackIfNotDone<'me> { - client: &'me Client, - done: bool, - } - - impl<'a> Drop for RollbackIfNotDone<'a> { - fn drop(&mut self) { - if self.done { - return; - } - - let buf = self.client.inner().with_buf(|buf| { - frontend::query("ROLLBACK", buf).unwrap(); - buf.split().freeze() - }); - let _ = self - .client - .inner() - .send(RequestMessages::Single(FrontendMessage::Raw(buf))); - } - } - - // This is done, as `Future` created by this method can be dropped after - // `RequestMessages` is synchronously send to the `Connection` by - // `batch_execute()`, but before `Responses` is asynchronously polled to - // completion. In that case `Transaction` won't be created and thus - // won't be rolled back. - { - let mut cleaner = RollbackIfNotDone { - client: self, - done: false, - }; - self.batch_execute("BEGIN").await?; - cleaner.done = true; - } - - Ok(Transaction::new(self)) + self.build_transaction().start().await } /// Returns a builder for a transaction with custom settings. @@ -708,6 +696,7 @@ impl Client { #[cfg(feature = "runtime")] socket_config: self.socket_config.clone(), ssl_mode: self.ssl_mode, + ssl_negotiation: self.ssl_negotiation, process_id: self.process_id, secret_key: self.secret_key, } diff --git a/tokio-postgres/src/config.rs b/tokio-postgres/src/config.rs index c78346fff..7ba5638e3 100644 --- a/tokio-postgres/src/config.rs +++ b/tokio-postgres/src/config.rs @@ -50,6 +50,20 @@ pub enum SslMode { Require, } +/// TLS negotiation configuration +/// +/// See more information at +/// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-SSLNEGOTIATION +#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] +#[non_exhaustive] +pub enum SslNegotiation { + /// Use PostgreSQL SslRequest for Ssl negotiation + #[default] + Postgres, + /// Start Ssl handshake without negotiation, only works for PostgreSQL 17+ + Direct, +} + /// Channel binding configuration. #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[non_exhaustive] @@ -106,6 +120,15 @@ pub enum Host { /// path to the directory containing Unix domain sockets. Otherwise, it is treated as a hostname. Multiple hosts /// can be specified, separated by commas. Each host will be tried in turn when connecting. Required if connecting /// with the `connect` method. +/// * `sslnegotiation` - TLS negotiation method. If set to `direct`, the client +/// will perform direct TLS handshake, this only works for PostgreSQL 17 and +/// newer. +/// Note that you will need to setup ALPN of TLS client configuration to +/// `postgresql` when using direct TLS. If you are using postgres_openssl +/// as TLS backend, a `postgres_openssl::set_postgresql_alpn` helper is +/// provided for that. +/// If set to `postgres`, the default value, it follows original postgres +/// wire protocol to perform the negotiation. /// * `hostaddr` - Numeric IP address of host to connect to. This should be in the standard IPv4 address format, /// e.g., 172.28.40.9. If your machine supports IPv6, you can also use those addresses. /// If this parameter is not specified, the value of `host` will be looked up to find the corresponding IP address, @@ -198,6 +221,7 @@ pub struct Config { pub(crate) options: Option, pub(crate) application_name: Option, pub(crate) ssl_mode: SslMode, + pub(crate) ssl_negotiation: SslNegotiation, pub(crate) host: Vec, pub(crate) hostaddr: Vec, pub(crate) port: Vec, @@ -227,6 +251,7 @@ impl Config { options: None, application_name: None, ssl_mode: SslMode::Prefer, + ssl_negotiation: SslNegotiation::Postgres, host: vec![], hostaddr: vec![], port: vec![], @@ -248,8 +273,8 @@ impl Config { /// Sets the user to authenticate with. /// /// Defaults to the user executing this process. - pub fn user(&mut self, user: &str) -> &mut Config { - self.user = Some(user.to_string()); + pub fn user(&mut self, user: impl Into) -> &mut Config { + self.user = Some(user.into()); self } @@ -277,8 +302,8 @@ impl Config { /// Sets the name of the database to connect to. /// /// Defaults to the user. - pub fn dbname(&mut self, dbname: &str) -> &mut Config { - self.dbname = Some(dbname.to_string()); + pub fn dbname(&mut self, dbname: impl Into) -> &mut Config { + self.dbname = Some(dbname.into()); self } @@ -289,8 +314,8 @@ impl Config { } /// Sets command line options used to configure the server. - pub fn options(&mut self, options: &str) -> &mut Config { - self.options = Some(options.to_string()); + pub fn options(&mut self, options: impl Into) -> &mut Config { + self.options = Some(options.into()); self } @@ -301,8 +326,8 @@ impl Config { } /// Sets the value of the `application_name` runtime parameter. - pub fn application_name(&mut self, application_name: &str) -> &mut Config { - self.application_name = Some(application_name.to_string()); + pub fn application_name(&mut self, application_name: impl Into) -> &mut Config { + self.application_name = Some(application_name.into()); self } @@ -325,12 +350,27 @@ impl Config { self.ssl_mode } + /// Sets the SSL negotiation method. + /// + /// Defaults to `postgres`. + pub fn ssl_negotiation(&mut self, ssl_negotiation: SslNegotiation) -> &mut Config { + self.ssl_negotiation = ssl_negotiation; + self + } + + /// Gets the SSL negotiation method. + pub fn get_ssl_negotiation(&self) -> SslNegotiation { + self.ssl_negotiation + } + /// Adds a host to the configuration. /// /// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix /// systems, a host starting with a `/` is interpreted as a path to a directory containing Unix domain sockets. /// There must be either no hosts, or the same number of hosts as hostaddrs. - pub fn host(&mut self, host: &str) -> &mut Config { + pub fn host(&mut self, host: impl Into) -> &mut Config { + let host = host.into(); + #[cfg(unix)] { if host.starts_with('/') { @@ -338,7 +378,7 @@ impl Config { } } - self.host.push(Host::Tcp(host.to_string())); + self.host.push(Host::Tcp(host)); self } @@ -548,6 +588,18 @@ impl Config { }; self.ssl_mode(mode); } + "sslnegotiation" => { + let mode = match value { + "postgres" => SslNegotiation::Postgres, + "direct" => SslNegotiation::Direct, + _ => { + return Err(Error::config_parse(Box::new(InvalidValue( + "sslnegotiation", + )))) + } + }; + self.ssl_negotiation(mode); + } "host" => { for host in value.split(',') { self.host(host); @@ -742,6 +794,7 @@ impl fmt::Debug for Config { config_dbg .field("target_session_attrs", &self.target_session_attrs) .field("channel_binding", &self.channel_binding) + .field("load_balance_hosts", &self.load_balance_hosts) .finish() } } @@ -990,7 +1043,7 @@ impl<'a> UrlParser<'a> { let mut it = creds.splitn(2, ':'); let user = self.decode(it.next().unwrap())?; - self.config.user(&user); + self.config.user(user); if let Some(password) = it.next() { let password = Cow::from(percent_encoding::percent_decode(password.as_bytes())); @@ -1053,7 +1106,7 @@ impl<'a> UrlParser<'a> { }; if !dbname.is_empty() { - self.config.dbname(&self.decode(dbname)?); + self.config.dbname(self.decode(dbname)?); } Ok(()) diff --git a/tokio-postgres/src/connect.rs b/tokio-postgres/src/connect.rs index 8189cb91c..e97a7a2a3 100644 --- a/tokio-postgres/src/connect.rs +++ b/tokio-postgres/src/connect.rs @@ -44,7 +44,7 @@ where let mut indices = (0..num_hosts).collect::>(); if config.load_balance_hosts == LoadBalanceHosts::Random { - indices.shuffle(&mut rand::thread_rng()); + indices.shuffle(&mut rand::rng()); } let mut error = None; @@ -101,7 +101,7 @@ where .collect::>(); if config.load_balance_hosts == LoadBalanceHosts::Random { - addrs.shuffle(&mut rand::thread_rng()); + addrs.shuffle(&mut rand::rng()); } let mut last_err = None; diff --git a/tokio-postgres/src/connect_raw.rs b/tokio-postgres/src/connect_raw.rs index 19be9eb01..cf7476cab 100644 --- a/tokio-postgres/src/connect_raw.rs +++ b/tokio-postgres/src/connect_raw.rs @@ -89,7 +89,14 @@ where S: AsyncRead + AsyncWrite + Unpin, T: TlsConnect, { - let stream = connect_tls(stream, config.ssl_mode, tls, has_hostname).await?; + let stream = connect_tls( + stream, + config.ssl_mode, + config.ssl_negotiation, + tls, + has_hostname, + ) + .await?; let mut stream = StartupStream { inner: Framed::new(stream, PostgresCodec), @@ -107,7 +114,13 @@ where let (process_id, secret_key, parameters) = read_info(&mut stream).await?; let (sender, receiver) = mpsc::unbounded(); - let client = Client::new(sender, config.ssl_mode, process_id, secret_key); + let client = Client::new( + sender, + config.ssl_mode, + config.ssl_negotiation, + process_id, + secret_key, + ); let connection = Connection::new(stream.inner, stream.delayed, parameters, receiver); Ok((client, connection)) diff --git a/tokio-postgres/src/connect_socket.rs b/tokio-postgres/src/connect_socket.rs index f27131178..26184701f 100644 --- a/tokio-postgres/src/connect_socket.rs +++ b/tokio-postgres/src/connect_socket.rs @@ -27,10 +27,11 @@ pub(crate) async fn connect_socket( stream.set_nodelay(true).map_err(Error::connect)?; let sock_ref = SockRef::from(&stream); + #[cfg(target_os = "linux")] - { + if let Some(tcp_user_timeout) = tcp_user_timeout { sock_ref - .set_tcp_user_timeout(tcp_user_timeout) + .set_tcp_user_timeout(Some(tcp_user_timeout)) .map_err(Error::connect)?; } diff --git a/tokio-postgres/src/connect_tls.rs b/tokio-postgres/src/connect_tls.rs index 2b1229125..d220cd3b5 100644 --- a/tokio-postgres/src/connect_tls.rs +++ b/tokio-postgres/src/connect_tls.rs @@ -1,4 +1,4 @@ -use crate::config::SslMode; +use crate::config::{SslMode, SslNegotiation}; use crate::maybe_tls_stream::MaybeTlsStream; use crate::tls::private::ForcePrivateApi; use crate::tls::TlsConnect; @@ -10,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; pub async fn connect_tls( mut stream: S, mode: SslMode, + negotiation: SslNegotiation, tls: T, has_hostname: bool, ) -> Result, Error> @@ -22,21 +23,26 @@ where SslMode::Prefer if !tls.can_connect(ForcePrivateApi) => { return Ok(MaybeTlsStream::Raw(stream)) } + SslMode::Prefer if negotiation == SslNegotiation::Direct => { + return Err(Error::tls("weak sslmode \"prefer\" may not be used with sslnegotiation=direct (use \"require\", \"verify-ca\", or \"verify-full\")".into())) + } SslMode::Prefer | SslMode::Require => {} } - let mut buf = BytesMut::new(); - frontend::ssl_request(&mut buf); - stream.write_all(&buf).await.map_err(Error::io)?; + if negotiation == SslNegotiation::Postgres { + let mut buf = BytesMut::new(); + frontend::ssl_request(&mut buf); + stream.write_all(&buf).await.map_err(Error::io)?; - let mut buf = [0]; - stream.read_exact(&mut buf).await.map_err(Error::io)?; + let mut buf = [0]; + stream.read_exact(&mut buf).await.map_err(Error::io)?; - if buf[0] != b'S' { - if SslMode::Require == mode { - return Err(Error::tls("server does not support TLS".into())); - } else { - return Ok(MaybeTlsStream::Raw(stream)); + if buf[0] != b'S' { + if SslMode::Require == mode { + return Err(Error::tls("server does not support TLS".into())); + } else { + return Ok(MaybeTlsStream::Raw(stream)); + } } } diff --git a/tokio-postgres/src/error/mod.rs b/tokio-postgres/src/error/mod.rs index 619ae5b4d..3cd906d27 100644 --- a/tokio-postgres/src/error/mod.rs +++ b/tokio-postgres/src/error/mod.rs @@ -108,14 +108,15 @@ impl DbError { let mut routine = None; while let Some(field) = fields.next()? { + let value = String::from_utf8_lossy(field.value_bytes()); match field.type_() { - b'S' => severity = Some(field.value().to_string().into_boxed_str()), - b'C' => code = Some(SqlState::from_code(field.value())), - b'M' => message = Some(field.value().to_string().into_boxed_str()), - b'D' => detail = Some(field.value().to_string().into_boxed_str()), - b'H' => hint = Some(field.value().to_string().into_boxed_str()), + b'S' => severity = Some(value.into_owned().into_boxed_str()), + b'C' => code = Some(SqlState::from_code(&value)), + b'M' => message = Some(value.into_owned().into_boxed_str()), + b'D' => detail = Some(value.into_owned().into_boxed_str()), + b'H' => hint = Some(value.into_owned().into_boxed_str()), b'P' => { - normal_position = Some(field.value().parse::().map_err(|_| { + normal_position = Some(value.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "`P` field did not contain an integer", @@ -123,32 +124,32 @@ impl DbError { })?); } b'p' => { - internal_position = Some(field.value().parse::().map_err(|_| { + internal_position = Some(value.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "`p` field did not contain an integer", ) })?); } - b'q' => internal_query = Some(field.value().to_owned()), - b'W' => where_ = Some(field.value().to_string().into_boxed_str()), - b's' => schema = Some(field.value().to_string().into_boxed_str()), - b't' => table = Some(field.value().to_string().into_boxed_str()), - b'c' => column = Some(field.value().to_string().into_boxed_str()), - b'd' => datatype = Some(field.value().to_string().into_boxed_str()), - b'n' => constraint = Some(field.value().to_string().into_boxed_str()), - b'F' => file = Some(field.value().to_string().into_boxed_str()), + b'q' => internal_query = Some(value.into_owned()), + b'W' => where_ = Some(value.into_owned().into_boxed_str()), + b's' => schema = Some(value.into_owned().into_boxed_str()), + b't' => table = Some(value.into_owned().into_boxed_str()), + b'c' => column = Some(value.into_owned().into_boxed_str()), + b'd' => datatype = Some(value.into_owned().into_boxed_str()), + b'n' => constraint = Some(value.into_owned().into_boxed_str()), + b'F' => file = Some(value.into_owned().into_boxed_str()), b'L' => { - line = Some(field.value().parse::().map_err(|_| { + line = Some(value.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "`L` field did not contain an integer", ) })?); } - b'R' => routine = Some(field.value().to_string().into_boxed_str()), + b'R' => routine = Some(value.into_owned().into_boxed_str()), b'V' => { - parsed_severity = Some(Severity::from_str(field.value()).ok_or_else(|| { + parsed_severity = Some(Severity::from_str(&value).ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "`V` field contained an invalid value", diff --git a/tokio-postgres/src/generic_client.rs b/tokio-postgres/src/generic_client.rs index 5e9ff2143..c933bea7b 100644 --- a/tokio-postgres/src/generic_client.rs +++ b/tokio-postgres/src/generic_client.rs @@ -2,7 +2,7 @@ use std::fmt; use crate::query::RowStream; use crate::types::{BorrowToSql, ToSql, Type}; -use crate::{Client, Error, Row, Statement, ToStatement, Transaction}; +use crate::{Client, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction}; use async_trait::async_trait; mod private { @@ -14,12 +14,12 @@ mod private { /// This trait is "sealed", and cannot be implemented outside of this crate. #[async_trait] pub trait GenericClient: private::Sealed { - /// Like `Client::execute`. + /// Like [`Client::execute`]. async fn execute(&self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result where T: ?Sized + ToStatement + Sync + Send + fmt::Debug; - /// Like `Client::execute_raw`. + /// Like [`Client::execute_raw`]. async fn execute_raw(&self, statement: &T, params: I) -> Result where T: ?Sized + ToStatement + Sync + Send + fmt::Debug, @@ -27,12 +27,12 @@ pub trait GenericClient: private::Sealed { I: IntoIterator + Sync + Send, I::IntoIter: ExactSizeIterator; - /// Like `Client::query`. + /// Like [`Client::query`]. async fn query(&self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result, Error> where T: ?Sized + ToStatement + Sync + Send + fmt::Debug; - /// Like `Client::query_one`. + /// Like [`Client::query_one`]. async fn query_one( &self, statement: &T, @@ -41,7 +41,7 @@ pub trait GenericClient: private::Sealed { where T: ?Sized + ToStatement + Sync + Send + fmt::Debug; - /// Like `Client::query_opt`. + /// Like [`Client::query_opt`]. async fn query_opt( &self, statement: &T, @@ -50,7 +50,7 @@ pub trait GenericClient: private::Sealed { where T: ?Sized + ToStatement + Sync + Send + fmt::Debug; - /// Like `Client::query_raw`. + /// Like [`Client::query_raw`]. async fn query_raw(&self, statement: &T, params: I) -> Result where T: ?Sized + ToStatement + Sync + Send + fmt::Debug, @@ -58,23 +58,39 @@ pub trait GenericClient: private::Sealed { I: IntoIterator + Sync + Send, I::IntoIter: ExactSizeIterator; - /// Like `Client::prepare`. + /// Like [`Client::query_typed`] + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error>; + + /// Like [`Client::query_typed_raw`] + async fn query_typed_raw(&self, statement: &str, params: I) -> Result + where + P: BorrowToSql, + I: IntoIterator + Sync + Send; + + /// Like [`Client::prepare`]. async fn prepare(&self, query: &str) -> Result; - /// Like `Client::prepare_typed`. + /// Like [`Client::prepare_typed`]. async fn prepare_typed( &self, query: &str, parameter_types: &[Type], ) -> Result; - /// Like `Client::transaction`. - async fn transaction(&mut self) -> Result, Error>; + /// Like [`Client::transaction`]. + async fn transaction<'a>(&'a mut self) -> Result, Error>; - /// Like `Client::batch_execute`. + /// Like [`Client::batch_execute`]. async fn batch_execute(&self, query: &str) -> Result<(), Error>; - /// Returns a reference to the underlying `Client`. + /// Like [`Client::simple_query`]. + async fn simple_query(&self, query: &str) -> Result, Error>; + + /// Returns a reference to the underlying [`Client`]. fn client(&self) -> &Client; } @@ -138,6 +154,22 @@ impl GenericClient for Client { self.query_raw(statement, params).await } + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.query_typed(statement, params).await + } + + async fn query_typed_raw(&self, statement: &str, params: I) -> Result + where + P: BorrowToSql, + I: IntoIterator + Sync + Send, + { + self.query_typed_raw(statement, params).await + } + async fn prepare(&self, query: &str) -> Result { self.prepare(query).await } @@ -150,7 +182,7 @@ impl GenericClient for Client { self.prepare_typed(query, parameter_types).await } - async fn transaction(&mut self) -> Result, Error> { + async fn transaction<'a>(&'a mut self) -> Result, Error> { self.transaction().await } @@ -158,6 +190,10 @@ impl GenericClient for Client { self.batch_execute(query).await } + async fn simple_query(&self, query: &str) -> Result, Error> { + self.simple_query(query).await + } + fn client(&self) -> &Client { self } @@ -224,6 +260,22 @@ impl GenericClient for Transaction<'_> { self.query_raw(statement, params).await } + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.query_typed(statement, params).await + } + + async fn query_typed_raw(&self, statement: &str, params: I) -> Result + where + P: BorrowToSql, + I: IntoIterator + Sync + Send, + { + self.query_typed_raw(statement, params).await + } + async fn prepare(&self, query: &str) -> Result { self.prepare(query).await } @@ -245,6 +297,10 @@ impl GenericClient for Transaction<'_> { self.batch_execute(query).await } + async fn simple_query(&self, query: &str) -> Result, Error> { + self.simple_query(query).await + } + fn client(&self) -> &Client { self.client() } diff --git a/tokio-postgres/src/keepalive.rs b/tokio-postgres/src/keepalive.rs index c409eb0ea..7bdd76341 100644 --- a/tokio-postgres/src/keepalive.rs +++ b/tokio-postgres/src/keepalive.rs @@ -12,12 +12,18 @@ impl From<&KeepaliveConfig> for TcpKeepalive { fn from(keepalive_config: &KeepaliveConfig) -> Self { let mut tcp_keepalive = Self::new().with_time(keepalive_config.idle); - #[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "openbsd")))] + #[cfg(not(any( + target_os = "aix", + target_os = "redox", + target_os = "solaris", + target_os = "openbsd" + )))] if let Some(interval) = keepalive_config.interval { tcp_keepalive = tcp_keepalive.with_interval(interval); } #[cfg(not(any( + target_os = "aix", target_os = "redox", target_os = "solaris", target_os = "windows", diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index c17064755..a3c1b2b27 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -111,6 +111,7 @@ //! | `with-eui48-1` | Enable support for the 1.0 version of the `eui48` crate. | [eui48](https://crates.io/crates/eui48) 1.0 | no | //! | `with-geo-types-0_6` | Enable support for the 0.6 version of the `geo-types` crate. | [geo-types](https://crates.io/crates/geo-types/0.6.0) 0.6 | no | //! | `with-geo-types-0_7` | Enable support for the 0.7 version of the `geo-types` crate. | [geo-types](https://crates.io/crates/geo-types/0.7.0) 0.7 | no | +//! | `with-jiff-0_1` | Enable support for the 0.1 version of the `jiff` crate. | [jiff](https://crates.io/crates/jiff/0.1.0) 0.1 | no | //! | `with-serde_json-1` | Enable support for the `serde_json` crate. | [serde_json](https://crates.io/crates/serde_json) 1.0 | no | //! | `with-uuid-0_8` | Enable support for the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 0.8 | no | //! | `with-uuid-1` | Enable support for the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 1.0 | no | @@ -131,7 +132,7 @@ pub use crate::generic_client::GenericClient; pub use crate::portal::Portal; pub use crate::query::RowStream; pub use crate::row::{Row, SimpleQueryRow}; -pub use crate::simple_query::SimpleQueryStream; +pub use crate::simple_query::{SimpleColumn, SimpleQueryStream}; #[cfg(feature = "runtime")] pub use crate::socket::Socket; pub use crate::statement::{Column, Statement}; @@ -142,6 +143,7 @@ pub use crate::to_statement::ToStatement; pub use crate::transaction::Transaction; pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder}; use crate::types::ToSql; +use std::sync::Arc; pub mod binary_copy; mod bind; @@ -251,6 +253,8 @@ pub enum SimpleQueryMessage { /// /// The number of rows modified or selected is returned. CommandComplete(u64), + /// Column values of the proceeding row values + RowDescription(Arc<[SimpleColumn]>), } fn slice_iter<'a>( diff --git a/tokio-postgres/src/prepare.rs b/tokio-postgres/src/prepare.rs index a3c475100..120552376 100644 --- a/tokio-postgres/src/prepare.rs +++ b/tokio-postgres/src/prepare.rs @@ -134,7 +134,7 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu }) } -async fn get_type(client: &Arc, oid: Oid) -> Result { +pub(crate) async fn get_type(client: &Arc, oid: Oid) -> Result { if let Some(type_) = Type::from_oid(oid) { return Ok(type_); } diff --git a/tokio-postgres/src/query.rs b/tokio-postgres/src/query.rs index 1ce048b54..c1e50badc 100644 --- a/tokio-postgres/src/query.rs +++ b/tokio-postgres/src/query.rs @@ -1,21 +1,25 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; +use crate::prepare::get_type; use crate::types::{BorrowToSql, IsNull}; -use crate::{debug, Error, Portal, Row, Statement}; +use crate::{debug, Column, Error, Portal, Row, Statement}; use bytes::{Bytes, BytesMut}; +use fallible_iterator::FallibleIterator; use futures_util::{ready, Stream}; use pin_project_lite::pin_project; use postgres_protocol::message::backend::{CommandCompleteBody, Message}; use postgres_protocol::message::frontend; +use postgres_types::Type; use std::fmt; use std::marker::PhantomPinned; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; struct BorrowToSqlParamsDebug<'a, T>(&'a [T]); -impl<'a, T> fmt::Debug for BorrowToSqlParamsDebug<'a, T> +impl fmt::Debug for BorrowToSqlParamsDebug<'_, T> where T: BorrowToSql, { @@ -78,6 +82,68 @@ where }) } +pub async fn query_typed( + client: &Arc, + query: &str, + params: I, +) -> Result +where + P: BorrowToSql, + I: IntoIterator, +{ + let buf = { + let params = params.into_iter().collect::>(); + let param_oids = params.iter().map(|(_, t)| t.oid()).collect::>(); + + client.with_buf(|buf| { + frontend::parse("", query, param_oids.into_iter(), buf).map_err(Error::parse)?; + encode_bind_raw("", params, "", buf)?; + frontend::describe(b'S', "", buf).map_err(Error::encode)?; + frontend::execute("", 0, buf).map_err(Error::encode)?; + frontend::sync(buf); + + Ok(buf.split().freeze()) + })? + }; + + let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; + + loop { + match responses.next().await? { + Message::ParseComplete | Message::BindComplete | Message::ParameterDescription(_) => {} + Message::NoData => { + return Ok(RowStream { + statement: Statement::unnamed(vec![], vec![]), + responses, + rows_affected: None, + _p: PhantomPinned, + }); + } + Message::RowDescription(row_description) => { + let mut columns: Vec = vec![]; + let mut it = row_description.fields(); + while let Some(field) = it.next().map_err(Error::parse)? { + let type_ = get_type(client, field.type_oid()).await?; + let column = Column { + name: field.name().to_string(), + table_oid: Some(field.table_oid()).filter(|n| *n != 0), + column_id: Some(field.column_id()).filter(|n| *n != 0), + r#type: type_, + }; + columns.push(column); + } + return Ok(RowStream { + statement: Statement::unnamed(vec![], columns), + responses, + rows_affected: None, + _p: PhantomPinned, + }); + } + _ => return Err(Error::unexpected_message()), + } + } +} + pub async fn query_portal( client: &InnerClient, portal: &Portal, @@ -185,27 +251,42 @@ where I: IntoIterator, I::IntoIter: ExactSizeIterator, { - let param_types = statement.params(); let params = params.into_iter(); - - if param_types.len() != params.len() { - return Err(Error::parameters(params.len(), param_types.len())); + if params.len() != statement.params().len() { + return Err(Error::parameters(params.len(), statement.params().len())); } + encode_bind_raw( + statement.name(), + params.zip(statement.params().iter().cloned()), + portal, + buf, + ) +} + +fn encode_bind_raw( + statement_name: &str, + params: I, + portal: &str, + buf: &mut BytesMut, +) -> Result<(), Error> +where + P: BorrowToSql, + I: IntoIterator, + I::IntoIter: ExactSizeIterator, +{ let (param_formats, params): (Vec<_>, Vec<_>) = params - .zip(param_types.iter()) - .map(|(p, ty)| (p.borrow_to_sql().encode_format(ty) as i16, p)) + .into_iter() + .map(|(p, ty)| (p.borrow_to_sql().encode_format(&ty) as i16, (p, ty))) .unzip(); - let params = params.into_iter(); - let mut error_idx = 0; let r = frontend::bind( portal, - statement.name(), + statement_name, param_formats, - params.zip(param_types).enumerate(), - |(idx, (param, ty)), buf| match param.borrow_to_sql().to_sql_checked(ty, buf) { + params.into_iter().enumerate(), + |(idx, (param, ty)), buf| match param.borrow_to_sql().to_sql_checked(&ty, buf) { Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No), Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes), Err(e) => { diff --git a/tokio-postgres/src/row.rs b/tokio-postgres/src/row.rs index 3c79de603..ccb8817d0 100644 --- a/tokio-postgres/src/row.rs +++ b/tokio-postgres/src/row.rs @@ -79,9 +79,9 @@ impl RowIndex for str { } } -impl<'a, T> Sealed for &'a T where T: ?Sized + Sealed {} +impl Sealed for &T where T: ?Sized + Sealed {} -impl<'a, T> RowIndex for &'a T +impl RowIndex for &T where T: ?Sized + RowIndex, { @@ -95,6 +95,7 @@ where } /// A row of data returned from the database by a query. +#[derive(Clone)] pub struct Row { statement: Statement, body: DataRowBody, diff --git a/tokio-postgres/src/simple_query.rs b/tokio-postgres/src/simple_query.rs index c58f6fb0e..309420c83 100644 --- a/tokio-postgres/src/simple_query.rs +++ b/tokio-postgres/src/simple_query.rs @@ -84,35 +84,34 @@ impl Stream for SimpleQueryStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - loop { - match ready!(this.responses.poll_next(cx)?) { - Message::CommandComplete(body) => { - let rows = extract_row_affected(&body)?; - return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows)))); - } - Message::EmptyQueryResponse => { - return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0)))); - } - Message::RowDescription(body) => { - let columns = body - .fields() - .map(|f| Ok(SimpleColumn::new(f.name().to_string()))) - .collect::>() - .map_err(Error::parse)? - .into(); + match ready!(this.responses.poll_next(cx)?) { + Message::CommandComplete(body) => { + let rows = extract_row_affected(&body)?; + Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows)))) + } + Message::EmptyQueryResponse => { + Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0)))) + } + Message::RowDescription(body) => { + let columns: Arc<[SimpleColumn]> = body + .fields() + .map(|f| Ok(SimpleColumn::new(f.name().to_string()))) + .collect::>() + .map_err(Error::parse)? + .into(); - *this.columns = Some(columns); - } - Message::DataRow(body) => { - let row = match &this.columns { - Some(columns) => SimpleQueryRow::new(columns.clone(), body)?, - None => return Poll::Ready(Some(Err(Error::unexpected_message()))), - }; - return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row)))); - } - Message::ReadyForQuery(_) => return Poll::Ready(None), - _ => return Poll::Ready(Some(Err(Error::unexpected_message()))), + *this.columns = Some(columns.clone()); + Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns)))) + } + Message::DataRow(body) => { + let row = match &this.columns { + Some(columns) => SimpleQueryRow::new(columns.clone(), body)?, + None => return Poll::Ready(Some(Err(Error::unexpected_message()))), + }; + Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row)))) } + Message::ReadyForQuery(_) => Poll::Ready(None), + _ => Poll::Ready(Some(Err(Error::unexpected_message()))), } } } diff --git a/tokio-postgres/src/statement.rs b/tokio-postgres/src/statement.rs index d9189a6bc..b123d4827 100644 --- a/tokio-postgres/src/statement.rs +++ b/tokio-postgres/src/statement.rs @@ -15,6 +15,10 @@ struct StatementInner { impl Drop for StatementInner { fn drop(&mut self) { + if self.name.is_empty() { + // Unnamed statements don't need to be closed + return; + } if let Some(client) = self.client.upgrade() { let buf = client.with_buf(|buf| { frontend::close(b'S', &self.name, buf).unwrap(); @@ -29,7 +33,7 @@ impl Drop for StatementInner { /// A prepared statement. /// /// Prepared statements can only be used with the connection that created them. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Statement(Arc); impl Statement { @@ -47,6 +51,15 @@ impl Statement { })) } + pub(crate) fn unnamed(params: Vec, columns: Vec) -> Statement { + Statement(Arc::new(StatementInner { + client: Weak::new(), + name: String::new(), + params, + columns, + })) + } + pub(crate) fn name(&self) -> &str { &self.0.name } @@ -62,6 +75,16 @@ impl Statement { } } +impl std::fmt::Debug for Statement { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + f.debug_struct("Statement") + .field("name", &self.0.name) + .field("params", &self.0.params) + .field("columns", &self.0.columns) + .finish_non_exhaustive() + } +} + /// Information about a column of a query. #[derive(Debug)] pub struct Column { diff --git a/tokio-postgres/src/to_statement.rs b/tokio-postgres/src/to_statement.rs index dd41b074e..8c6ccdcd5 100644 --- a/tokio-postgres/src/to_statement.rs +++ b/tokio-postgres/src/to_statement.rs @@ -13,7 +13,7 @@ mod private { Query(&'a str), } - impl<'a> ToStatementType<'a> { + impl ToStatementType<'_> { pub async fn into_statement(self, client: &Arc) -> Result { match self { ToStatementType::Statement(s) => Ok(s.clone()), diff --git a/tokio-postgres/src/transaction.rs b/tokio-postgres/src/transaction.rs index 09b1b6f8f..4941c111e 100644 --- a/tokio-postgres/src/transaction.rs +++ b/tokio-postgres/src/transaction.rs @@ -13,7 +13,7 @@ use crate::{ SimpleQueryMessage, Statement, ToStatement, }; use bytes::Buf; -use futures_util::{stream::BoxStream, TryStreamExt}; +use futures_util::TryStreamExt; use postgres_protocol::message::frontend; use postgres_types::FromSqlOwned; use std::fmt; @@ -30,7 +30,7 @@ pub struct Transaction<'a> { done: bool, } -impl<'a> fmt::Debug for Transaction<'a> { +impl fmt::Debug for Transaction<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Transaction") .field("savepoint", &self.savepoint) @@ -46,7 +46,7 @@ struct Savepoint { depth: u32, } -impl<'a> Drop for Transaction<'a> { +impl Drop for Transaction<'_> { fn drop(&mut self) { if self.done { return; @@ -248,34 +248,25 @@ impl<'a> Transaction<'a> { self.client.query_raw(statement, params).await } - /// Like [`Client::stream`] - #[cfg_attr(feature = "tracing", tracing::instrument(skip(params)))] - pub async fn stream( + /// Like `Client::query_typed`. + pub async fn query_typed( &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> Result - where - T: ?Sized + ToStatement + fmt::Debug, - { - self.client.stream(statement, params).await + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> Result, Error> { + self.client.query_typed(statement, params).await } - /// Like [`Client::stream_as`] - #[cfg_attr(feature = "tracing", tracing::instrument(skip(params)))] - pub async fn stream_as( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> Result>, Error> + /// Like `Client::query_typed_raw`. + pub async fn query_typed_raw(&self, query: &str, params: I) -> Result where - T: ?Sized + ToStatement + fmt::Debug, + P: BorrowToSql, + I: IntoIterator, { - self.client.stream_as(statement, params).await + self.client.query_typed_raw(query, params).await } - /// Like [`Client::execute`] - #[cfg_attr(feature = "tracing", tracing::instrument(skip(params)))] + /// Like `Client::execute`. pub async fn execute( &self, statement: &T, @@ -453,7 +444,7 @@ impl<'a> Transaction<'a> { } } -impl<'a> Deref for Transaction<'a> { +impl Deref for Transaction<'_> { type Target = Client; fn deref(&self) -> &Self::Target { diff --git a/tokio-postgres/src/transaction_builder.rs b/tokio-postgres/src/transaction_builder.rs index 9718ac588..88c883176 100644 --- a/tokio-postgres/src/transaction_builder.rs +++ b/tokio-postgres/src/transaction_builder.rs @@ -1,4 +1,6 @@ -use crate::{Client, Error, Transaction}; +use postgres_protocol::message::frontend; + +use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction}; /// The isolation level of a database transaction. #[derive(Debug, Copy, Clone)] @@ -106,7 +108,41 @@ impl<'a> TransactionBuilder<'a> { query.push_str(s); } - self.client.batch_execute(&query).await?; + struct RollbackIfNotDone<'me> { + client: &'me Client, + done: bool, + } + + impl Drop for RollbackIfNotDone<'_> { + fn drop(&mut self) { + if self.done { + return; + } + + let buf = self.client.inner().with_buf(|buf| { + frontend::query("ROLLBACK", buf).unwrap(); + buf.split().freeze() + }); + let _ = self + .client + .inner() + .send(RequestMessages::Single(FrontendMessage::Raw(buf))); + } + } + + // This is done as `Future` created by this method can be dropped after + // `RequestMessages` is synchronously send to the `Connection` by + // `batch_execute()`, but before `Responses` is asynchronously polled to + // completion. In that case `Transaction` won't be created and thus + // won't be rolled back. + { + let mut cleaner = RollbackIfNotDone { + client: self.client, + done: false, + }; + self.client.batch_execute(&query).await?; + cleaner.done = true; + } Ok(Transaction::new(self.client)) } diff --git a/tokio-postgres/tests/test/main.rs b/tokio-postgres/tests/test/main.rs index e7560bbbd..02ce35c21 100644 --- a/tokio-postgres/tests/test/main.rs +++ b/tokio-postgres/tests/test/main.rs @@ -328,6 +328,13 @@ async fn simple_query() { _ => panic!("unexpected message"), } match &messages[2] { + SimpleQueryMessage::RowDescription(columns) => { + assert_eq!(columns.get(0).map(|c| c.name()), Some("id")); + assert_eq!(columns.get(1).map(|c| c.name()), Some("name")); + } + _ => panic!("unexpected message"), + } + match &messages[3] { SimpleQueryMessage::Row(row) => { assert_eq!(row.columns().first().map(|c| c.name()), Some("id")); assert_eq!(row.columns().get(1).map(|c| c.name()), Some("name")); @@ -336,7 +343,7 @@ async fn simple_query() { } _ => panic!("unexpected message"), } - match &messages[3] { + match &messages[4] { SimpleQueryMessage::Row(row) => { assert_eq!(row.columns().first().map(|c| c.name()), Some("id")); assert_eq!(row.columns().get(1).map(|c| c.name()), Some("name")); @@ -345,11 +352,11 @@ async fn simple_query() { } _ => panic!("unexpected message"), } - match messages[4] { + match messages[5] { SimpleQueryMessage::CommandComplete(2) => {} _ => panic!("unexpected message"), } - assert_eq!(messages.len(), 5); + assert_eq!(messages.len(), 6); } #[tokio::test] @@ -953,6 +960,126 @@ async fn deferred_constraint() { .unwrap_err(); } +#[tokio::test] +async fn query_typed_no_transaction() { + let client = connect("user=postgres").await; + + client + .batch_execute( + " + CREATE TEMPORARY TABLE foo ( + name TEXT, + age INT + ); + INSERT INTO foo (name, age) VALUES ('alice', 20), ('bob', 30), ('carol', 40); + ", + ) + .await + .unwrap(); + + let rows: Vec = client + .query_typed( + "SELECT name, age, 'literal', 5 FROM foo WHERE name <> $1 AND age < $2 ORDER BY age", + &[(&"alice", Type::TEXT), (&50i32, Type::INT4)], + ) + .await + .unwrap(); + + assert_eq!(rows.len(), 2); + let first_row = &rows[0]; + assert_eq!(first_row.get::<_, &str>(0), "bob"); + assert_eq!(first_row.get::<_, i32>(1), 30); + assert_eq!(first_row.get::<_, &str>(2), "literal"); + assert_eq!(first_row.get::<_, i32>(3), 5); + + let second_row = &rows[1]; + assert_eq!(second_row.get::<_, &str>(0), "carol"); + assert_eq!(second_row.get::<_, i32>(1), 40); + assert_eq!(second_row.get::<_, &str>(2), "literal"); + assert_eq!(second_row.get::<_, i32>(3), 5); + + // Test for UPDATE that returns no data + let updated_rows = client + .query_typed("UPDATE foo set age = 33", &[]) + .await + .unwrap(); + assert_eq!(updated_rows.len(), 0); +} + +#[tokio::test] +async fn query_typed_with_transaction() { + let mut client = connect("user=postgres").await; + + client + .batch_execute( + " + CREATE TEMPORARY TABLE foo ( + name TEXT, + age INT + ); + ", + ) + .await + .unwrap(); + + let transaction = client.transaction().await.unwrap(); + + let rows: Vec = transaction + .query_typed( + "INSERT INTO foo (name, age) VALUES ($1, $2), ($3, $4), ($5, $6) returning name, age", + &[ + (&"alice", Type::TEXT), + (&20i32, Type::INT4), + (&"bob", Type::TEXT), + (&30i32, Type::INT4), + (&"carol", Type::TEXT), + (&40i32, Type::INT4), + ], + ) + .await + .unwrap(); + let inserted_values: Vec<(String, i32)> = rows + .iter() + .map(|row| (row.get::<_, String>(0), row.get::<_, i32>(1))) + .collect(); + assert_eq!( + inserted_values, + [ + ("alice".to_string(), 20), + ("bob".to_string(), 30), + ("carol".to_string(), 40) + ] + ); + + let rows: Vec = transaction + .query_typed( + "SELECT name, age, 'literal', 5 FROM foo WHERE name <> $1 AND age < $2 ORDER BY age", + &[(&"alice", Type::TEXT), (&50i32, Type::INT4)], + ) + .await + .unwrap(); + + assert_eq!(rows.len(), 2); + let first_row = &rows[0]; + assert_eq!(first_row.get::<_, &str>(0), "bob"); + assert_eq!(first_row.get::<_, i32>(1), 30); + assert_eq!(first_row.get::<_, &str>(2), "literal"); + assert_eq!(first_row.get::<_, i32>(3), 5); + + let second_row = &rows[1]; + assert_eq!(second_row.get::<_, &str>(0), "carol"); + assert_eq!(second_row.get::<_, i32>(1), 40); + assert_eq!(second_row.get::<_, &str>(2), "literal"); + assert_eq!(second_row.get::<_, i32>(3), 5); + + // Test for UPDATE that returns no data + let updated_rows = transaction + .query_typed("UPDATE foo set age = 33", &[]) + .await + .unwrap(); + assert_eq!(updated_rows.len(), 0); +} + #[tokio::test] async fn query_opt_scalar() { let client = connect("user=postgres").await; diff --git a/tokio-postgres/tests/test/parse.rs b/tokio-postgres/tests/test/parse.rs index 04d422e27..35eeca72b 100644 --- a/tokio-postgres/tests/test/parse.rs +++ b/tokio-postgres/tests/test/parse.rs @@ -1,5 +1,5 @@ use std::time::Duration; -use tokio_postgres::config::{Config, TargetSessionAttrs}; +use tokio_postgres::config::{Config, SslNegotiation, TargetSessionAttrs}; fn check(s: &str, config: &Config) { assert_eq!(s.parse::().expect(s), *config, "`{}`", s); @@ -42,6 +42,10 @@ fn settings() { .keepalives_idle(Duration::from_secs(30)) .target_session_attrs(TargetSessionAttrs::ReadOnly), ); + check( + "sslnegotiation=direct", + Config::new().ssl_negotiation(SslNegotiation::Direct), + ); } #[test] diff --git a/tokio-postgres/tests/test/types/chrono_04.rs b/tokio-postgres/tests/test/types/chrono_04.rs index b010055ba..eda8151a6 100644 --- a/tokio-postgres/tests/test/types/chrono_04.rs +++ b/tokio-postgres/tests/test/types/chrono_04.rs @@ -1,4 +1,4 @@ -use chrono_04::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; +use chrono_04::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use std::fmt; use tokio_postgres::types::{Date, FromSqlOwned, Timestamp}; use tokio_postgres::Client; @@ -53,18 +53,20 @@ async fn test_with_special_naive_date_time_params() { async fn test_date_time_params() { fn make_check(time: &str) -> (Option>, &str) { ( - Some(Utc.from_utc_datetime( - &NaiveDateTime::parse_from_str(time, "'%Y-%m-%d %H:%M:%S.%f'").unwrap(), - )), + Some( + DateTime::parse_from_str(time, "'%Y-%m-%d %H:%M:%S.%f%#z'") + .unwrap() + .to_utc(), + ), time, ) } test_type( "TIMESTAMP WITH TIME ZONE", &[ - make_check("'1970-01-01 00:00:00.010000000'"), - make_check("'1965-09-25 11:19:33.100314000'"), - make_check("'2010-02-09 23:11:45.120200000'"), + make_check("'1970-01-01 00:00:00.010000000Z'"), + make_check("'1965-09-25 11:19:33.100314000Z'"), + make_check("'2010-02-09 23:11:45.120200000Z'"), (None, "NULL"), ], ) @@ -75,18 +77,20 @@ async fn test_date_time_params() { async fn test_with_special_date_time_params() { fn make_check(time: &str) -> (Timestamp>, &str) { ( - Timestamp::Value(Utc.from_utc_datetime( - &NaiveDateTime::parse_from_str(time, "'%Y-%m-%d %H:%M:%S.%f'").unwrap(), - )), + Timestamp::Value( + DateTime::parse_from_str(time, "'%Y-%m-%d %H:%M:%S.%f%#z'") + .unwrap() + .to_utc(), + ), time, ) } test_type( "TIMESTAMP WITH TIME ZONE", &[ - make_check("'1970-01-01 00:00:00.010000000'"), - make_check("'1965-09-25 11:19:33.100314000'"), - make_check("'2010-02-09 23:11:45.120200000'"), + make_check("'1970-01-01 00:00:00.010000000Z'"), + make_check("'1965-09-25 11:19:33.100314000Z'"), + make_check("'2010-02-09 23:11:45.120200000Z'"), (Timestamp::PosInfinity, "'infinity'"), (Timestamp::NegInfinity, "'-infinity'"), ], diff --git a/tokio-postgres/tests/test/types/jiff_01.rs b/tokio-postgres/tests/test/types/jiff_01.rs new file mode 100644 index 000000000..7c9052676 --- /dev/null +++ b/tokio-postgres/tests/test/types/jiff_01.rs @@ -0,0 +1,175 @@ +use jiff_01::{ + civil::{Date as JiffDate, DateTime, Time}, + Timestamp as JiffTimestamp, +}; +use std::fmt; +use tokio_postgres::{ + types::{Date, FromSqlOwned, Timestamp}, + Client, +}; + +use crate::connect; +use crate::types::test_type; + +#[tokio::test] +async fn test_datetime_params() { + fn make_check(s: &str) -> (Option, &str) { + (Some(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "TIMESTAMP", + &[ + make_check("'1970-01-01 00:00:00.010000000'"), + make_check("'1965-09-25 11:19:33.100314000'"), + make_check("'2010-02-09 23:11:45.120200000'"), + (None, "NULL"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_with_special_datetime_params() { + fn make_check(s: &str) -> (Timestamp, &str) { + (Timestamp::Value(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "TIMESTAMP", + &[ + make_check("'1970-01-01 00:00:00.010000000'"), + make_check("'1965-09-25 11:19:33.100314000'"), + make_check("'2010-02-09 23:11:45.120200000'"), + (Timestamp::PosInfinity, "'infinity'"), + (Timestamp::NegInfinity, "'-infinity'"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_timestamp_params() { + fn make_check(s: &str) -> (Option, &str) { + (Some(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "TIMESTAMP WITH TIME ZONE", + &[ + make_check("'1970-01-01 00:00:00.010000000Z'"), + make_check("'1965-09-25 11:19:33.100314000Z'"), + make_check("'2010-02-09 23:11:45.120200000Z'"), + (None, "NULL"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_with_special_timestamp_params() { + fn make_check(s: &str) -> (Timestamp, &str) { + (Timestamp::Value(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "TIMESTAMP WITH TIME ZONE", + &[ + make_check("'1970-01-01 00:00:00.010000000Z'"), + make_check("'1965-09-25 11:19:33.100314000Z'"), + make_check("'2010-02-09 23:11:45.120200000Z'"), + (Timestamp::PosInfinity, "'infinity'"), + (Timestamp::NegInfinity, "'-infinity'"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_date_params() { + fn make_check(s: &str) -> (Option, &str) { + (Some(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "DATE", + &[ + make_check("'1970-01-01'"), + make_check("'1965-09-25'"), + make_check("'2010-02-09'"), + (None, "NULL"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_with_special_date_params() { + fn make_check(s: &str) -> (Date, &str) { + (Date::Value(s.trim_matches('\'').parse().unwrap()), s) + } + test_type( + "DATE", + &[ + make_check("'1970-01-01'"), + make_check("'1965-09-25'"), + make_check("'2010-02-09'"), + (Date::PosInfinity, "'infinity'"), + (Date::NegInfinity, "'-infinity'"), + ], + ) + .await; +} + +#[tokio::test] +async fn test_time_params() { + fn make_check(s: &str) -> (Option