diff --git a/NEWS b/NEWS index c46578d871c51..f68d0555f4152 100644 --- a/NEWS +++ b/NEWS @@ -4,6 +4,10 @@ PHP NEWS - PDO_PGSQL: . Added Iterable support for PDO::pgsqlCopyFromArray. (KentarouTakeda) + . Implement GH-15287 Pdo\Pgsql::setAttribute(PDO::ATTR_PREFETCH, 0) or + Pdo\Pgsql::prepare(…, [ PDO::ATTR_PREFETCH => 0 ]) makes fetch() lazy + instead of storing the whole result set in memory (Guillaume Outters) + /!\ In this mode statements cannot be run in parallel - Random: . Moves from /dev/urandom usage to arc4random_buf on Haiku. (David Carlier) diff --git a/ext/pdo_pgsql/pgsql_driver.c b/ext/pdo_pgsql/pgsql_driver.c index 11fa58b4a7b0e..9ce4c6165bd8b 100644 --- a/ext/pdo_pgsql/pgsql_driver.c +++ b/ext/pdo_pgsql/pgsql_driver.c @@ -271,6 +271,8 @@ static bool pgsql_handle_preparer(pdo_dbh_t *dbh, zend_string *sql, pdo_stmt_t * zend_string *nsql = NULL; int emulate = 0; int execute_only = 0; + zval *val; + zend_long lval; S->H = H; stmt->driver_data = S; @@ -304,6 +306,14 @@ static bool pgsql_handle_preparer(pdo_dbh_t *dbh, zend_string *sql, pdo_stmt_t * stmt->named_rewrite_template = "$%d"; } + S->is_unbuffered = + driver_options + && (val = zend_hash_index_find(Z_ARRVAL_P(driver_options), PDO_ATTR_PREFETCH)) + && pdo_get_long_param(&lval, val) + ? 
!lval + : H->default_fetching_laziness + ; + ret = pdo_parse_params(stmt, sql, &nsql); if (ret == -1) { @@ -1327,6 +1337,12 @@ static bool pdo_pgsql_set_attr(pdo_dbh_t *dbh, zend_long attr, zval *val) } H->disable_prepares = bval; return true; + case PDO_ATTR_PREFETCH: + if (!pdo_get_bool_param(&bval, val)) { + return false; + } + H->default_fetching_laziness = !bval; + return true; default: return false; } diff --git a/ext/pdo_pgsql/pgsql_statement.c b/ext/pdo_pgsql/pgsql_statement.c index 8f3dd5237b5a1..169fa49af4e14 100644 --- a/ext/pdo_pgsql/pgsql_statement.c +++ b/ext/pdo_pgsql/pgsql_statement.c @@ -56,14 +56,27 @@ #define FLOAT8LABEL "float8" #define FLOAT8OID 701 +#define FIN_DISCARD 0x1 +#define FIN_CLOSE 0x2 +#define FIN_ABORT 0x4 -static int pgsql_stmt_dtor(pdo_stmt_t *stmt) + +/* Releases what statement S still holds on its connection: the current result, the rows a + * single-row (unbuffered) query has not delivered yet and, with FIN_CLOSE, its prepared statement. + * FIN_ABORT asks the server to cancel a single-row query that still has a result pending instead of reading its remaining rows. + * FIN_DISCARD is not checked yet (see the @todo below): pending rows are always thrown away. */ +static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode) { - pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; - bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) - && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) - && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); + pdo_pgsql_db_handle *H = S->H; + + if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) { + PGcancel *cancel = PQgetCancel(H->server); + char errbuf[256]; + PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + S->is_running_unbuffered = false; + } if (S->result) { /* free the resource */ @@ -71,24 +84,60 @@ static int pgsql_stmt_dtor(pdo_stmt_t *stmt) S->result = NULL; } - if (S->stmt_name) { - if (S->is_prepared && server_obj_usable) { - pdo_pgsql_db_handle *H = S->H; - PGresult *res; + if (S->is_running_unbuffered) { + /* https://postgresql.org/docs/current/libpq-async.html: + * "PQsendQuery cannot be called again until PQgetResult has returned NULL" + * And as all single-row functions are connection-wise instead of statement-wise, + * any new single-row query has to make sure no preceding one is 
still running. + */ + // @todo Implement !(fin_mode & FIN_DISCARD) + // instead of discarding results we could store them to their statement + // so that their fetch() will get them (albeit not in lazy mode anymore). + while ((S->result = PQgetResult(H->server))) { + PQclear(S->result); + S->result = NULL; + } + S->is_running_unbuffered = false; + } + + if (fin_mode & FIN_CLOSE) { + if (S->stmt_name && S->is_prepared) { + PGresult *res; #ifndef HAVE_PQCLOSEPREPARED - // TODO (??) libpq does not support close statement protocol < postgres 17 - // check if we can circumvent this. - char *q = NULL; - spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); - res = PQexec(H->server, q); - efree(q); + // TODO (??) libpq does not support close statement protocol < postgres 17 + // check if we can circumvent this. + char *q = NULL; + spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); + res = PQexec(H->server, q); + efree(q); #else - res = PQclosePrepared(H->server, S->stmt_name); + res = PQclosePrepared(H->server, S->stmt_name); #endif - if (res) { - PQclear(res); - } + if (res) { + PQclear(res); + } + + S->is_prepared = false; + } + /* Stop tracking S even if it had no prepared statement to close: otherwise, once S is + * destroyed, H->running_stmt would dangle and the next unbuffered execute would + * call pgsql_stmt_finish() on it. */ + if (H->running_stmt == S) { + H->running_stmt = NULL; } + } +} + +static int pgsql_stmt_dtor(pdo_stmt_t *stmt) +{ + pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; + bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) + && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) + && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); + + pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? 
FIN_CLOSE|FIN_ABORT : 0)); + + if (S->stmt_name) { efree(S->stmt_name); S->stmt_name = NULL; } @@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; pdo_pgsql_db_handle *H = S->H; ExecStatusType status; + int dispatch_result = 1; bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh); - /* ensure that we free any previous unfetched results */ - if(S->result) { - PQclear(S->result); - S->result = NULL; + /* in unbuffered mode, finish any running statement: libpq explicitely prohibits this + * and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE + * was called for stmt 1 inbetween + * (maybe it will change with pipeline mode in libpq 14?) */ + if (S->is_unbuffered && H->running_stmt) { + pgsql_stmt_finish(H->running_stmt, FIN_CLOSE); + H->running_stmt = NULL; } + /* ensure that we free any previous unfetched results */ + pgsql_stmt_finish(S, 0); S->current_row = 0; @@ -198,6 +244,7 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) /* it worked */ S->is_prepared = 1; PQclear(S->result); + S->result = NULL; break; default: { char *sqlstate = pdo_pgsql_sqlstate(S->result); @@ -227,7 +274,17 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) } } } - S->result = PQexecPrepared(H->server, S->stmt_name, + if (S->is_unbuffered) { + dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name, + stmt->bound_params ? + zend_hash_num_elements(stmt->bound_params) : + 0, + (const char**)S->param_values, + S->param_lengths, + S->param_formats, + 0); + } else { + S->result = PQexecPrepared(H->server, S->stmt_name, stmt->bound_params ? 
zend_hash_num_elements(stmt->bound_params) : 0, @@ -235,22 +292,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) S->param_lengths, S->param_formats, 0); + } } else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) { /* execute query with parameters */ - S->result = PQexecParams(H->server, ZSTR_VAL(S->query), + if (S->is_unbuffered) { + dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query), + stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0, + S->param_types, + (const char**)S->param_values, + S->param_lengths, + S->param_formats, + 0); + } else { + S->result = PQexecParams(H->server, ZSTR_VAL(S->query), stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0, S->param_types, (const char**)S->param_values, S->param_lengths, S->param_formats, 0); + } } else { /* execute plain query (with embedded parameters) */ - S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); + if (S->is_unbuffered) { + dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string)); + } else { + S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); + } } + + H->running_stmt = S; + + if (S->is_unbuffered) { + if (!dispatch_result) { + pdo_pgsql_error_stmt(stmt, 0, NULL); + H->running_stmt = NULL; + return 0; + } + S->is_running_unbuffered = true; + (void)PQsetSingleRowMode(H->server); + /* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */ + + /* try a first fetch to at least have column names and so on */ + S->result = PQgetResult(S->H->server); + } + status = PQresultStatus(S->result); - if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result)); return 0; } @@ -472,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt, return 0; } } else { + if (S->is_running_unbuffered && S->current_row >= 
stmt->row_count) { + ExecStatusType status; + + /* @todo in unbuffered mode, PQ allows multiple queries to be passed: + * column_count should be recomputed on each iteration */ + + if(S->result) { + PQclear(S->result); + S->result = NULL; + } + + S->result = PQgetResult(S->H->server); + status = PQresultStatus(S->result); + + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { + pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result)); + return 0; + } + + stmt->row_count = (zend_long)PQntuples(S->result); + S->current_row = 0; + + if (!stmt->row_count) { + S->is_running_unbuffered = false; + /* libpq requires looping until getResult returns null */ + pgsql_stmt_finish(S, 0); + } + } if (S->current_row < stmt->row_count) { S->current_row++; return 1; diff --git a/ext/pdo_pgsql/php_pdo_pgsql_int.h b/ext/pdo_pgsql/php_pdo_pgsql_int.h index fc9f1664cc3d4..da77d01c61ec7 100644 --- a/ext/pdo_pgsql/php_pdo_pgsql_int.h +++ b/ext/pdo_pgsql/php_pdo_pgsql_int.h @@ -34,6 +34,8 @@ typedef struct { char *errmsg; } pdo_pgsql_error_info; +typedef struct pdo_pgsql_stmt pdo_pgsql_stmt; + /* stuff we use in a pgsql database handle */ typedef struct { PGconn *server; @@ -49,13 +51,15 @@ typedef struct { bool disable_prepares; HashTable *lob_streams; zend_fcall_info_cache *notice_callback; + bool default_fetching_laziness; + pdo_pgsql_stmt *running_stmt; } pdo_pgsql_db_handle; typedef struct { Oid pgsql_type; } pdo_pgsql_column; -typedef struct { +struct pdo_pgsql_stmt { pdo_pgsql_db_handle *H; PGresult *result; pdo_pgsql_column *cols; @@ -68,7 +72,9 @@ typedef struct { Oid *param_types; int current_row; bool is_prepared; -} pdo_pgsql_stmt; + bool is_unbuffered; + bool is_running_unbuffered; +}; typedef struct { Oid oid; diff --git a/ext/pdo_pgsql/tests/gh15287.phpt b/ext/pdo_pgsql/tests/gh15287.phpt new file mode 100644 index 0000000000000..72bcc44b363b4 --- /dev/null +++ b/ext/pdo_pgsql/tests/gh15287.phpt @@ -0,0 +1,183 @@ +--TEST-- 
+PDO PgSQL #15287 (Pdo\Pgsql has no real lazy fetch mode) +--EXTENSIONS-- +pdo +pdo_pgsql +--SKIPIF-- + +--FILE-- +setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC); + +// We need a dataset of several KB so that memory gain is significant. +// See https://www.postgresql.org/message-id/1140652.1687950987%40sss.pgh.pa.us +$pdo->exec("create temp table t (n int, t text)"); +$pdo->exec("insert into t values (0, 'original')"); +for ($i = -1; ++$i < 8;) { + $pdo->exec("insert into t select n + 1, 'non '||t from t"); +} + +$reqOf3 = 'select 79 n union all select 80 union all select 81'; +$reqOfBig = 'select * from t'; + +function display($res) +{ + echo implode("\n", array_map(fn($row) => implode("\t", $row), $res))."\n"; +} + +echo "=== non regression ===\n"; + +// libpq explicitely requires single-row-mode statements to run one at a time (one stmt must +// be fully read, or aborted, before another one can be launched). +// Ensure that integration does not break the ability of the traditional, prefetched mode, +// to mix fetching of multiple statements' result. +$stmt1 = $pdo->query($reqOf3); +$stmt2 = $pdo->query("select * from ($reqOf3) t order by n desc"); +for ($i = -1; ++$i < 3;) { + display([ $stmt1->fetch() ]); + display([ $stmt2->fetch() ]); +} + +echo "=== mem test ===\n"; + +// First execute without lazy fetching, as a reference and non-regression; +// execute twice: in case warmup reduces memory consumption, we want the stabilized consumption. 
+for ($i = -1; ++$i < 5;) { + $attrs = []; + $lazy = false; + switch ($i) { + case 0: + case 3: + echo "Without lazy fetching:\n"; + break; + case 2: + echo "With statement-scoped lazy fetching:\n"; + $attrs = [ PDO::ATTR_PREFETCH => 0 ]; + $lazy = true; + break; + case 4: + echo "With connection-scoped lazy fetching:\n"; + $pdo->setAttribute(PDO::ATTR_PREFETCH, 0); + $lazy = true; + break; + } + $stmt = $pdo->prepare($reqOfBig, $attrs); + $stmt->execute(); + $res = []; + // No fetchAll because we want the memory of the result of the FORElast call (the last one is empty). + while (($re = $stmt->fetch())) { + $res[] = $re; + // Memory introspection relies on an optionally-compiled constant. + if (defined('PDO::PGSQL_ATTR_RESULT_MEMORY_SIZE')) { + $mem = $stmt->getAttribute(PDO::PGSQL_ATTR_RESULT_MEMORY_SIZE); + } else { + // If not there emulate a return value which validates our test. + $mem = $lazy ? 0 : 1; + } + } + echo "ResultSet is $mem bytes long\n"; + if ($i >= 2) { + echo "ResultSet is " . ($mem > $mem0 ? "longer" : ($mem == $mem0 ? "not shorter" : ($mem <= $mem0 / 2 ? "more than twice shorter" : "a bit shorter"))) . " than without lazy fetching\n"; + } else { + $mem0 = $mem; + } +} + +$pdo->setAttribute(PDO::ATTR_PREFETCH, 0); + +foreach ([ + [ 'query', 'fetch' ], + [ 'query', 'fetchAll' ], + [ 'prepare', 'fetch' ], + [ 'prepare', 'fetchAll' ], +] as $mode) { + echo "=== with " . implode(' / ', $mode). 
" ===\n"; + switch ($mode[0]) { + case 'query': + $stmt = $pdo->query($reqOf3); + break; + case 'prepare': + $stmt = $pdo->prepare($reqOf3); + $stmt->execute(); + break; + } + switch ($mode[1]) { + case 'fetch': + $res = []; + while (($re = $stmt->fetch())) { + $res[] = $re; + } + break; + case 'fetchAll': + $res = $stmt->fetchAll(); + break; + } + display($res); +} +echo "DML works too:\n"; +$pdo->exec("create temp table t2 as select 678 n, 'ok' status"); +echo "multiple calls to the same prepared statement, some interrupted before having read all results:\n"; +$stmt = $pdo->prepare("select :1 n union all select :1 + 1 union all select :1 + 2 union all select :1 + 3"); +$stmt->execute([ 32 ]); +$res = []; for ($i = -1; ++$i < 2;) $res[] = $stmt->fetch(); display($res); +$stmt->execute([ 15 ]); +$res = []; while (($re = $stmt->fetch())) $res[] = $re; display($res); +$stmt->execute([ 0 ]); +$res = []; for ($i = -1; ++$i < 2;) $res[] = $stmt->fetch(); display($res); +display($pdo->query("select * from t2")->fetchAll()); +?> +--EXPECTF-- +=== non regression === +79 +81 +80 +80 +81 +79 +=== mem test === +Without lazy fetching: +ResultSet is %d bytes long +ResultSet is %d bytes long +With statement-scoped lazy fetching: +ResultSet is %d bytes long +ResultSet is more than twice shorter than without lazy fetching +Without lazy fetching: +ResultSet is %d bytes long +ResultSet is not shorter than without lazy fetching +With connection-scoped lazy fetching: +ResultSet is %d bytes long +ResultSet is more than twice shorter than without lazy fetching +=== with query / fetch === +79 +80 +81 +=== with query / fetchAll === +79 +80 +81 +=== with prepare / fetch === +79 +80 +81 +=== with prepare / fetchAll === +79 +80 +81 +DML works too: +multiple calls to the same prepared statement, some interrupted before having read all results: +32 +33 +15 +16 +17 +18 +0 +1 +678 ok