Skip to content

Commit cd4c81f

Browse files
authored
DOCSP-41981: Change streams (#113)
Adds a Change Streams guide.
1 parent 7e0f9be commit cd4c81f

File tree

3 files changed

+347
-0
lines changed

3 files changed

+347
-0
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?php
2+
require 'vendor/autoload.php';
3+
4+
// start-to-json
5+
function toJSON(object $document): string
6+
{
7+
return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON();
8+
}
9+
// end-to-json
10+
11+
$uri = getenv('MONGODB_URI') ?: throw new RuntimeException('Set the MONGODB_URI variable to your Atlas URI that connects to the sample dataset');
12+
$client = new MongoDB\Client($uri);
13+
14+
// start-db-coll
15+
$collection = $client->sample_restaurants->restaurants;
16+
// end-db-coll
17+
18+
// Monitors and prints changes to the "restaurants" collection
19+
// start-open-change-stream
20+
$changeStream = $collection->watch();
21+
22+
for ($changeStream->rewind(); true; $changeStream->next()) {
23+
if ( ! $changeStream->valid()) {
24+
continue;
25+
}
26+
$event = $changeStream->current();
27+
echo toJSON($event) . PHP_EOL;
28+
29+
if ($event['operationType'] === 'invalidate') {
30+
break;
31+
}
32+
}
33+
// end-open-change-stream
34+
35+
// Updates a document that has a "name" value of "Blarney Castle"
36+
// start-update-for-change-stream
37+
$result = $collection->updateOne(
38+
['name' => 'Blarney Castle'],
39+
['$set' => ['cuisine' => 'Irish']]
40+
);
41+
// end-update-for-change-stream
42+
43+
// Passes a pipeline argument to watch() to monitor only update operations
44+
// start-change-stream-pipeline
45+
$pipeline = [['$match' => ['operationType' => 'update']]];
46+
$changeStream = $collection->watch($pipeline);
47+
48+
for ($changeStream->rewind(); true; $changeStream->next()) {
49+
if ( ! $changeStream->valid()) {
50+
continue;
51+
}
52+
$event = $changeStream->current();
53+
echo toJSON($event) . PHP_EOL;
54+
55+
if ($event['operationType'] === 'invalidate') {
56+
break;
57+
}
58+
}
59+
// end-change-stream-pipeline
60+
61+
// Passes an options argument to watch() to include the post-image of updated documents
62+
// start-change-stream-post-image
63+
$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP];
64+
$changeStream = $collection->watch([], $options);
65+
66+
for ($changeStream->rewind(); true; $changeStream->next()) {
67+
if ( ! $changeStream->valid()) {
68+
continue;
69+
}
70+
$event = $changeStream->current();
71+
echo toJSON($event) . PHP_EOL;
72+
73+
if ($event['operationType'] === 'invalidate') {
74+
break;
75+
}
76+
}
77+
// end-change-stream-post-image
78+

source/read.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ Read Data from MongoDB
1414
/read/specify-documents-to-return
1515
/read/specify-a-query
1616
/read/distinct
17+
/read/change-streams

source/read/change-streams.txt

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
.. _php-change-streams:
2+
3+
====================
4+
Monitor Data Changes
5+
====================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 2
11+
:class: singlecol
12+
13+
.. facet::
14+
:name: genre
15+
:values: reference
16+
17+
.. meta::
18+
:keywords: watch, code example
19+
20+
Overview
21+
--------
22+
23+
In this guide, you can learn how to use a **change stream** to monitor real-time
24+
changes to your data. A change stream is a {+mdb-server+} feature that
25+
allows your application to subscribe to data changes on a collection, database,
26+
or deployment.
27+
28+
When using the {+php-library+}, you can call the ``watch()`` method to return an
29+
instance of ``MongoDB\ChangeStream``. Then, you can iterate through the
30+
``MongoDB\ChangeStream`` instance to monitor data changes, such as updates,
31+
insertions, and deletions.
32+
33+
Sample Data
34+
~~~~~~~~~~~
35+
36+
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
37+
database from the :atlas:`Atlas sample datasets </sample-data>`. To access this collection
38+
from your PHP application, instantiate a ``MongoDB\Client`` that connects to an Atlas cluster
39+
and assign the following value to your ``$collection`` variable:
40+
41+
.. literalinclude:: /includes/read/change-streams.php
42+
:language: php
43+
:dedent:
44+
:start-after: start-db-coll
45+
:end-before: end-db-coll
46+
47+
.. tip::
48+
49+
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the
50+
:atlas:`Get Started with Atlas </getting-started>` guide.
51+
52+
Some examples use the ``toJSON()`` function to represent change events, which are BSON
53+
documents, as Extended JSON. To use this function, paste the following code into your
54+
application file:
55+
56+
.. literalinclude:: /includes/read/change-streams.php
57+
:language: php
58+
:dedent:
59+
:start-after: start-to-json
60+
:end-before: end-to-json
61+
62+
Open a Change Stream
63+
--------------------
64+
65+
To open a change stream, call the ``watch()`` method. The instance on which you
66+
call the ``watch()`` method determines the scope of events that the change
67+
stream monitors. You can call the ``watch()`` method on instances of the following
68+
classes:
69+
70+
- ``MongoDB\Client``: Monitor all changes in the MongoDB deployment
71+
- ``MongoDB\Database``: Monitor changes in all collections in the database
72+
- ``MongoDB\Collection``: Monitor changes in the collection
73+
74+
The following example opens a change stream on the ``restaurants`` collection
75+
and outputs changes as they occur:
76+
77+
.. literalinclude:: /includes/read/change-streams.php
78+
:start-after: start-open-change-stream
79+
:end-before: end-open-change-stream
80+
:language: php
81+
:dedent:
82+
83+
To begin watching for changes, run the preceding code. Then, in a separate
84+
shell, modify the ``restaurants`` collection. The following example updates
85+
a document that has a ``name`` field value of ``'Blarney Castle'``:
86+
87+
.. _php-change-stream-update:
88+
89+
.. literalinclude:: /includes/read/change-streams.php
90+
:start-after: start-update-for-change-stream
91+
:end-before: end-update-for-change-stream
92+
:language: php
93+
:dedent:
94+
95+
When you update the collection, the change stream application prints the change
96+
as it occurs. The printed change event resembles the following output:
97+
98+
.. code-block:: none
99+
:copyable: false
100+
101+
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
102+
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" :
103+
"sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" :
104+
{ "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
105+
"removedFields" : [ ], "truncatedArrays" : [ ] } }
106+
107+
Modify the Change Stream Output
108+
-------------------------------
109+
110+
To modify the change stream output, you can pass pipeline stages in an array as a
111+
parameter to the ``watch()`` method. You can include the following stages in the
112+
array:
113+
114+
- ``$addFields`` or ``$set``: Adds new fields to documents
115+
- ``$match``: Filters the documents
116+
- ``$project``: Projects a subset of the document fields
117+
- ``$replaceWith`` or ``$replaceRoot``: Replaces the input document with the
118+
specified document
119+
- ``$redact``: Restricts the contents of the documents
120+
- ``$unset``: Removes fields from documents
121+
122+
The following example passes a pipeline that includes the ``$match`` stage to the
123+
``watch()`` method. This instructs the ``watch()`` method to output events only
124+
when update operations occur:
125+
126+
.. literalinclude:: /includes/read/change-streams.php
127+
:start-after: start-change-stream-pipeline
128+
:end-before: end-change-stream-pipeline
129+
:language: php
130+
:dedent:
131+
132+
Modify ``watch()`` Behavior
133+
---------------------------
134+
135+
To modify the behavior of the ``watch()`` method, you can pass an options array
136+
as a parameter to ``watch()``. The following table describes useful options you
137+
can set in the array:
138+
139+
.. list-table::
140+
:widths: 30 70
141+
:header-rows: 1
142+
143+
* - Option
144+
- Description
145+
146+
* - ``fullDocument``
147+
- | Specifies whether to show the full document after the change, rather
148+
than showing only the changes made to the document. To learn more about
149+
this option, see the :ref:`php-change-stream-pre-post-image` section of this
150+
guide.
151+
152+
* - ``fullDocumentBeforeChange``
153+
- | Specifies whether to show the full document as it was before the change, rather
154+
than showing only the changes made to the document. To learn more about
155+
this option, see :ref:`php-change-stream-pre-post-image`.
156+
157+
* - ``startAfter``
158+
- | Instructs ``watch()`` to start a new change stream after the
159+
operation specified in the resume token. This field allows notifications to
160+
resume after an invalidate event.
161+
| Each change stream event document includes a resume token as the ``_id``
162+
field. Pass the entire ``_id`` field of the change event document that
163+
represents the operation you want to resume after.
164+
| This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.
165+
166+
* - ``startAtOperationTime``
167+
- | Instructs the change stream to only provide changes that occurred at or after
168+
the specified timestamp.
169+
| This option is mutually exclusive with ``startAfter`` and ``resumeAfter``.
170+
171+
* - ``collation``
172+
- | Sets the collation to use for the change stream cursor.
173+
174+
For a full list of ``watch()`` options, see `MongoDB\\Collection::watch()
175+
<{+api+}/method/MongoDBCollection-watch/>`__ in the API
176+
documentation.
177+
178+
.. _php-change-stream-pre-post-image:
179+
180+
Include Pre-Images and Post-Images
181+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
182+
183+
.. important::
184+
185+
You can enable pre-images and post-images on collections only if your
186+
deployment uses MongoDB v6.0 or later.
187+
188+
By default, when you perform an operation on a collection, the
189+
corresponding change event includes only the delta of the fields
190+
modified by that operation. To see the full document before or after a
191+
change, specify the ``fullDocumentBeforeChange`` or the ``fullDocument``
192+
options in an array parameter to ``watch()``.
193+
194+
The **pre-image** is the full version of a document *before* a change. To include the
195+
pre-image in the change stream event, set the ``fullDocumentBeforeChange`` option
196+
to one of the following values:
197+
198+
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE``: The change event includes
199+
a pre-image of the modified document for change events. If the pre-image is not available, this
200+
change event field has a ``null`` value.
201+
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED``: The change event includes a pre-image
202+
of the modified document for change events. If the pre-image is not available, the
203+
server raises an error.
204+
205+
The **post-image** is the full version of a document *after* a change. To include the
206+
post-image in the change stream event, set the ``fullDocument`` option to
207+
one of the following values:
208+
209+
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``: The change event includes a
210+
copy of the entire changed document from some time after the change.
211+
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE``: The change event includes
212+
a post-image of the modified document for change events. If the post-image is not
213+
available, this change event field has a ``null`` value.
214+
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED``: The change event includes a post-image
215+
of the modified document for change events. If the post-image is not available, the
216+
server raises an error.
217+
218+
The following example calls the ``watch()`` method on a collection and includes the post-image
219+
of updated documents by setting the ``fullDocument`` option:
220+
221+
.. literalinclude:: /includes/read/change-streams.php
222+
:start-after: start-change-stream-post-image
223+
:end-before: end-change-stream-post-image
224+
:language: php
225+
:dedent:
226+
227+
With the change stream application running in a separate shell, updating a
228+
document in the ``restaurants`` collection by using the :ref:`preceding update
229+
example <php-change-stream-update>` prints a change event resembling the following
230+
output:
231+
232+
.. code-block:: none
233+
:copyable: false
234+
:emphasize-lines: 3-6
235+
236+
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
237+
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." },
238+
"fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" :
239+
"202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street"
240+
: "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens",
241+
"cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" :
242+
"40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
243+
"documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" :
244+
{ "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ],
245+
"truncatedArrays" : [ ] } }
246+
247+
.. tip::
248+
249+
To learn more about pre-images and post-images, see
250+
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
251+
in the {+mdb-server+} manual.
252+
253+
Additional Information
254+
----------------------
255+
256+
To learn more about change streams, see :manual:`Change Streams
257+
</changeStreams>` in the {+mdb-server+} manual.
258+
259+
API Documentation
260+
~~~~~~~~~~~~~~~~~
261+
262+
To learn more about any of the methods or types discussed in this
263+
guide, see the following API documentation:
264+
265+
- `MongoDB\\Client::watch() <{+api+}/method/MongoDBClient-watch/>`__
266+
- `MongoDB\\Database::watch() <{+api+}/method/MongoDBDatabase-watch/>`__
267+
- `MongoDB\\Collection::watch() <{+api+}/method/MongoDBCollection-watch/>`__
268+
- `MongoDB\\Collection::updateOne() <{+api+}/method/MongoDBCollection-updateOne/>`__

0 commit comments

Comments
 (0)