Skip to content

DOCSP-41981: Change streams #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions source/includes/read/change-streams.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?php
require 'vendor/autoload.php';

// start-to-json
function toJSON(object $document): string
{
return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON();
}
// end-to-json

$uri = getenv('MONGODB_URI') ?: throw new RuntimeException('Set the MONGODB_URI variable to your Atlas URI that connects to the sample dataset');
$client = new MongoDB\Client($uri);

// start-db-coll
$collection = $client->sample_restaurants->restaurants;
// end-db-coll

// Monitors and prints changes to the "restaurants" collection
// start-open-change-stream
$changeStream = $collection->watch();

for ($changeStream->rewind(); true; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event) . PHP_EOL;

if ($event['operationType'] === 'invalidate') {
break;
}
}
// end-open-change-stream

// Updates a document that has a "name" value of "Blarney Castle"
// start-update-for-change-stream
$result = $collection->updateOne(
['name' => 'Blarney Castle'],
['$set' => ['cuisine' => 'Irish']]
);
// end-update-for-change-stream

// Passes a pipeline argument to watch() to monitor only update operations
// start-change-stream-pipeline
$pipeline = [['$match' => ['operationType' => 'update']]];
$changeStream = $collection->watch($pipeline);

for ($changeStream->rewind(); true; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event) . PHP_EOL;

if ($event['operationType'] === 'invalidate') {
break;
}
}
// end-change-stream-pipeline

// Passes an options argument to watch() to include the post-image of updated documents
// start-change-stream-post-image
$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP];
$changeStream = $collection->watch([], $options);

for ($changeStream->rewind(); true; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event) . PHP_EOL;

if ($event['operationType'] === 'invalidate') {
break;
}
}
// end-change-stream-post-image

1 change: 1 addition & 0 deletions source/read.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ Read Data from MongoDB
/read/specify-documents-to-return
/read/specify-a-query
/read/distinct
/read/change-streams
268 changes: 268 additions & 0 deletions source/read/change-streams.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
.. _php-change-streams:

====================
Monitor Data Changes
====================

.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol

.. facet::
:name: genre
:values: reference

.. meta::
:keywords: watch, code example

Overview
--------

In this guide, you can learn how to use a **change stream** to monitor real-time
changes to your data. A change stream is a {+mdb-server+} feature that
allows your application to subscribe to data changes on a collection, database,
or deployment.

When using the {+php-library+}, you can call the ``watch()`` method to return an
instance of ``MongoDB\ChangeStream``. Then, you can iterate through the
``MongoDB\ChangeStream`` instance to monitor data changes, such as updates,
insertions, and deletions.

Sample Data
~~~~~~~~~~~

The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
database from the :atlas:`Atlas sample datasets </sample-data>`. To access this collection
from your PHP application, instantiate a ``MongoDB\Client`` that connects to an Atlas cluster
and assign the following value to your ``$collection`` variable:

.. literalinclude:: /includes/read/change-streams.php
:language: php
:dedent:
:start-after: start-db-coll
:end-before: end-db-coll

.. tip::

To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the
:atlas:`Get Started with Atlas </getting-started>` guide.

Some examples use the ``toJSON()`` function to represent change events, which are BSON
documents, as Extended JSON. To use this function, paste the following code into your
application file:

.. literalinclude:: /includes/read/change-streams.php
:language: php
:dedent:
:start-after: start-to-json
:end-before: end-to-json

Open a Change Stream
--------------------

To open a change stream, call the ``watch()`` method. The instance on which you
call the ``watch()`` method determines the scope of events that the change
stream monitors. You can call the ``watch()`` method on instances of the following
classes:

- ``MongoDB\Client``: Monitor all changes in the MongoDB deployment
- ``MongoDB\Database``: Monitor changes in all collections in the database
- ``MongoDB\Collection``: Monitor changes in the collection

The following example opens a change stream on the ``restaurants`` collection
and outputs changes as they occur:

.. literalinclude:: /includes/read/change-streams.php
:start-after: start-open-change-stream
:end-before: end-open-change-stream
:language: php
:dedent:

To begin watching for changes, run the preceding code. Then, in a separate
shell, modify the ``restaurants`` collection. The following example updates
a document that has a ``name`` field value of ``'Blarney Castle'``:

.. _php-change-stream-update:

.. literalinclude:: /includes/read/change-streams.php
:start-after: start-update-for-change-stream
:end-before: end-update-for-change-stream
:language: php
:dedent:

When you update the collection, the change stream application prints the change
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that just occurred to me: should this tutorial advise users to run this PHP code through a shell/terminal? Users would likely run into issues trying to execute the watch() process through a web server.

I'm curious if there's language from other language tutorials (assuming this was adapted from existing content) we can use.

The most straightforward way to address this might be to consistently refer to "shell process" instead of "application or shell" for all of the code examples (no harm in using that for the update example as well).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use "application" pretty consistently across all docs, but I agree that it makes sense to suggest running the application from the shell. I updated all mentions of running the application on this page to clarify that

as it occurs. The printed change event resembles the following output:

.. code-block:: none
:copyable: false

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" :
"sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" :
{ "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [ ], "truncatedArrays" : [ ] } }

Modify the Change Stream Output
-------------------------------

To modify the change stream output, you can pass pipeline stages in an array as a
parameter to the ``watch()`` method. You can include the following stages in the
array:

- ``$addFields`` or ``$set``: Adds new fields to documents
- ``$match``: Filters the documents
- ``$project``: Projects a subset of the document fields
- ``$replaceWith`` or ``$replaceRoot``: Replaces the input document with the
specified document
- ``$redact``: Restricts the contents of the documents
- ``$unset``: Removes fields from documents

The following example passes a pipeline that includes the ``$match`` stage to the
``watch()`` method. This instructs the ``watch()`` method to output events only
when update operations occur:

.. literalinclude:: /includes/read/change-streams.php
:start-after: start-change-stream-pipeline
:end-before: end-change-stream-pipeline
:language: php
:dedent:

Modify ``watch()`` Behavior
---------------------------

To modify the behavior of the ``watch()`` method, you can pass an options array
as a parameter to ``watch()``. The following table describes useful options you
can set in the array:

.. list-table::
:widths: 30 70
:header-rows: 1

* - Option
- Description

* - ``fullDocument``
- | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the :ref:`php-change-stream-pre-post-image` section of this
guide.

* - ``fullDocumentBeforeChange``
- | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see :ref:`php-change-stream-pre-post-image`.

* - ``startAfter``
- | Instructs ``watch()`` to start a new change stream after the
operation specified in the resume token. This field allows notifications to
resume after an invalidate event.
| Each change stream event document includes a resume token as the ``_id``
field. Pass the entire ``_id`` field of the change event document that
represents the operation you want to resume after.
| This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.

* - ``startAtOperationTime``
- | Instructs the change stream to only provide changes that occurred at or after
the specified timestamp.
| This option is mutually exclusive with ``startAfter`` and ``resumeAfter``.

* - ``collation``
- | Sets the collation to use for the change stream cursor.

For a full list of ``watch()`` options, see `MongoDB\\Collection::watch()
<{+api+}/method/MongoDBCollection-watch/>`__ in the API
documentation.

.. _php-change-stream-pre-post-image:

Include Pre-Images and Post-Images
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. important::

You can enable pre-images and post-images on collections only if your
deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, specify the ``fullDocumentBeforeChange`` or the ``fullDocument``
options in an array parameter to ``watch()``.

The **pre-image** is the full version of a document *before* a change. To include the
pre-image in the change stream event, set the ``fullDocumentBeforeChange`` option
to one of the following values:

- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE``: The change event includes
a pre-image of the modified document for change events. If the pre-image is not available, this
change event field has a ``null`` value.
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED``: The change event includes a pre-image
of the modified document for change events. If the pre-image is not available, the
server raises an error.

The **post-image** is the full version of a document *after* a change. To include the
post-image in the change stream event, set the ``fullDocument`` option to
one of the following values:

- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``: The change event includes a
copy of the entire changed document from some time after the change.
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE``: The change event includes
a post-image of the modified document for change events. If the post-image is not
available, this change event field has a ``null`` value.
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED``: The change event includes a post-image
of the modified document for change events. If the post-image is not available, the
server raises an error.

The following example calls the ``watch()`` method on a collection and includes the post-image
of updated documents by setting the ``fullDocument`` option:

.. literalinclude:: /includes/read/change-streams.php
:start-after: start-change-stream-post-image
:end-before: end-change-stream-post-image
:language: php
:dedent:

With the change stream application running in a separate shell, updating a
document in the ``restaurants`` collection by using the :ref:`preceding update
example <php-change-stream-update>` prints a change event resembling the following
output:

.. code-block:: none
:copyable: false
:emphasize-lines: 3-6

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." },
"fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" :
"202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street"
: "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens",
"cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" :
"40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" :
{ "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ],
"truncatedArrays" : [ ] } }

.. tip::

To learn more about pre-images and post-images, see
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
in the {+mdb-server+} manual.

Additional Information
----------------------

To learn more about change streams, see :manual:`Change Streams
</changeStreams>` in the {+mdb-server+} manual.

API Documentation
~~~~~~~~~~~~~~~~~

To learn more about any of the methods or types discussed in this
guide, see the following API documentation:

- `MongoDB\\Client::watch() <{+api+}/method/MongoDBClient-watch/>`__
- `MongoDB\\Database::watch() <{+api+}/method/MongoDBDatabase-watch/>`__
- `MongoDB\\Collection::watch() <{+api+}/method/MongoDBCollection-watch/>`__
- `MongoDB\\Collection::updateOne() <{+api+}/method/MongoDBCollection-updateOne/>`__
Loading