|
56 | 56 | #define FLOAT8LABEL "float8"
|
57 | 57 | #define FLOAT8OID 701
|
58 | 58 |
|
| 59 | +#define FIN_DISCARD 0x1 |
| 60 | +#define FIN_CLOSE 0x2 |
| 61 | +#define FIN_ABORT 0x4 |
59 | 62 |
|
60 | 63 |
|
61 |
| -static int pgsql_stmt_dtor(pdo_stmt_t *stmt) |
| 64 | + |
| 65 | +static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode) |
62 | 66 | {
|
63 |
| - pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; |
64 |
| - bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) |
65 |
| - && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) |
66 |
| - && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); |
| 67 | + pdo_pgsql_db_handle *H = S->H; |
| 68 | + |
| 69 | + if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) { |
| 70 | + PGcancel *cancel = PQgetCancel(H->server); |
| 71 | + char errbuf[256]; |
| 72 | + PQcancel(cancel, errbuf, 256); |
| 73 | + PQfreeCancel(cancel); |
| 74 | + S->is_running_unbuffered = false; |
| 75 | + } |
67 | 76 |
|
68 | 77 | if (S->result) {
|
69 | 78 | /* free the resource */
|
70 | 79 | PQclear(S->result);
|
71 | 80 | S->result = NULL;
|
72 | 81 | }
|
73 | 82 |
|
74 |
| - if (S->stmt_name) { |
75 |
| - if (S->is_prepared && server_obj_usable) { |
76 |
| - pdo_pgsql_db_handle *H = S->H; |
77 |
| - PGresult *res; |
| 83 | + if (S->is_running_unbuffered) { |
| 84 | + /* https://postgresql.org/docs/current/libpq-async.html: |
| 85 | + * "PQsendQuery cannot be called again until PQgetResult has returned NULL" |
| 86 | + * And as all single-row functions are connection-wise instead of statement-wise, |
| 87 | + * any new single-row query has to make sure no preceding one is still running. |
| 88 | + */ |
| 89 | + // @todo Implement !(fin_mode & FIN_DISCARD) |
| 90 | + // instead of discarding results we could store them to their statement |
| 91 | + // so that their fetch() will get them (albeit not in lazy mode anymore). |
| 92 | + while ((S->result = PQgetResult(H->server))) { |
| 93 | + PQclear(S->result); |
| 94 | + S->result = NULL; |
| 95 | + } |
| 96 | + S->is_running_unbuffered = false; |
| 97 | + } |
| 98 | + |
| 99 | + if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) { |
| 100 | + PGresult *res; |
78 | 101 | #ifndef HAVE_PQCLOSEPREPARED
|
79 |
| - // TODO (??) libpq does not support close statement protocol < postgres 17 |
80 |
| - // check if we can circumvent this. |
81 |
| - char *q = NULL; |
82 |
| - spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); |
83 |
| - res = PQexec(H->server, q); |
84 |
| - efree(q); |
| 102 | + // TODO (??) libpq does not support close statement protocol < postgres 17 |
| 103 | + // check if we can circumvent this. |
| 104 | + char *q = NULL; |
| 105 | + spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); |
| 106 | + res = PQexec(H->server, q); |
| 107 | + efree(q); |
85 | 108 | #else
|
86 |
| - res = PQclosePrepared(H->server, S->stmt_name); |
| 109 | + res = PQclosePrepared(H->server, S->stmt_name); |
87 | 110 | #endif
|
88 |
| - if (res) { |
89 |
| - PQclear(res); |
90 |
| - } |
| 111 | + if (res) { |
| 112 | + PQclear(res); |
| 113 | + } |
| 114 | + |
| 115 | + S->is_prepared = false; |
| 116 | + if (H->running_stmt == S) { |
| 117 | + H->running_stmt = NULL; |
91 | 118 | }
|
| 119 | + } |
| 120 | +} |
| 121 | + |
| 122 | +static int pgsql_stmt_dtor(pdo_stmt_t *stmt) |
| 123 | +{ |
| 124 | + pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; |
| 125 | + bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) |
| 126 | + && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) |
| 127 | + && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); |
| 128 | + |
| 129 | + pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0)); |
| 130 | + |
| 131 | + if (S->stmt_name) { |
92 | 132 | efree(S->stmt_name);
|
93 | 133 | S->stmt_name = NULL;
|
94 | 134 | }
|
@@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
|
142 | 182 | pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data;
|
143 | 183 | pdo_pgsql_db_handle *H = S->H;
|
144 | 184 | ExecStatusType status;
|
| 185 | + int dispatch_result = 1; |
145 | 186 |
|
146 | 187 | bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh);
|
147 | 188 |
|
148 |
| - /* ensure that we free any previous unfetched results */ |
149 |
| - if(S->result) { |
150 |
| - PQclear(S->result); |
151 |
| - S->result = NULL; |
| 189 | + /* in unbuffered mode, finish any running statement: libpq explicitely prohibits this |
| 190 | + * and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE |
| 191 | + * was called for stmt 1 inbetween |
| 192 | + * (maybe it will change with pipeline mode in libpq 14?) */ |
| 193 | + if (S->is_unbuffered && H->running_stmt) { |
| 194 | + pgsql_stmt_finish(H->running_stmt, FIN_CLOSE); |
| 195 | + H->running_stmt = NULL; |
152 | 196 | }
|
| 197 | + /* ensure that we free any previous unfetched results */ |
| 198 | + pgsql_stmt_finish(S, 0); |
153 | 199 |
|
154 | 200 | S->current_row = 0;
|
155 | 201 |
|
@@ -198,6 +244,7 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
|
198 | 244 | /* it worked */
|
199 | 245 | S->is_prepared = 1;
|
200 | 246 | PQclear(S->result);
|
| 247 | + S->result = NULL; |
201 | 248 | break;
|
202 | 249 | default: {
|
203 | 250 | char *sqlstate = pdo_pgsql_sqlstate(S->result);
|
@@ -227,30 +274,72 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
|
227 | 274 | }
|
228 | 275 | }
|
229 | 276 | }
|
230 |
| - S->result = PQexecPrepared(H->server, S->stmt_name, |
| 277 | + if (S->is_unbuffered) { |
| 278 | + dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name, |
| 279 | + stmt->bound_params ? |
| 280 | + zend_hash_num_elements(stmt->bound_params) : |
| 281 | + 0, |
| 282 | + (const char**)S->param_values, |
| 283 | + S->param_lengths, |
| 284 | + S->param_formats, |
| 285 | + 0); |
| 286 | + } else { |
| 287 | + S->result = PQexecPrepared(H->server, S->stmt_name, |
231 | 288 | stmt->bound_params ?
|
232 | 289 | zend_hash_num_elements(stmt->bound_params) :
|
233 | 290 | 0,
|
234 | 291 | (const char**)S->param_values,
|
235 | 292 | S->param_lengths,
|
236 | 293 | S->param_formats,
|
237 | 294 | 0);
|
| 295 | + } |
238 | 296 | } else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) {
|
239 | 297 | /* execute query with parameters */
|
240 |
| - S->result = PQexecParams(H->server, ZSTR_VAL(S->query), |
| 298 | + if (S->is_unbuffered) { |
| 299 | + dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query), |
| 300 | + stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0, |
| 301 | + S->param_types, |
| 302 | + (const char**)S->param_values, |
| 303 | + S->param_lengths, |
| 304 | + S->param_formats, |
| 305 | + 0); |
| 306 | + } else { |
| 307 | + S->result = PQexecParams(H->server, ZSTR_VAL(S->query), |
241 | 308 | stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0,
|
242 | 309 | S->param_types,
|
243 | 310 | (const char**)S->param_values,
|
244 | 311 | S->param_lengths,
|
245 | 312 | S->param_formats,
|
246 | 313 | 0);
|
| 314 | + } |
247 | 315 | } else {
|
248 | 316 | /* execute plain query (with embedded parameters) */
|
249 |
| - S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 317 | + if (S->is_unbuffered) { |
| 318 | + dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 319 | + } else { |
| 320 | + S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 321 | + } |
250 | 322 | }
|
| 323 | + |
| 324 | + H->running_stmt = S; |
| 325 | + |
| 326 | + if (S->is_unbuffered) { |
| 327 | + if (!dispatch_result) { |
| 328 | + pdo_pgsql_error_stmt(stmt, 0, NULL); |
| 329 | + H->running_stmt = NULL; |
| 330 | + return 0; |
| 331 | + } |
| 332 | + S->is_running_unbuffered = true; |
| 333 | + (void)PQsetSingleRowMode(H->server); |
| 334 | + /* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */ |
| 335 | + |
| 336 | + /* try a first fetch to at least have column names and so on */ |
| 337 | + S->result = PQgetResult(S->H->server); |
| 338 | + } |
| 339 | + |
251 | 340 | status = PQresultStatus(S->result);
|
252 | 341 |
|
253 |
| - if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { |
| 342 | + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { |
254 | 343 | pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result));
|
255 | 344 | return 0;
|
256 | 345 | }
|
@@ -472,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
|
472 | 561 | return 0;
|
473 | 562 | }
|
474 | 563 | } else {
|
| 564 | + if (S->is_running_unbuffered && S->current_row >= stmt->row_count) { |
| 565 | + ExecStatusType status; |
| 566 | + |
| 567 | + /* @todo in unbuffered mode, PQ allows multiple queries to be passed: |
| 568 | + * column_count should be recomputed on each iteration */ |
| 569 | + |
| 570 | + if(S->result) { |
| 571 | + PQclear(S->result); |
| 572 | + S->result = NULL; |
| 573 | + } |
| 574 | + |
| 575 | + S->result = PQgetResult(S->H->server); |
| 576 | + status = PQresultStatus(S->result); |
| 577 | + |
| 578 | + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { |
| 579 | + pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result)); |
| 580 | + return 0; |
| 581 | + } |
| 582 | + |
| 583 | + stmt->row_count = (zend_long)PQntuples(S->result); |
| 584 | + S->current_row = 0; |
| 585 | + |
| 586 | + if (!stmt->row_count) { |
| 587 | + S->is_running_unbuffered = false; |
| 588 | + /* libpq requires looping until getResult returns null */ |
| 589 | + pgsql_stmt_finish(S, 0); |
| 590 | + } |
| 591 | + } |
475 | 592 | if (S->current_row < stmt->row_count) {
|
476 | 593 | S->current_row++;
|
477 | 594 | return 1;
|
|
0 commit comments