diff --git a/source/includes/read/change-streams.php b/source/includes/read/change-streams.php new file mode 100644 index 00000000..5b51d1a6 --- /dev/null +++ b/source/includes/read/change-streams.php @@ -0,0 +1,78 @@ +toRelaxedExtendedJSON(); +} +// end-to-json + +$uri = getenv('MONGODB_URI') ?: throw new RuntimeException('Set the MONGODB_URI variable to your Atlas URI that connects to the sample dataset'); +$client = new MongoDB\Client($uri); + +// start-db-coll +$collection = $client->sample_restaurants->restaurants; +// end-db-coll + +// Monitors and prints changes to the "restaurants" collection +// start-open-change-stream +$changeStream = $collection->watch(); + +for ($changeStream->rewind(); true; $changeStream->next()) { + if ( ! $changeStream->valid()) { + continue; + } + $event = $changeStream->current(); + echo toJSON($event) . PHP_EOL; + + if ($event['operationType'] === 'invalidate') { + break; + } +} +// end-open-change-stream + +// Updates a document that has a "name" value of "Blarney Castle" +// start-update-for-change-stream +$result = $collection->updateOne( + ['name' => 'Blarney Castle'], + ['$set' => ['cuisine' => 'Irish']] +); +// end-update-for-change-stream + +// Passes a pipeline argument to watch() to monitor only update operations +// start-change-stream-pipeline +$pipeline = [['$match' => ['operationType' => 'update']]]; +$changeStream = $collection->watch($pipeline); + +for ($changeStream->rewind(); true; $changeStream->next()) { + if ( ! $changeStream->valid()) { + continue; + } + $event = $changeStream->current(); + echo toJSON($event) . PHP_EOL; + + if ($event['operationType'] === 'invalidate') { + break; + } +} +// end-change-stream-pipeline + +// Passes an options argument to watch() to include the post-image of updated documents +// start-change-stream-post-image +$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]; +$changeStream = $collection->watch([], $options); + +for ($changeStream->rewind(); true; $changeStream->next()) { + if ( ! $changeStream->valid()) { + continue; + } + $event = $changeStream->current(); + echo toJSON($event) . PHP_EOL; + + if ($event['operationType'] === 'invalidate') { + break; + } +} +// end-change-stream-post-image + diff --git a/source/read.txt b/source/read.txt index d0299830..a45c2a82 100644 --- a/source/read.txt +++ b/source/read.txt @@ -14,3 +14,4 @@ Read Data from MongoDB /read/specify-documents-to-return /read/specify-a-query /read/distinct + /read/change-streams diff --git a/source/read/change-streams.txt b/source/read/change-streams.txt new file mode 100644 index 00000000..757c1e7b --- /dev/null +++ b/source/read/change-streams.txt @@ -0,0 +1,268 @@ +.. _php-change-streams: + +==================== +Monitor Data Changes +==================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 2 + :class: singlecol + +.. facet:: + :name: genre + :values: reference + +.. meta:: + :keywords: watch, code example + +Overview +-------- + +In this guide, you can learn how to use a **change stream** to monitor real-time +changes to your data. A change stream is a {+mdb-server+} feature that +allows your application to subscribe to data changes on a collection, database, +or deployment. + +When using the {+php-library+}, you can call the ``watch()`` method to return an +instance of ``MongoDB\ChangeStream``. Then, you can iterate through the +``MongoDB\ChangeStream`` instance to monitor data changes, such as updates, +insertions, and deletions. + +Sample Data +~~~~~~~~~~~ + +The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants`` +database from the :atlas:`Atlas sample datasets `. To access this collection +from your PHP application, instantiate a ``MongoDB\Client`` that connects to an Atlas cluster +and assign the following value to your ``$collection`` variable: + +.. literalinclude:: /includes/read/change-streams.php + :language: php + :dedent: + :start-after: start-db-coll + :end-before: end-db-coll + +.. tip:: + + To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the + :atlas:`Get Started with Atlas ` guide. + +Some examples use the ``toJSON()`` function to represent change events, which are BSON +documents, as Extended JSON. To use this function, paste the following code into your +application file: + +.. literalinclude:: /includes/read/change-streams.php + :language: php + :dedent: + :start-after: start-to-json + :end-before: end-to-json + +Open a Change Stream +-------------------- + +To open a change stream, call the ``watch()`` method. The instance on which you +call the ``watch()`` method determines the scope of events that the change +stream monitors. You can call the ``watch()`` method on instances of the following +classes: + +- ``MongoDB\Client``: Monitor all changes in the MongoDB deployment +- ``MongoDB\Database``: Monitor changes in all collections in the database +- ``MongoDB\Collection``: Monitor changes in the collection + +The following example opens a change stream on the ``restaurants`` collection +and outputs changes as they occur: + +.. literalinclude:: /includes/read/change-streams.php + :start-after: start-open-change-stream + :end-before: end-open-change-stream + :language: php + :dedent: + +To begin watching for changes, run the preceding code. Then, in a separate +shell, modify the ``restaurants`` collection. The following example updates +a document that has a ``name`` field value of ``'Blarney Castle'``: + +.. _php-change-stream-update: + +.. literalinclude:: /includes/read/change-streams.php + :start-after: start-update-for-change-stream + :end-before: end-update-for-change-stream + :language: php + :dedent: + +When you update the collection, the change stream application prints the change +as it occurs. The printed change event resembles the following output: + +.. code-block:: none + :copyable: false + + { "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : + { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" : + "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : + { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, + "removedFields" : [ ], "truncatedArrays" : [ ] } } + +Modify the Change Stream Output +------------------------------- + +To modify the change stream output, you can pass pipeline stages in an array as a +parameter to the ``watch()`` method. You can include the following stages in the +array: + +- ``$addFields`` or ``$set``: Adds new fields to documents +- ``$match``: Filters the documents +- ``$project``: Projects a subset of the document fields +- ``$replaceWith`` or ``$replaceRoot``: Replaces the input document with the + specified document +- ``$redact``: Restricts the contents of the documents +- ``$unset``: Removes fields from documents + +The following example passes a pipeline that includes the ``$match`` stage to the +``watch()`` method. This instructs the ``watch()`` method to output events only +when update operations occur: + +.. literalinclude:: /includes/read/change-streams.php + :start-after: start-change-stream-pipeline + :end-before: end-change-stream-pipeline + :language: php + :dedent: + +Modify ``watch()`` Behavior +--------------------------- + +To modify the behavior of the ``watch()`` method, you can pass an options array +as a parameter to ``watch()``. The following table describes useful options you +can set in the array: + +.. list-table:: + :widths: 30 70 + :header-rows: 1 + + * - Option + - Description + + * - ``fullDocument`` + - | Specifies whether to show the full document after the change, rather + than showing only the changes made to the document. To learn more about + this option, see the :ref:`php-change-stream-pre-post-image` section of this + guide. + + * - ``fullDocumentBeforeChange`` + - | Specifies whether to show the full document as it was before the change, rather + than showing only the changes made to the document. To learn more about + this option, see :ref:`php-change-stream-pre-post-image`. + + * - ``startAfter`` + - | Instructs ``watch()`` to start a new change stream after the + operation specified in the resume token. This field allows notifications to + resume after an invalidate event. + | Each change stream event document includes a resume token as the ``_id`` + field. Pass the entire ``_id`` field of the change event document that + represents the operation you want to resume after. + | This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``. + + * - ``startAtOperationTime`` + - | Instructs the change stream to only provide changes that occurred at or after + the specified timestamp. + | This option is mutually exclusive with ``startAfter`` and ``resumeAfter``. + + * - ``collation`` + - | Sets the collation to use for the change stream cursor. + +For a full list of ``watch()`` options, see `MongoDB\\Collection::watch() +<{+api+}/method/MongoDBCollection-watch/>`__ in the API +documentation. + +.. _php-change-stream-pre-post-image: + +Include Pre-Images and Post-Images +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. important:: + + You can enable pre-images and post-images on collections only if your + deployment uses MongoDB v6.0 or later. + +By default, when you perform an operation on a collection, the +corresponding change event includes only the delta of the fields +modified by that operation. To see the full document before or after a +change, specify the ``fullDocumentBeforeChange`` or the ``fullDocument`` +options in an array parameter to ``watch()``. + +The **pre-image** is the full version of a document *before* a change. To include the +pre-image in the change stream event, set the ``fullDocumentBeforeChange`` option +to one of the following values: + +- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE``: The change event includes + a pre-image of the modified document for change events. If the pre-image is not available, this + change event field has a ``null`` value. +- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED``: The change event includes a pre-image + of the modified document for change events. If the pre-image is not available, the + server raises an error. + +The **post-image** is the full version of a document *after* a change. To include the +post-image in the change stream event, set the ``fullDocument`` option to +one of the following values: + +- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``: The change event includes a + copy of the entire changed document from some time after the change. +- ``MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE``: The change event includes + a post-image of the modified document for change events. If the post-image is not + available, this change event field has a ``null`` value. +- ``MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED``: The change event includes a post-image + of the modified document for change events. If the post-image is not available, the + server raises an error. + +The following example calls the ``watch()`` method on a collection and includes the post-image +of updated documents by setting the ``fullDocument`` option: + +.. literalinclude:: /includes/read/change-streams.php + :start-after: start-change-stream-post-image + :end-before: end-change-stream-post-image + :language: php + :dedent: + +With the change stream application running in a separate shell, updating a +document in the ``restaurants`` collection by using the :ref:`preceding update +example ` prints a change event resembling the following +output: + +.. code-block:: none + :copyable: false + :emphasize-lines: 3-6 + + { "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : + { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, + "fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : + "202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" + : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", + "cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" : + "40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, + "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : + { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], + "truncatedArrays" : [ ] } } + +.. tip:: + + To learn more about pre-images and post-images, see + :manual:`Change Streams with Document Pre- and Post-Images ` + in the {+mdb-server+} manual. + +Additional Information +---------------------- + +To learn more about change streams, see :manual:`Change Streams +` in the {+mdb-server+} manual. + +API Documentation +~~~~~~~~~~~~~~~~~ + +To learn more about any of the methods or types discussed in this +guide, see the following API documentation: + +- `MongoDB\\Client::watch() <{+api+}/method/MongoDBClient-watch/>`__ +- `MongoDB\\Database::watch() <{+api+}/method/MongoDBDatabase-watch/>`__ +- `MongoDB\\Collection::watch() <{+api+}/method/MongoDBCollection-watch/>`__ +- `MongoDB\\Collection::updateOne() <{+api+}/method/MongoDBCollection-updateOne/>`__ \ No newline at end of file