56
56
#define FLOAT8LABEL "float8"
57
57
#define FLOAT8OID 701
58
58
59
+ #define FIN_DISCARD 0x1
60
+ #define FIN_CLOSE 0x2
61
+ #define FIN_ABORT 0x4
59
62
60
63
61
- static int pgsql_stmt_dtor (pdo_stmt_t * stmt )
64
+
65
+ static void pgsql_stmt_finish (pdo_pgsql_stmt * S , int fin_mode )
62
66
{
63
- pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
64
- bool server_obj_usable = !Z_ISUNDEF (stmt -> database_object_handle )
65
- && IS_OBJ_VALID (EG (objects_store ).object_buckets [Z_OBJ_HANDLE (stmt -> database_object_handle )])
66
- && !(OBJ_FLAGS (Z_OBJ (stmt -> database_object_handle )) & IS_OBJ_FREE_CALLED );
67
+ pdo_pgsql_db_handle * H = S -> H ;
68
+
69
+ if (S -> is_running_unbuffered && S -> result && (fin_mode & FIN_ABORT )) {
70
+ PGcancel * cancel = PQgetCancel (H -> server );
71
+ char errbuf [256 ];
72
+ PQcancel (cancel , errbuf , 256 );
73
+ PQfreeCancel (cancel );
74
+ S -> is_running_unbuffered = false;
75
+ }
67
76
68
77
if (S -> result ) {
69
78
/* free the resource */
70
79
PQclear (S -> result );
71
80
S -> result = NULL ;
72
81
}
73
82
74
- if (S -> stmt_name ) {
75
- if (S -> is_prepared && server_obj_usable ) {
76
- pdo_pgsql_db_handle * H = S -> H ;
77
- PGresult * res ;
83
+ if (S -> is_running_unbuffered ) {
84
+ /* https://postgresql.org/docs/current/libpq-async.html:
85
+ * "PQsendQuery cannot be called again until PQgetResult has returned NULL"
86
+ * And as all single-row functions are connection-wise instead of statement-wise,
87
+ * any new single-row query has to make sure no preceding one is still running.
88
+ */
89
+ // @todo Implement !(fin_mode & FIN_DISCARD)
90
+ // instead of discarding results we could store them to their statement
91
+ // so that their fetch() will get them (albeit not in lazy mode anymore).
92
+ while ((S -> result = PQgetResult (H -> server ))) {
93
+ PQclear (S -> result );
94
+ S -> result = NULL ;
95
+ }
96
+ S -> is_running_unbuffered = false;
97
+ }
98
+
99
+ if (S -> stmt_name && S -> is_prepared && (fin_mode & FIN_CLOSE )) {
100
+ PGresult * res ;
78
101
#ifndef HAVE_PQCLOSEPREPARED
79
102
// TODO (??) libpq does not support close statement protocol < postgres 17
80
103
// check if we can circumvent this.
@@ -88,7 +111,24 @@ static int pgsql_stmt_dtor(pdo_stmt_t *stmt)
88
111
if (res ) {
89
112
PQclear (res );
90
113
}
114
+
115
+ S -> is_prepared = false;
116
+ if (H -> running_stmt == S ) {
117
+ H -> running_stmt = NULL ;
91
118
}
119
+ }
120
+ }
121
+
122
+ static int pgsql_stmt_dtor (pdo_stmt_t * stmt )
123
+ {
124
+ pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
125
+ bool server_obj_usable = !Z_ISUNDEF (stmt -> database_object_handle )
126
+ && IS_OBJ_VALID (EG (objects_store ).object_buckets [Z_OBJ_HANDLE (stmt -> database_object_handle )])
127
+ && !(OBJ_FLAGS (Z_OBJ (stmt -> database_object_handle )) & IS_OBJ_FREE_CALLED );
128
+
129
+ pgsql_stmt_finish (S , FIN_DISCARD |(server_obj_usable ? FIN_CLOSE |FIN_ABORT : 0 ));
130
+
131
+ if (S -> stmt_name ) {
92
132
efree (S -> stmt_name );
93
133
S -> stmt_name = NULL ;
94
134
}
@@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
142
182
pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
143
183
pdo_pgsql_db_handle * H = S -> H ;
144
184
ExecStatusType status ;
185
+ int dispatch_result = 1 ;
145
186
146
187
bool in_trans = stmt -> dbh -> methods -> in_transaction (stmt -> dbh );
147
188
148
- /* ensure that we free any previous unfetched results */
149
- if (S -> result ) {
150
- PQclear (S -> result );
151
- S -> result = NULL ;
189
+ /* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
190
+ * and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
191
+ * was called for stmt 1 inbetween
192
+ * (maybe it will change with pipeline mode in libpq 14?) */
193
+ if (S -> is_unbuffered && H -> running_stmt ) {
194
+ pgsql_stmt_finish (H -> running_stmt , FIN_CLOSE );
195
+ H -> running_stmt = NULL ;
152
196
}
197
+ /* ensure that we free any previous unfetched results */
198
+ pgsql_stmt_finish (S , 0 );
153
199
154
200
S -> current_row = 0 ;
155
201
@@ -228,6 +274,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
228
274
}
229
275
}
230
276
}
277
+ if (S -> is_unbuffered ) {
278
+ dispatch_result = PQsendQueryPrepared (H -> server , S -> stmt_name ,
279
+ stmt -> bound_params ?
280
+ zend_hash_num_elements (stmt -> bound_params ) :
281
+ 0 ,
282
+ (const char * * )S -> param_values ,
283
+ S -> param_lengths ,
284
+ S -> param_formats ,
285
+ 0 );
286
+ } else {
231
287
S -> result = PQexecPrepared (H -> server , S -> stmt_name ,
232
288
stmt -> bound_params ?
233
289
zend_hash_num_elements (stmt -> bound_params ) :
@@ -236,22 +292,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
236
292
S -> param_lengths ,
237
293
S -> param_formats ,
238
294
0 );
295
+ }
239
296
} else if (stmt -> supports_placeholders == PDO_PLACEHOLDER_NAMED ) {
240
297
/* execute query with parameters */
298
+ if (S -> is_unbuffered ) {
299
+ dispatch_result = PQsendQueryParams (H -> server , ZSTR_VAL (S -> query ),
300
+ stmt -> bound_params ? zend_hash_num_elements (stmt -> bound_params ) : 0 ,
301
+ S -> param_types ,
302
+ (const char * * )S -> param_values ,
303
+ S -> param_lengths ,
304
+ S -> param_formats ,
305
+ 0 );
306
+ } else {
241
307
S -> result = PQexecParams (H -> server , ZSTR_VAL (S -> query ),
242
308
stmt -> bound_params ? zend_hash_num_elements (stmt -> bound_params ) : 0 ,
243
309
S -> param_types ,
244
310
(const char * * )S -> param_values ,
245
311
S -> param_lengths ,
246
312
S -> param_formats ,
247
313
0 );
314
+ }
248
315
} else {
249
316
/* execute plain query (with embedded parameters) */
317
+ if (S -> is_unbuffered ) {
318
+ dispatch_result = PQsendQuery (H -> server , ZSTR_VAL (stmt -> active_query_string ));
319
+ } else {
250
320
S -> result = PQexec (H -> server , ZSTR_VAL (stmt -> active_query_string ));
321
+ }
251
322
}
323
+
324
+ H -> running_stmt = S ;
325
+
326
+ if (S -> is_unbuffered ) {
327
+ if (!dispatch_result ) {
328
+ pdo_pgsql_error_stmt (stmt , 0 , NULL );
329
+ H -> running_stmt = NULL ;
330
+ return 0 ;
331
+ }
332
+ S -> is_running_unbuffered = true;
333
+ (void )PQsetSingleRowMode (H -> server );
334
+ /* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
335
+
336
+ /* try a first fetch to at least have column names and so on */
337
+ S -> result = PQgetResult (S -> H -> server );
338
+ }
339
+
252
340
status = PQresultStatus (S -> result );
253
341
254
- if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK ) {
342
+ if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE ) {
255
343
pdo_pgsql_error_stmt (stmt , status , pdo_pgsql_sqlstate (S -> result ));
256
344
return 0 ;
257
345
}
@@ -473,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
473
561
return 0 ;
474
562
}
475
563
} else {
564
+ if (S -> is_running_unbuffered && S -> current_row >= stmt -> row_count ) {
565
+ ExecStatusType status ;
566
+
567
+ /* @todo in unbuffered mode, PQ allows multiple queries to be passed:
568
+ * column_count should be recomputed on each iteration */
569
+
570
+ if (S -> result ) {
571
+ PQclear (S -> result );
572
+ S -> result = NULL ;
573
+ }
574
+
575
+ S -> result = PQgetResult (S -> H -> server );
576
+ status = PQresultStatus (S -> result );
577
+
578
+ if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE ) {
579
+ pdo_pgsql_error_stmt (stmt , status , pdo_pgsql_sqlstate (S -> result ));
580
+ return 0 ;
581
+ }
582
+
583
+ stmt -> row_count = (zend_long )PQntuples (S -> result );
584
+ S -> current_row = 0 ;
585
+
586
+ if (!stmt -> row_count ) {
587
+ S -> is_running_unbuffered = false;
588
+ /* libpq requires looping until getResult returns null */
589
+ pgsql_stmt_finish (S , 0 );
590
+ }
591
+ }
476
592
if (S -> current_row < stmt -> row_count ) {
477
593
S -> current_row ++ ;
478
594
return 1 ;
0 commit comments