Skip to content

Commit 5a2c611

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent f2ad62b commit 5a2c611

File tree

2 files changed

+136
-24
lines changed

2 files changed

+136
-24
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 128 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,34 +56,68 @@
5656
#define FLOAT8LABEL "float8"
5757
#define FLOAT8OID 701
5858

59+
#define FIN_DISCARD 0x1
60+
#define FIN_CLOSE 0x2
61+
#define FIN_ABORT 0x4
5962

6063

61-
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
64+
65+
static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
6266
{
63-
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
64-
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
65-
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
66-
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
67+
if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
68+
PGcancel *cancel = PQgetCancel(S->H->server);
69+
char errbuf[256];
70+
PQcancel(cancel, errbuf, 256);
71+
PQfreeCancel(cancel);
72+
S->is_running_unbuffered = false;
73+
}
6774

6875
if (S->result) {
69-
/* free the resource */
7076
PQclear(S->result);
7177
S->result = NULL;
7278
}
7379

74-
if (S->stmt_name) {
75-
if (S->is_prepared && server_obj_usable) {
76-
pdo_pgsql_db_handle *H = S->H;
77-
char *q = NULL;
78-
PGresult *res;
80+
if (S->is_running_unbuffered) {
81+
/* https://postgresql.org/docs/current/libpq-async.html:
82+
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
83+
* And as all single-row functions are connection-wise instead of statement-wise,
84+
* any new single-row query has to make sure no preceding one is still running.
85+
*/
86+
// @todo Implement !(fin_mode & FIN_DISCARD)
87+
// instead of discarding results we could store them to their statement
88+
// so that their fetch() will get them (albeit not in lazy mode anymore).
89+
while ((S->result = PQgetResult(S->H->server))) {
90+
PQclear(S->result);
91+
S->result = NULL;
92+
}
93+
S->is_running_unbuffered = false;
94+
}
7995

80-
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
81-
res = PQexec(H->server, q);
82-
efree(q);
83-
if (res) {
84-
PQclear(res);
85-
}
96+
if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
97+
char *q = NULL;
98+
PGresult *res;
99+
100+
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
101+
res = PQexec(S->H->server, q);
102+
efree(q);
103+
if (res) {
104+
PQclear(res);
86105
}
106+
107+
S->is_prepared = false;
108+
}
109+
}
110+
111+
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
112+
{
113+
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
114+
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
115+
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
116+
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
117+
118+
pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));
119+
120+
if (S->stmt_name) {
87121
efree(S->stmt_name);
88122
S->stmt_name = NULL;
89123
}
@@ -137,14 +171,18 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
137171
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
138172
pdo_pgsql_db_handle *H = S->H;
139173
ExecStatusType status;
174+
int dispatch_result = 1;
140175

141176
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
142177

143-
/* ensure that we free any previous unfetched results */
144-
if(S->result) {
145-
PQclear(S->result);
146-
S->result = NULL;
178+
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
179+
* (maybe it will change with pipeline mode in libpq 14?) */
180+
if (S->is_unbuffered && H->running_stmt) {
181+
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
182+
H->running_stmt = NULL;
147183
}
184+
/* ensure that we free any previous unfetched results */
185+
pgsql_stmt_finish(S, 0);
148186

149187
S->current_row = 0;
150188

@@ -219,6 +257,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
219257
}
220258
}
221259
}
260+
if (S->is_unbuffered) {
261+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
262+
stmt->bound_params ?
263+
zend_hash_num_elements(stmt->bound_params) :
264+
0,
265+
(const char**)S->param_values,
266+
S->param_lengths,
267+
S->param_formats,
268+
0);
269+
} else {
222270
S->result = PQexecPrepared(H->server, S->stmt_name,
223271
stmt->bound_params ?
224272
zend_hash_num_elements(stmt->bound_params) :
@@ -227,22 +275,52 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
227275
S->param_lengths,
228276
S->param_formats,
229277
0);
278+
}
230279
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
231280
/* execute query with parameters */
281+
if (S->is_unbuffered) {
282+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
283+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
284+
S->param_types,
285+
(const char**)S->param_values,
286+
S->param_lengths,
287+
S->param_formats,
288+
0);
289+
} else {
232290
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
233291
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
234292
S->param_types,
235293
(const char**)S->param_values,
236294
S->param_lengths,
237295
S->param_formats,
238296
0);
297+
}
239298
} else {
240299
/* execute plain query (with embedded parameters) */
300+
if (S->is_unbuffered) {
301+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
302+
} else {
241303
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
304+
}
242305
}
306+
307+
if (S->is_unbuffered) {
308+
if (!dispatch_result) {
309+
pdo_pgsql_error_stmt(stmt, 0, NULL);
310+
return 0;
311+
}
312+
S->is_running_unbuffered = true;
313+
H->running_stmt = S;
314+
PQsetSingleRowMode(H->server);
315+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
316+
317+
/* try a first fetch to at least have column names and so on */
318+
S->result = PQgetResult(S->H->server);
319+
}
320+
243321
status = PQresultStatus(S->result);
244322

245-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
323+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
246324
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
247325
return 0;
248326
}
@@ -464,6 +542,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
464542
return 0;
465543
}
466544
} else {
545+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
546+
ExecStatusType status;
547+
548+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
549+
* column_count should be recomputed on each iteration */
550+
551+
if(S->result) {
552+
PQclear(S->result);
553+
S->result = NULL;
554+
}
555+
556+
S->result = PQgetResult(S->H->server);
557+
status = PQresultStatus(S->result);
558+
559+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
560+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
561+
return 0;
562+
}
563+
564+
stmt->row_count = (zend_long)PQntuples(S->result);
565+
S->current_row = 0;
566+
567+
if (!stmt->row_count) {
568+
S->is_running_unbuffered = false;
569+
/* libpq requires looping until getResult returns null */
570+
pgsql_stmt_finish(S, 0);
571+
}
572+
}
467573
if (S->current_row < stmt->row_count) {
468574
S->current_row++;
469575
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ typedef struct {
3232
char *errmsg;
3333
} pdo_pgsql_error_info;
3434

35+
typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;
36+
3537
/* stuff we use in a pgsql database handle */
3638
typedef struct {
3739
PGconn *server;
@@ -47,13 +49,15 @@ typedef struct {
4749
bool disable_prepares;
4850
HashTable *lob_streams;
4951
zend_fcall_info_cache *notice_callback;
52+
bool default_fetching_laziness;
53+
pdo_pgsql_stmt *running_stmt;
5054
} pdo_pgsql_db_handle;
5155

5256
typedef struct {
5357
Oid pgsql_type;
5458
} pdo_pgsql_column;
5559

56-
typedef struct {
60+
struct pdo_pgsql_stmt {
5761
pdo_pgsql_db_handle *H;
5862
PGresult *result;
5963
pdo_pgsql_column *cols;
@@ -66,7 +70,9 @@ typedef struct {
6670
Oid *param_types;
6771
int current_row;
6872
bool is_prepared;
69-
} pdo_pgsql_stmt;
73+
bool is_unbuffered;
74+
bool is_running_unbuffered;
75+
};
7076

7177
typedef struct {
7278
Oid oid;

0 commit comments

Comments
 (0)