Skip to content

Commit 4c33808

Browse files
committed
PHPC-1029: Support maxTimeMS getMore option for tailable command cursors
1 parent aa1eb0b commit 4c33808

File tree

4 files changed

+143
-9
lines changed

4 files changed

+143
-9
lines changed

php_phongo.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,18 @@ int phongo_execute_command(mongoc_client_t *client, const char *db, zval *zcomma
654654
/* According to mongoc_cursor_new_from_command_reply(), the reply bson_t
655655
* is ultimately destroyed on both success and failure. */
656656
if (bson_iter_init_find(&iter, &reply, "cursor") && BSON_ITER_HOLDS_DOCUMENT(&iter)) {
657-
cmd_cursor = mongoc_cursor_new_from_command_reply(client, &reply, selected_server_id);
657+
bson_t initial_reply = BSON_INITIALIZER;
658+
659+
bson_copy_to(&reply, &initial_reply);
660+
661+
if (command->max_await_time_ms) {
662+
bson_append_bool(&initial_reply, "awaitData", -1, 1);
663+
bson_append_int32(&initial_reply, "maxAwaitTimeMS", -1, command->max_await_time_ms);
664+
bson_append_bool(&initial_reply, "tailable", -1, 1);
665+
}
666+
667+
cmd_cursor = mongoc_cursor_new_from_command_reply(client, &initial_reply, selected_server_id);
668+
bson_destroy(&reply);
658669
} else {
659670
bson_t *wrapped_reply = create_wrapped_command_envelope(db, &reply);
660671

@@ -663,7 +674,7 @@ int phongo_execute_command(mongoc_client_t *client, const char *db, zval *zcomma
663674
}
664675

665676
if (!phongo_advance_cursor_and_check_for_error(cmd_cursor TSRMLS_CC)) {
666-
mongoc_cursor_destroy(cmd_cursor);
677+
/* If an error is found, the cmd_cursor is destroyed already */
667678
return false;
668679
}
669680

php_phongo_structs.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ typedef struct {
4545

4646
typedef struct {
4747
PHONGO_ZEND_OBJECT_PRE
48-
bson_t *bson;
48+
bson_t *bson;
49+
uint32_t max_await_time_ms;
4950
PHONGO_ZEND_OBJECT_POST
5051
} php_phongo_command_t;
5152

src/MongoDB/Command.c

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,86 @@
2121
#include <php.h>
2222
#include <Zend/zend_interfaces.h>
2323

24+
#include "php_array_api.h"
2425
#include "phongo_compat.h"
2526
#include "php_phongo.h"
2627
#include "php_bson.h"
2728

2829
zend_class_entry *php_phongo_command_ce;
2930

30-
/* {{{ proto void MongoDB\Driver\Command::__construct(array|object $document)
31+
/* Initialize the "maxAwaitTimeMS" option. Returns true on success; otherwise,
32+
* false is returned and an exception is thrown.
33+
*
34+
* The "maxAwaitTimeMS" option is assigned to the cursor after query execution
35+
* via mongoc_cursor_set_max_await_time_ms(). */
36+
static bool php_phongo_command_init_max_await_time_ms(php_phongo_command_t *intern, zval *options TSRMLS_DC) /* {{{ */
37+
{
38+
if (php_array_existsc(options, "maxAwaitTimeMS")) {
39+
int64_t max_await_time_ms = php_array_fetchc_long(options, "maxAwaitTimeMS");
40+
41+
if (max_await_time_ms < 0) {
42+
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Expected \"maxAwaitTimeMS\" option to be >= 0, %" PRId64 " given", max_await_time_ms);
43+
return false;
44+
}
45+
46+
if (max_await_time_ms > UINT32_MAX) {
47+
phongo_throw_exception(PHONGO_ERROR_INVALID_ARGUMENT TSRMLS_CC, "Expected \"maxAwaitTimeMS\" option to be <= %" PRIu32 ", %" PRId64 " given", UINT32_MAX, max_await_time_ms);
48+
return false;
49+
}
50+
51+
intern->max_await_time_ms = (uint32_t) max_await_time_ms;
52+
}
53+
54+
return true;
55+
} /* }}} */
56+
57+
/* Initializes the php_phongo_command_init from options argument. This
58+
* function will fall back to a modifier in the absence of a top-level option
59+
* (where applicable). */
60+
static bool php_phongo_command_init(php_phongo_command_t *intern, zval *filter, zval *options TSRMLS_DC) /* {{{ */
61+
{
62+
intern->bson = bson_new();
63+
64+
php_phongo_zval_to_bson(filter, PHONGO_BSON_NONE, intern->bson, NULL TSRMLS_CC);
65+
66+
/* Note: if any exceptions are thrown, we can simply return as PHP will
67+
* invoke php_phongo_query_free_object to destruct the object. */
68+
if (EG(exception)) {
69+
return false;
70+
}
71+
72+
if (!options) {
73+
return true;
74+
}
75+
76+
if (!php_phongo_command_init_max_await_time_ms(intern, options TSRMLS_CC)) {
77+
return false;
78+
}
79+
80+
return true;
81+
} /* }}} */
82+
83+
/* {{{ proto void MongoDB\Driver\Command::__construct(array|object $document[, array $options = array()])
3184
Constructs a new Command */
3285
static PHP_METHOD(Command, __construct)
3386
{
3487
php_phongo_command_t *intern;
3588
zend_error_handling error_handling;
3689
zval *document;
37-
bson_t *bson = bson_new();
90+
zval *options = NULL;
3891
SUPPRESS_UNUSED_WARNING(return_value) SUPPRESS_UNUSED_WARNING(return_value_ptr) SUPPRESS_UNUSED_WARNING(return_value_used)
3992

4093

4194
zend_replace_error_handling(EH_THROW, phongo_exception_from_phongo_domain(PHONGO_ERROR_INVALID_ARGUMENT), &error_handling TSRMLS_CC);
4295
intern = Z_COMMAND_OBJ_P(getThis());
4396

44-
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "A", &document) == FAILURE) {
97+
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "A|a!", &document, &options) == FAILURE) {
4598
zend_restore_error_handling(&error_handling TSRMLS_CC);
4699
return;
47100
}
48101
zend_restore_error_handling(&error_handling TSRMLS_CC);
49102

50-
51-
php_phongo_zval_to_bson(document, PHONGO_BSON_NONE, bson, NULL TSRMLS_CC);
52-
intern->bson = bson;
103+
php_phongo_command_init(intern, document, options TSRMLS_CC);
53104
} /* }}} */
54105

55106
/* {{{ MongoDB\Driver\Command function entries */
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
--TEST--
2+
MongoDB\Driver\Cursor tailable iteration with awaitData and maxAwaitTimeMS options
3+
--SKIPIF--
4+
<?php require __DIR__ . "/../utils/basic-skipif.inc"; ?>
5+
<?php NEEDS('REPLICASET_36'); CLEANUP(REPLICASET_36); ?>
6+
--FILE--
7+
<?php
8+
require_once __DIR__ . "/../utils/basic.inc";
9+
$manager = new MongoDB\Driver\Manager(REPLICASET_36);
10+
11+
$manager->executeCommand(DATABASE_NAME, new MongoDB\Driver\Command([
12+
'create' => COLLECTION_NAME,
13+
'capped' => true,
14+
'size' => 1048576,
15+
]));
16+
17+
$bulkWrite = new MongoDB\Driver\BulkWrite;
18+
$bulkWrite->insert(['_id' => 1]);
19+
$manager->executeBulkWrite(NS, $bulkWrite);
20+
21+
$pipeline = [
22+
[ '$changeStream' => [ 'fullDocument' => 'updateLookup' ] ]
23+
];
24+
25+
$command = new MongoDB\Driver\Command([
26+
'aggregate' => COLLECTION_NAME,
27+
'pipeline' => $pipeline,
28+
'cursor' => ['batchSize' => 0],
29+
], [
30+
'tailable' => true,
31+
'awaitData' => true,
32+
'maxAwaitTimeMS' => 100,
33+
]);
34+
35+
$cursor = $manager->executeCommand(DATABASE_NAME, $command);
36+
$it = new IteratorIterator($cursor);
37+
38+
$it->rewind();
39+
$it->next();
40+
41+
$bulkWrite = new MongoDB\Driver\BulkWrite;
42+
$bulkWrite->insert(['_id' => "new-document"]);
43+
$manager->executeBulkWrite(NS, $bulkWrite);
44+
45+
$startTime = microtime(true);
46+
echo "Awaiting results...\n";
47+
$it->next();
48+
var_dump($it->current()->operationType, $it->current()->documentKey);
49+
printf("Waited for %.6f seconds\n", microtime(true) - $startTime);
50+
51+
$startTime = microtime(true);
52+
echo "Awaiting results...\n";
53+
$it->next();
54+
var_dump($it->current()); /* Should be NULL */
55+
printf("Waited for %.6f seconds\n", microtime(true) - $startTime);
56+
57+
?>
58+
===DONE===
59+
<?php exit(0); ?>
60+
--EXPECTF--
61+
Awaiting results...
62+
string(6) "insert"
63+
object(stdClass)#%d (%d) {
64+
["_id"]=>
65+
string(12) "new-document"
66+
}
67+
Waited for 0.%d seconds
68+
Awaiting results...
69+
NULL
70+
Waited for 0.1%d seconds
71+
===DONE===

0 commit comments

Comments
 (0)