Skip to content

Commit c45b3f3

Browse files
committed
pdo_pgsql: unbuffered fetching
fetch part of the lazy fetch mode for Pdo\Pgsql Starts fixing #15287
1 parent f2ad62b commit c45b3f3

File tree

2 files changed

+86
-1
lines changed

2 files changed

+86
-1
lines changed

ext/pdo_pgsql/pgsql_statement.c

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@
5858

5959

6060

61+
static void pgsql_stmt_cancel(pdo_pgsql_stmt *S)
62+
{
63+
if (S->is_running_unbuffered) {
64+
PGcancel *cancel = PQgetCancel(S->H->server);
65+
char errbuf[256];
66+
PQcancel(cancel, errbuf, 256);
67+
PQfreeCancel(cancel);
68+
S->is_running_unbuffered = false;
69+
}
70+
}
71+
6172
static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
6273
{
6374
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
@@ -108,6 +119,8 @@ static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
108119
S->query = NULL;
109120
}
110121

122+
pgsql_stmt_cancel(S);
123+
111124
if (S->cursor_name) {
112125
if (server_obj_usable) {
113126
pdo_pgsql_db_handle *H = S->H;
@@ -137,10 +150,12 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
137150
pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
138151
pdo_pgsql_db_handle *H = S->H;
139152
ExecStatusType status;
153+
int dispatch_result = 1;
140154

141155
bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
142156

143157
/* ensure that we free any previous unfetched results */
158+
pgsql_stmt_cancel(S);
144159
if(S->result) {
145160
PQclear(S->result);
146161
S->result = NULL;
@@ -219,6 +234,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
219234
}
220235
}
221236
}
237+
if (S->is_unbuffered) {
238+
dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name,
239+
stmt->bound_params ?
240+
zend_hash_num_elements(stmt->bound_params) :
241+
0,
242+
(const char**)S->param_values,
243+
S->param_lengths,
244+
S->param_formats,
245+
0);
246+
} else {
222247
S->result = PQexecPrepared(H->server, S->stmt_name,
223248
stmt->bound_params ?
224249
zend_hash_num_elements(stmt->bound_params) :
@@ -227,22 +252,51 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
227252
S->param_lengths,
228253
S->param_formats,
229254
0);
255+
}
230256
} else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
231257
/* execute query with parameters */
258+
if (S->is_unbuffered) {
259+
dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query),
260+
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
261+
S->param_types,
262+
(const char**)S->param_values,
263+
S->param_lengths,
264+
S->param_formats,
265+
0);
266+
} else {
232267
S->result = PQexecParams(H->server, ZSTR_VAL(S->query),
233268
stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
234269
S->param_types,
235270
(const char**)S->param_values,
236271
S->param_lengths,
237272
S->param_formats,
238273
0);
274+
}
239275
} else {
240276
/* execute plain query (with embedded parameters) */
277+
if (S->is_unbuffered) {
278+
dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string));
279+
} else {
241280
S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string));
281+
}
242282
}
283+
284+
if (S->is_unbuffered) {
285+
if (!dispatch_result) {
286+
pdo_pgsql_error_stmt(stmt, 0, NULL);
287+
return 0;
288+
}
289+
S->is_running_unbuffered = true;
290+
PQsetSingleRowMode(H->server);
291+
/* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
292+
293+
/* try a first fetch to at least have column names and so on */
294+
S->result = PQgetResult(S->H->server);
295+
}
296+
243297
status = PQresultStatus(S->result);
244298

245-
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
299+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
246300
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
247301
return 0;
248302
}
@@ -464,6 +518,35 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
464518
return 0;
465519
}
466520
} else {
521+
if (S->is_running_unbuffered && S->current_row >= stmt->row_count) {
522+
ExecStatusType status;
523+
524+
/* @todo in unbuffered mode, PQ allows multiple queries to be passed:
525+
* column_count should be recomputed on each iteration */
526+
527+
if(S->result) {
528+
PQclear(S->result);
529+
S->result = NULL;
530+
}
531+
532+
S->result = PQgetResult(S->H->server);
533+
status = PQresultStatus(S->result);
534+
535+
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) {
536+
pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
537+
return 0;
538+
}
539+
540+
stmt->row_count = (zend_long)PQntuples(S->result);
541+
S->current_row = 0;
542+
543+
if (!stmt->row_count) {
544+
/* libpq requires looping until getResult returns null */
545+
PQgetResult(S->H->server);
546+
/* @todo receiving a result here is unexpected and should throw an error */
547+
S->is_running_unbuffered = false;
548+
}
549+
}
467550
if (S->current_row < stmt->row_count) {
468551
S->current_row++;
469552
return 1;

ext/pdo_pgsql/php_pdo_pgsql_int.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ typedef struct {
6666
Oid *param_types;
6767
int current_row;
6868
bool is_prepared;
69+
bool is_unbuffered;
70+
bool is_running_unbuffered;
6971
} pdo_pgsql_stmt;
7072

7173
typedef struct {

0 commit comments

Comments
 (0)