Skip to content

Fix / implement GH-15287: add a lazy fetch to Pdo\PgSql #15750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ PHP NEWS

- PDO_PGSQL:
. Added Iterable support for PDO::pgsqlCopyFromArray. (KentarouTakeda)
. Implement GH-15387 Pdo\Pgsql::setAttribute(PDO::ATTR_PREFETCH, 0) or
Pdo\PgSql::prepare(…, [ PDO::ATTR_PREFETCH => 0 ]) make fetch() lazy
instead of storing the whole result set in memory (Guillaume Outters)
/!\ In this mode statements cannot be run parallely

- Random:
. Moves from /dev/urandom usage to arc4random_buf on Haiku. (David Carlier)
Expand Down
16 changes: 16 additions & 0 deletions ext/pdo_pgsql/pgsql_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ static bool pgsql_handle_preparer(pdo_dbh_t *dbh, zend_string *sql, pdo_stmt_t *
zend_string *nsql = NULL;
int emulate = 0;
int execute_only = 0;
zval *val;
zend_long lval;

S->H = H;
stmt->driver_data = S;
Expand Down Expand Up @@ -304,6 +306,14 @@ static bool pgsql_handle_preparer(pdo_dbh_t *dbh, zend_string *sql, pdo_stmt_t *
stmt->named_rewrite_template = "$%d";
}

S->is_unbuffered =
driver_options
&& (val = zend_hash_index_find(Z_ARRVAL_P(driver_options), PDO_ATTR_PREFETCH))
&& pdo_get_long_param(&lval, val)
? !lval
: H->default_fetching_laziness
;

ret = pdo_parse_params(stmt, sql, &nsql);

if (ret == -1) {
Expand Down Expand Up @@ -1327,6 +1337,12 @@ static bool pdo_pgsql_set_attr(pdo_dbh_t *dbh, zend_long attr, zval *val)
}
H->disable_prepares = bval;
return true;
case PDO_ATTR_PREFETCH:
if (!pdo_get_bool_param(&bval, val)) {
return false;
}
H->default_fetching_laziness = !bval;
return true;
default:
return false;
}
Expand Down
171 changes: 144 additions & 27 deletions ext/pdo_pgsql/pgsql_statement.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,79 @@
#define FLOAT8LABEL "float8"
#define FLOAT8OID 701

#define FIN_DISCARD 0x1
#define FIN_CLOSE 0x2
#define FIN_ABORT 0x4


static int pgsql_stmt_dtor(pdo_stmt_t *stmt)

static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
{
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
pdo_pgsql_db_handle *H = S->H;

if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
PGcancel *cancel = PQgetCancel(H->server);
char errbuf[256];
PQcancel(cancel, errbuf, 256);
PQfreeCancel(cancel);
S->is_running_unbuffered = false;
}

if (S->result) {
/* free the resource */
PQclear(S->result);
S->result = NULL;
}

if (S->stmt_name) {
if (S->is_prepared && server_obj_usable) {
pdo_pgsql_db_handle *H = S->H;
PGresult *res;
if (S->is_running_unbuffered) {
/* https://postgresql.org/docs/current/libpq-async.html:
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
* And as all single-row functions are connection-wise instead of statement-wise,
* any new single-row query has to make sure no preceding one is still running.
*/
// @todo Implement !(fin_mode & FIN_DISCARD)
// instead of discarding results we could store them to their statement
// so that their fetch() will get them (albeit not in lazy mode anymore).
while ((S->result = PQgetResult(H->server))) {
PQclear(S->result);
S->result = NULL;
}
S->is_running_unbuffered = false;
}

if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
PGresult *res;
#ifndef HAVE_PQCLOSEPREPARED
// TODO (??) libpq does not support close statement protocol < postgres 17
// check if we can circumvent this.
char *q = NULL;
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
res = PQexec(H->server, q);
efree(q);
// TODO (??) libpq does not support close statement protocol < postgres 17
// check if we can circumvent this.
char *q = NULL;
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
res = PQexec(H->server, q);
efree(q);
#else
res = PQclosePrepared(H->server, S->stmt_name);
res = PQclosePrepared(H->server, S->stmt_name);
#endif
if (res) {
PQclear(res);
}
if (res) {
PQclear(res);
}

S->is_prepared = false;
if (H->running_stmt == S) {
H->running_stmt = NULL;
}
}
}

static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
{
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);

pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));

if (S->stmt_name) {
efree(S->stmt_name);
S->stmt_name = NULL;
}
Expand Down Expand Up @@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
pdo_pgsql_db_handle *H = S->H;
ExecStatusType status;
int dispatch_result = 1;

bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);

/* ensure that we free any previous unfetched results */
if(S->result) {
PQclear(S->result);
S->result = NULL;
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
* and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
* was called for stmt 1 inbetween
* (maybe it will change with pipeline mode in libpq 14?) */
if (S->is_unbuffered && H->running_stmt) {
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
H->running_stmt = NULL;
}
/* ensure that we free any previous unfetched results */
pgsql_stmt_finish(S, 0);

S->current_row = 0;

Expand Down Expand Up @@ -198,6 +244,7 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
/* it worked */
S->is_prepared = 1;
PQclear(S->result);
S->result = NULL;
break;
default: {
char *sqlstate = pdo_pgsql_sqlstate(S->result);
Expand Down Expand Up @@ -227,30 +274,72 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
}
}
}
S->result = PQexecPrepared(H->server, S->stmt_name,
if (S->is_unbuffered) {
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
stmt->bound_params ?
zend_hash_num_elements(stmt->bound_params) :
0,
(const char**)S->param_values,
S->param_lengths,
S->param_formats,
0);
} else {
S->result = PQexecPrepared(H->server, S->stmt_name,
stmt->bound_params ?
zend_hash_num_elements(stmt->bound_params) :
0,
(const char**)S->param_values,
S->param_lengths,
S->param_formats,
0);
}
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
/* execute query with parameters */
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
if (S->is_unbuffered) {
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
S->param_types,
(const char**)S->param_values,
S->param_lengths,
S->param_formats,
0);
} else {
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
S->param_types,
(const char**)S->param_values,
S->param_lengths,
S->param_formats,
0);
}
} else {
/* execute plain query (with embedded parameters) */
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
if (S->is_unbuffered) {
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
} else {
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
}
}

H->running_stmt = S;

if (S->is_unbuffered) {
if (!dispatch_result) {
pdo_pgsql_error_stmt(stmt, 0, NULL);
H->running_stmt = NULL;
return 0;
}
S->is_running_unbuffered = true;
(void)PQsetSingleRowMode(H->server);
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */

/* try a first fetch to at least have column names and so on */
S->result = PQgetResult(S->H->server);
}

status = PQresultStatus(S->result);

if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
return 0;
}
Expand Down Expand Up @@ -472,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
return 0;
}
} else {
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
ExecStatusType status;

/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
* column_count should be recomputed on each iteration */

if(S->result) {
PQclear(S->result);
S->result = NULL;
}

S->result = PQgetResult(S->H->server);
status = PQresultStatus(S->result);

if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
return 0;
}

stmt->row_count = (zend_long)PQntuples(S->result);
S->current_row = 0;

if (!stmt->row_count) {
S->is_running_unbuffered = false;
/* libpq requires looping until getResult returns null */
pgsql_stmt_finish(S, 0);
}
}
if (S->current_row < stmt->row_count) {
S->current_row++;
return 1;
Expand Down
10 changes: 8 additions & 2 deletions ext/pdo_pgsql/php_pdo_pgsql_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ typedef struct {
char *errmsg;
} pdo_pgsql_error_info;

typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;

/* stuff we use in a pgsql database handle */
typedef struct {
PGconn *server;
Expand All @@ -49,13 +51,15 @@ typedef struct {
bool disable_prepares;
HashTable *lob_streams;
zend_fcall_info_cache *notice_callback;
bool default_fetching_laziness;
pdo_pgsql_stmt *running_stmt;
} pdo_pgsql_db_handle;

typedef struct {
Oid pgsql_type;
} pdo_pgsql_column;

typedef struct {
struct pdo_pgsql_stmt {
pdo_pgsql_db_handle *H;
PGresult *result;
pdo_pgsql_column *cols;
Expand All @@ -68,7 +72,9 @@ typedef struct {
Oid *param_types;
int current_row;
bool is_prepared;
} pdo_pgsql_stmt;
bool is_unbuffered;
bool is_running_unbuffered;
};

typedef struct {
Oid oid;
Expand Down
Loading