Skip to content

Commit c07fe3a

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent f5a1cb8 commit c07fe3a

File tree

2 files changed

+146
-26
lines changed

2 files changed

+146
-26
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 138 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -56,39 +56,77 @@
5656
#define FLOAT8LABEL "float8"
5757
#define FLOAT8OID 701
5858

59+
#define FIN_DISCARD 0x1
60+
#define FIN_CLOSE 0x2
61+
#define FIN_ABORT 0x4
5962

6063

61-
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
64+
65+
static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
6266
{
63-
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
64-
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
65-
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
66-
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
67+
if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
68+
PGcancel *cancel = PQgetCancel(S->H->server);
69+
char errbuf[256];
70+
PQcancel(cancel, errbuf, 256);
71+
PQfreeCancel(cancel);
72+
S->is_running_unbuffered = false;
73+
}
6774

6875
if (S->result) {
6976
/* free the resource */
7077
PQclear(S->result);
7178
S->result = NULL;
7279
}
7380

74-
if (S->stmt_name) {
75-
if (S->is_prepared && server_obj_usable) {
76-
pdo_pgsql_db_handle *H = S->H;
77-
PGresult *res;
81+
if (S->is_running_unbuffered) {
82+
/* https://postgresql.org/docs/current/libpq-async.html:
83+
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
84+
* And as all single-row functions are connection-wise instead of statement-wise,
85+
* any new single-row query has to make sure no preceding one is still running.
86+
*/
87+
// @todo Implement !(fin_mode & FIN_DISCARD)
88+
// instead of discarding results we could store them to their statement
89+
// so that their fetch() will get them (albeit not in lazy mode anymore).
90+
while ((S->result = PQgetResult(S->H->server))) {
91+
PQclear(S->result);
92+
S->result = NULL;
93+
}
94+
S->is_running_unbuffered = false;
95+
}
96+
97+
if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
98+
PGresult *res;
7899
#ifndef HAVE_PQCLOSEPREPARED
79-
// TODO (??) libpq does not support close statement protocol < postgres 17
80-
// check if we can circumvent this.
81-
char *q = NULL;
82-
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
83-
res = PQexec(H->server, q);
84-
efree(q);
100+
// TODO (??) libpq does not support close statement protocol < postgres 17
101+
// check if we can circumvent this.
102+
char *q = NULL;
103+
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
104+
res = PQexec(S->H->server, q);
105+
efree(q);
85106
#else
86-
res = PQclosePrepared(H->server, S->stmt_name);
107+
res = PQclosePrepared(H->server, S->stmt_name);
87108
#endif
88-
if (res) {
89-
PQclear(res);
90-
}
109+
if (res) {
110+
PQclear(res);
111+
}
112+
113+
S->is_prepared = false;
114+
if (S->H->running_stmt == S) {
115+
S->H->running_stmt = NULL;
91116
}
117+
}
118+
}
119+
120+
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
121+
{
122+
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
123+
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
124+
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
125+
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
126+
127+
pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));
128+
129+
if (S->stmt_name) {
92130
efree(S->stmt_name);
93131
S->stmt_name = NULL;
94132
}
@@ -142,14 +180,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
142180
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
143181
pdo_pgsql_db_handle *H = S->H;
144182
ExecStatusType status;
183+
int dispatch_result = 1;
145184

146185
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
147186

148-
/* ensure that we free any previous unfetched results */
149-
if(S->result) {
150-
PQclear(S->result);
151-
S->result = NULL;
187+
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
188+
* and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
189+
* was called for stmt 1 inbetween
190+
* (maybe it will change with pipeline mode in libpq 14?) */
191+
if (S->is_unbuffered && H->running_stmt) {
192+
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
193+
H->running_stmt = NULL;
152194
}
195+
/* ensure that we free any previous unfetched results */
196+
pgsql_stmt_finish(S, 0);
153197

154198
S->current_row = 0;
155199

@@ -228,6 +272,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
228272
}
229273
}
230274
}
275+
if (S->is_unbuffered) {
276+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
277+
stmt->bound_params ?
278+
zend_hash_num_elements(stmt->bound_params) :
279+
0,
280+
(const char**)S->param_values,
281+
S->param_lengths,
282+
S->param_formats,
283+
0);
284+
} else {
231285
S->result = PQexecPrepared(H->server, S->stmt_name,
232286
stmt->bound_params ?
233287
zend_hash_num_elements(stmt->bound_params) :
@@ -236,22 +290,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
236290
S->param_lengths,
237291
S->param_formats,
238292
0);
293+
}
239294
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
240295
/* execute query with parameters */
296+
if (S->is_unbuffered) {
297+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
298+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
299+
S->param_types,
300+
(const char**)S->param_values,
301+
S->param_lengths,
302+
S->param_formats,
303+
0);
304+
} else {
241305
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
242306
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
243307
S->param_types,
244308
(const char**)S->param_values,
245309
S->param_lengths,
246310
S->param_formats,
247311
0);
312+
}
248313
} else {
249314
/* execute plain query (with embedded parameters) */
315+
if (S->is_unbuffered) {
316+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
317+
} else {
250318
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
319+
}
320+
}
321+
322+
H->running_stmt = S;
323+
324+
if (S->is_unbuffered) {
325+
if (!dispatch_result) {
326+
pdo_pgsql_error_stmt(stmt, 0, NULL);
327+
H->running_stmt = NULL;
328+
return 0;
329+
}
330+
S->is_running_unbuffered = true;
331+
(void)PQsetSingleRowMode(H->server);
332+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
333+
334+
/* try a first fetch to at least have column names and so on */
335+
S->result = PQgetResult(S->H->server);
251336
}
337+
252338
status = PQresultStatus(S->result);
253339

254-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
340+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
255341
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
256342
return 0;
257343
}
@@ -473,6 +559,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
473559
return 0;
474560
}
475561
} else {
562+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
563+
ExecStatusType status;
564+
565+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
566+
* column_count should be recomputed on each iteration */
567+
568+
if(S->result) {
569+
PQclear(S->result);
570+
S->result = NULL;
571+
}
572+
573+
S->result = PQgetResult(S->H->server);
574+
status = PQresultStatus(S->result);
575+
576+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
577+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
578+
return 0;
579+
}
580+
581+
stmt->row_count = (zend_long)PQntuples(S->result);
582+
S->current_row = 0;
583+
584+
if (!stmt->row_count) {
585+
S->is_running_unbuffered = false;
586+
/* libpq requires looping until getResult returns null */
587+
pgsql_stmt_finish(S, 0);
588+
}
589+
}
476590
if (S->current_row < stmt->row_count) {
477591
S->current_row++;
478592
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ typedef struct {
3434
char *errmsg;
3535
} pdo_pgsql_error_info;
3636

37+
typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;
38+
3739
/* stuff we use in a pgsql database handle */
3840
typedef struct {
3941
PGconn *server;
@@ -49,13 +51,15 @@ typedef struct {
4951
bool disable_prepares;
5052
HashTable *lob_streams;
5153
zend_fcall_info_cache *notice_callback;
54+
bool default_fetching_laziness;
55+
pdo_pgsql_stmt *running_stmt;
5256
} pdo_pgsql_db_handle;
5357

5458
typedef struct {
5559
Oid pgsql_type;
5660
} pdo_pgsql_column;
5761

58-
typedef struct {
62+
struct pdo_pgsql_stmt {
5963
pdo_pgsql_db_handle *H;
6064
PGresult *result;
6165
pdo_pgsql_column *cols;
@@ -68,7 +72,9 @@ typedef struct {
6872
Oid *param_types;
6973
int current_row;
7074
bool is_prepared;
71-
} pdo_pgsql_stmt;
75+
bool is_unbuffered;
76+
bool is_running_unbuffered;
77+
};
7278

7379
typedef struct {
7480
Oid oid;

0 commit comments

Comments
 (0)