Skip to content

Commit 1945a9b

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent f2ad62b commit 1945a9b

File tree

2 files changed

+143
-23
lines changed

2 files changed

+143
-23
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 135 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,34 +56,72 @@
5656
#define FLOAT8LABEL "float8"
5757
#define FLOAT8OID 701
5858

59+
#define FIN_DISCARD 0x1
60+
#define FIN_CLOSE 0x2
61+
#define FIN_ABORT 0x4
5962

6063

61-
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
64+
65+
static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode)
6266
{
63-
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
64-
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
65-
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
66-
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
67+
if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) {
68+
PGcancel *cancel = PQgetCancel(S->H->server);
69+
char errbuf[256];
70+
PQcancel(cancel, errbuf, 256);
71+
PQfreeCancel(cancel);
72+
S->is_running_unbuffered = false;
73+
}
6774

6875
if (S->result) {
6976
/* free the resource */
7077
PQclear(S->result);
7178
S->result = NULL;
7279
}
7380

74-
if (S->stmt_name) {
75-
if (S->is_prepared && server_obj_usable) {
76-
pdo_pgsql_db_handle *H = S->H;
77-
char *q = NULL;
78-
PGresult *res;
81+
if (S->is_running_unbuffered) {
82+
/* https://postgresql.org/docs/current/libpq-async.html:
83+
* "PQsendQuery cannot be called again until PQgetResult has returned NULL"
84+
* And as all single-row functions are connection-wise instead of statement-wise,
85+
* any new single-row query has to make sure no preceding one is still running.
86+
*/
87+
// @todo Implement !(fin_mode & FIN_DISCARD)
88+
// instead of discarding results we could store them to their statement
89+
// so that their fetch() will get them (albeit not in lazy mode anymore).
90+
while ((S->result = PQgetResult(S->H->server))) {
91+
PQclear(S->result);
92+
S->result = NULL;
93+
}
94+
S->is_running_unbuffered = false;
95+
}
7996

80-
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
81-
res = PQexec(H->server, q);
82-
efree(q);
83-
if (res) {
84-
PQclear(res);
85-
}
97+
if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) {
98+
char *q = NULL;
99+
PGresult *res;
100+
101+
spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name);
102+
res = PQexec(S->H->server, q);
103+
efree(q);
104+
if (res) {
105+
PQclear(res);
106+
}
107+
108+
S->is_prepared = false;
109+
if (S->H->running_stmt == S) {
110+
S->H->running_stmt = NULL;
86111
}
112+
}
113+
}
114+
115+
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
116+
{
117+
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
118+
bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle)
119+
&& IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)])
120+
&& !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED);
121+
122+
pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0));
123+
124+
if (S->stmt_name) {
87125
efree(S->stmt_name);
88126
S->stmt_name = NULL;
89127
}
@@ -137,14 +175,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
137175
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
138176
pdo_pgsql_db_handle *H = S->H;
139177
ExecStatusType status;
178+
int dispatch_result = 1;
140179

141180
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
142181

143-
/* ensure that we free any previous unfetched results */
144-
if(S->result) {
145-
PQclear(S->result);
146-
S->result = NULL;
182+
/* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
183+
* and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
184+
* was called for stmt 1 inbetween
185+
* (maybe it will change with pipeline mode in libpq 14?) */
186+
if (S->is_unbuffered && H->running_stmt) {
187+
pgsql_stmt_finish(H->running_stmt, FIN_CLOSE);
188+
H->running_stmt = NULL;
147189
}
190+
/* ensure that we free any previous unfetched results */
191+
pgsql_stmt_finish(S, 0);
148192

149193
S->current_row = 0;
150194

@@ -219,6 +263,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
219263
}
220264
}
221265
}
266+
if (S->is_unbuffered) {
267+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
268+
stmt->bound_params ?
269+
zend_hash_num_elements(stmt->bound_params) :
270+
0,
271+
(const char**)S->param_values,
272+
S->param_lengths,
273+
S->param_formats,
274+
0);
275+
} else {
222276
S->result = PQexecPrepared(H->server, S->stmt_name,
223277
stmt->bound_params ?
224278
zend_hash_num_elements(stmt->bound_params) :
@@ -227,22 +281,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
227281
S->param_lengths,
228282
S->param_formats,
229283
0);
284+
}
230285
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
231286
/* execute query with parameters */
287+
if (S->is_unbuffered) {
288+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
289+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
290+
S->param_types,
291+
(const char**)S->param_values,
292+
S->param_lengths,
293+
S->param_formats,
294+
0);
295+
} else {
232296
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
233297
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
234298
S->param_types,
235299
(const char**)S->param_values,
236300
S->param_lengths,
237301
S->param_formats,
238302
0);
303+
}
239304
} else {
240305
/* execute plain query (with embedded parameters) */
306+
if (S->is_unbuffered) {
307+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
308+
} else {
241309
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
310+
}
311+
}
312+
313+
H->running_stmt = S;
314+
315+
if (S->is_unbuffered) {
316+
if (!dispatch_result) {
317+
pdo_pgsql_error_stmt(stmt, 0, NULL);
318+
H->running_stmt = NULL;
319+
return 0;
320+
}
321+
S->is_running_unbuffered = true;
322+
PQsetSingleRowMode(H->server);
323+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
324+
325+
/* try a first fetch to at least have column names and so on */
326+
S->result = PQgetResult(S->H->server);
242327
}
328+
243329
status = PQresultStatus(S->result);
244330

245-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
331+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
246332
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
247333
return 0;
248334
}
@@ -464,6 +550,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
464550
return 0;
465551
}
466552
} else {
553+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
554+
ExecStatusType status;
555+
556+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
557+
* column_count should be recomputed on each iteration */
558+
559+
if(S->result) {
560+
PQclear(S->result);
561+
S->result = NULL;
562+
}
563+
564+
S->result = PQgetResult(S->H->server);
565+
status = PQresultStatus(S->result);
566+
567+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
568+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
569+
return 0;
570+
}
571+
572+
stmt->row_count = (zend_long)PQntuples(S->result);
573+
S->current_row = 0;
574+
575+
if (!stmt->row_count) {
576+
S->is_running_unbuffered = false;
577+
/* libpq requires looping until getResult returns null */
578+
pgsql_stmt_finish(S, 0);
579+
}
580+
}
467581
if (S->current_row < stmt->row_count) {
468582
S->current_row++;
469583
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ typedef struct {
3232
char *errmsg;
3333
} pdo_pgsql_error_info;
3434

35+
typedef struct pdo_pgsql_stmt pdo_pgsql_stmt;
36+
3537
/* stuff we use in a pgsql database handle */
3638
typedef struct {
3739
PGconn *server;
@@ -47,13 +49,15 @@ typedef struct {
4749
bool disable_prepares;
4850
HashTable *lob_streams;
4951
zend_fcall_info_cache *notice_callback;
52+
bool default_fetching_laziness;
53+
pdo_pgsql_stmt *running_stmt;
5054
} pdo_pgsql_db_handle;
5155

5256
typedef struct {
5357
Oid pgsql_type;
5458
} pdo_pgsql_column;
5559

56-
typedef struct {
60+
struct pdo_pgsql_stmt {
5761
pdo_pgsql_db_handle *H;
5862
PGresult *result;
5963
pdo_pgsql_column *cols;
@@ -66,7 +70,9 @@ typedef struct {
6670
Oid *param_types;
6771
int current_row;
6872
bool is_prepared;
69-
} pdo_pgsql_stmt;
73+
bool is_unbuffered;
74+
bool is_running_unbuffered;
75+
};
7076

7177
typedef struct {
7278
Oid oid;

0 commit comments

Comments
 (0)