Skip to content

Commit 8ad8885

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent bde9238 commit 8ad8885

File tree

2 files changed

+138
-16
lines changed

2 files changed

+138
-16
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 130 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,48 @@
5656
#define FLOAT8LABEL "float8"
5757
#define FLOAT8OID 701
5858

59+
#define FIN_DISCARD 0x1
60+
#define FIN_CLOSE 0x2
61+
#define FIN_ABORT 0x4
5962

6063

61-
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
64+
65+
static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
6266
{
63-
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
64-
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
65-
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
66-
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
67+
pdo_pgsql_db_handle *H = S->H;
68+
69+
if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
70+
PGcancel *cancel = PQgetCancel(H->server);
71+
char errbuf[256];
72+
PQcancel(cancel, errbuf, 256);
73+
PQfreeCancel(cancel);
74+
S->is_running_unbuffered = false;
75+
}
6776

6877
if (S->result) {
6978
/* free the resource */
7079
PQclear(S->result);
7180
S->result = NULL;
7281
}
7382

74-
if (S->stmt_name) {
75-
if (S->is_prepared && server_obj_usable) {
76-
pdo_pgsql_db_handle *H = S->H;
77-
PGresult *res;
83+
if (S->is_running_unbuffered) {
84+
/* https://postgresql.org/docs/current/libpq-async.html:
85+
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
86+
* And as all single-row functions are connection-wise instead of statement-wise,
87+
* any new single-row query has to make sure no preceding one is still running.
88+
*/
89+
// @todo Implement !(fin_mode & FIN_DISCARD)
90+
// instead of discarding results we could store them to their statement
91+
// so that their fetch() will get them (albeit not in lazy mode anymore).
92+
while ((S->result = PQgetResult(H->server))) {
93+
PQclear(S->result);
94+
S->result = NULL;
95+
}
96+
S->is_running_unbuffered = false;
97+
}
98+
99+
if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
100+
PGresult *res;
78101
#ifndef HAVE_PQCLOSEPREPARED
79102
// TODO (??) libpq does not support close statement protocol < postgres 17
80103
// check if we can circumvent this.
@@ -88,7 +111,24 @@ static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
88111
if (res) {
89112
PQclear(res);
90113
}
114+
115+
S->is_prepared = false;
116+
if (H->running_stmt == S) {
117+
H->running_stmt = NULL;
91118
}
119+
}
120+
}
121+
122+
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
123+
{
124+
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
125+
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
126+
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
127+
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
128+
129+
pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));
130+
131+
if (S->stmt_name) {
92132
efree(S->stmt_name);
93133
S->stmt_name = NULL;
94134
}
@@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
142182
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
143183
pdo_pgsql_db_handle *H = S->H;
144184
ExecStatusType status;
185+
int dispatch_result = 1;
145186

146187
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
147188

148-
/* ensure that we free any previous unfetched results */
149-
if(S->result) {
150-
PQclear(S->result);
151-
S->result = NULL;
189+
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
190+
* and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
191+
* was called for stmt 1 inbetween
192+
* (maybe it will change with pipeline mode in libpq 14?) */
193+
if (S->is_unbuffered && H->running_stmt) {
194+
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
195+
H->running_stmt = NULL;
152196
}
197+
/* ensure that we free any previous unfetched results */
198+
pgsql_stmt_finish(S, 0);
153199

154200
S->current_row = 0;
155201

@@ -228,6 +274,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
228274
}
229275
}
230276
}
277+
if (S->is_unbuffered) {
278+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
279+
stmt->bound_params ?
280+
zend_hash_num_elements(stmt->bound_params) :
281+
0,
282+
(const char**)S->param_values,
283+
S->param_lengths,
284+
S->param_formats,
285+
0);
286+
} else {
231287
S->result = PQexecPrepared(H->server, S->stmt_name,
232288
stmt->bound_params ?
233289
zend_hash_num_elements(stmt->bound_params) :
@@ -236,22 +292,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
236292
S->param_lengths,
237293
S->param_formats,
238294
0);
295+
}
239296
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
240297
/* execute query with parameters */
298+
if (S->is_unbuffered) {
299+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
300+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
301+
S->param_types,
302+
(const char**)S->param_values,
303+
S->param_lengths,
304+
S->param_formats,
305+
0);
306+
} else {
241307
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
242308
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
243309
S->param_types,
244310
(const char**)S->param_values,
245311
S->param_lengths,
246312
S->param_formats,
247313
0);
314+
}
248315
} else {
249316
/* execute plain query (with embedded parameters) */
317+
if (S->is_unbuffered) {
318+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
319+
} else {
250320
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
321+
}
251322
}
323+
324+
H->running_stmt = S;
325+
326+
if (S->is_unbuffered) {
327+
if (!dispatch_result) {
328+
pdo_pgsql_error_stmt(stmt, 0, NULL);
329+
H->running_stmt = NULL;
330+
return 0;
331+
}
332+
S->is_running_unbuffered = true;
333+
(void)PQsetSingleRowMode(H->server);
334+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
335+
336+
/* try a first fetch to at least have column names and so on */
337+
S->result = PQgetResult(S->H->server);
338+
}
339+
252340
status = PQresultStatus(S->result);
253341

254-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
342+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
255343
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
256344
return 0;
257345
}
@@ -473,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
473561
return 0;
474562
}
475563
} else {
564+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
565+
ExecStatusType status;
566+
567+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
568+
* column_count should be recomputed on each iteration */
569+
570+
if(S->result) {
571+
PQclear(S->result);
572+
S->result = NULL;
573+
}
574+
575+
S->result = PQgetResult(S->H->server);
576+
status = PQresultStatus(S->result);
577+
578+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
579+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
580+
return 0;
581+
}
582+
583+
stmt->row_count = (zend_long)PQntuples(S->result);
584+
S->current_row = 0;
585+
586+
if (!stmt->row_count) {
587+
S->is_running_unbuffered = false;
588+
/* libpq requires looping until getResult returns null */
589+
pgsql_stmt_finish(S, 0);
590+
}
591+
}
476592
if (S->current_row < stmt->row_count) {
477593
S->current_row++;
478594
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ typedef struct {
3434
char *errmsg;
3535
} pdo_pgsql_error_info;
3636

37+
typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;
38+
3739
/* stuff we use in a pgsql database handle */
3840
typedef struct {
3941
PGconn *server;
@@ -49,13 +51,15 @@ typedef struct {
4951
bool disable_prepares;
5052
HashTable *lob_streams;
5153
zend_fcall_info_cache *notice_callback;
54+
bool default_fetching_laziness;
55+
pdo_pgsql_stmt *running_stmt;
5256
} pdo_pgsql_db_handle;
5357

5458
typedef struct {
5559
Oid pgsql_type;
5660
} pdo_pgsql_column;
5761

58-
typedef struct {
62+
struct pdo_pgsql_stmt {
5963
pdo_pgsql_db_handle *H;
6064
PGresult *result;
6165
pdo_pgsql_column *cols;
@@ -68,7 +72,9 @@ typedef struct {
6872
Oid *param_types;
6973
int current_row;
7074
bool is_prepared;
71-
} pdo_pgsql_stmt;
75+
bool is_unbuffered;
76+
bool is_running_unbuffered;
77+
};
7278

7379
typedef struct {
7480
Oid oid;

0 commit comments

Comments
 (0)