Skip to content

Commit 65df80a

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent f2ad62b commit 65df80a

File tree

2 files changed

+143
-24
lines changed

2 files changed

+143
-24
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 135 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,34 +56,71 @@
5656
#define FLOAT8LABEL "float8"
5757
#define FLOAT8OID 701
5858

59+
#define FIN_DISCARD 0x1
60+
#define FIN_CLOSE 0x2
61+
#define FIN_ABORT 0x4
5962

6063

61-
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
64+
65+
static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
6266
{
63-
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
64-
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
65-
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
66-
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
67+
if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
68+
PGcancel *cancel = PQgetCancel(S->H->server);
69+
char errbuf[256];
70+
PQcancel(cancel, errbuf, 256);
71+
PQfreeCancel(cancel);
72+
S->is_running_unbuffered = false;
73+
}
6774

6875
if (S->result) {
69-
/* free the resource */
7076
PQclear(S->result);
7177
S->result = NULL;
7278
}
7379

74-
if (S->stmt_name) {
75-
if (S->is_prepared && server_obj_usable) {
76-
pdo_pgsql_db_handle *H = S->H;
77-
char *q = NULL;
78-
PGresult *res;
80+
if (S->is_running_unbuffered) {
81+
/* https://postgresql.org/docs/current/libpq-async.html:
82+
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
83+
* And as all single-row functions are connection-wise instead of statement-wise,
84+
* any new single-row query has to make sure no preceding one is still running.
85+
*/
86+
// @todo Implement !(fin_mode & FIN_DISCARD)
87+
// instead of discarding results we could store them to their statement
88+
// so that their fetch() will get them (albeit not in lazy mode anymore).
89+
while ((S->result = PQgetResult(S->H->server))) {
90+
PQclear(S->result);
91+
S->result = NULL;
92+
}
93+
S->is_running_unbuffered = false;
94+
}
7995

80-
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
81-
res = PQexec(H->server, q);
82-
efree(q);
83-
if (res) {
84-
PQclear(res);
85-
}
96+
if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
97+
char *q = NULL;
98+
PGresult *res;
99+
100+
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
101+
res = PQexec(S->H->server, q);
102+
efree(q);
103+
if (res) {
104+
PQclear(res);
105+
}
106+
107+
S->is_prepared = false;
108+
if (S->H->running_stmt == S) {
109+
S->H->running_stmt = NULL;
86110
}
111+
}
112+
}
113+
114+
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
115+
{
116+
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
117+
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
118+
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
119+
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
120+
121+
pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));
122+
123+
if (S->stmt_name) {
87124
efree(S->stmt_name);
88125
S->stmt_name = NULL;
89126
}
@@ -137,14 +174,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
137174
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
138175
pdo_pgsql_db_handle *H = S->H;
139176
ExecStatusType status;
177+
int dispatch_result = 1;
140178

141179
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
142180

143-
/* ensure that we free any previous unfetched results */
144-
if(S->result) {
145-
PQclear(S->result);
146-
S->result = NULL;
181+
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
182+
* and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
183+
* was called for stmt 1 inbetween
184+
* (maybe it will change with pipeline mode in libpq 14?) */
185+
if (S->is_unbuffered && H->running_stmt) {
186+
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
187+
H->running_stmt = NULL;
147188
}
189+
/* ensure that we free any previous unfetched results */
190+
pgsql_stmt_finish(S, 0);
148191

149192
S->current_row = 0;
150193

@@ -219,6 +262,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
219262
}
220263
}
221264
}
265+
if (S->is_unbuffered) {
266+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
267+
stmt->bound_params ?
268+
zend_hash_num_elements(stmt->bound_params) :
269+
0,
270+
(const char**)S->param_values,
271+
S->param_lengths,
272+
S->param_formats,
273+
0);
274+
} else {
222275
S->result = PQexecPrepared(H->server, S->stmt_name,
223276
stmt->bound_params ?
224277
zend_hash_num_elements(stmt->bound_params) :
@@ -227,22 +280,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
227280
S->param_lengths,
228281
S->param_formats,
229282
0);
283+
}
230284
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
231285
/* execute query with parameters */
286+
if (S->is_unbuffered) {
287+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
288+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
289+
S->param_types,
290+
(const char**)S->param_values,
291+
S->param_lengths,
292+
S->param_formats,
293+
0);
294+
} else {
232295
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
233296
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
234297
S->param_types,
235298
(const char**)S->param_values,
236299
S->param_lengths,
237300
S->param_formats,
238301
0);
302+
}
239303
} else {
240304
/* execute plain query (with embedded parameters) */
305+
if (S->is_unbuffered) {
306+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
307+
} else {
241308
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
309+
}
310+
}
311+
312+
H->running_stmt = S;
313+
314+
if (S->is_unbuffered) {
315+
if (!dispatch_result) {
316+
pdo_pgsql_error_stmt(stmt, 0, NULL);
317+
H->running_stmt = NULL;
318+
return 0;
319+
}
320+
S->is_running_unbuffered = true;
321+
PQsetSingleRowMode(H->server);
322+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
323+
324+
/* try a first fetch to at least have column names and so on */
325+
S->result = PQgetResult(S->H->server);
242326
}
327+
243328
status = PQresultStatus(S->result);
244329

245-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
330+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
246331
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
247332
return 0;
248333
}
@@ -464,6 +549,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
464549
return 0;
465550
}
466551
} else {
552+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
553+
ExecStatusType status;
554+
555+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
556+
* column_count should be recomputed on each iteration */
557+
558+
if(S->result) {
559+
PQclear(S->result);
560+
S->result = NULL;
561+
}
562+
563+
S->result = PQgetResult(S->H->server);
564+
status = PQresultStatus(S->result);
565+
566+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
567+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
568+
return 0;
569+
}
570+
571+
stmt->row_count = (zend_long)PQntuples(S->result);
572+
S->current_row = 0;
573+
574+
if (!stmt->row_count) {
575+
S->is_running_unbuffered = false;
576+
/* libpq requires looping until getResult returns null */
577+
pgsql_stmt_finish(S, 0);
578+
}
579+
}
467580
if (S->current_row < stmt->row_count) {
468581
S->current_row++;
469582
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ typedef struct {
3232
char *errmsg;
3333
} pdo_pgsql_error_info;
3434

35+
typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;
36+
3537
/* stuff we use in a pgsql database handle */
3638
typedef struct {
3739
PGconn *server;
@@ -47,13 +49,15 @@ typedef struct {
4749
bool disable_prepares;
4850
HashTable *lob_streams;
4951
zend_fcall_info_cache *notice_callback;
52+
bool default_fetching_laziness;
53+
pdo_pgsql_stmt *running_stmt;
5054
} pdo_pgsql_db_handle;
5155

5256
typedef struct {
5357
Oid pgsql_type;
5458
} pdo_pgsql_column;
5559

56-
typedef struct {
60+
struct pdo_pgsql_stmt {
5761
pdo_pgsql_db_handle *H;
5862
PGresult *result;
5963
pdo_pgsql_column *cols;
@@ -66,7 +70,9 @@ typedef struct {
6670
Oid *param_types;
6771
int current_row;
6872
bool is_prepared;
69-
} pdo_pgsql_stmt;
73+
bool is_unbuffered;
74+
bool is_running_unbuffered;
75+
};
7076

7177
typedef struct {
7278
Oid oid;

0 commit comments

Comments
 (0)