Skip to content

Commit 6885e67

Browse files
authored
DOCSP-45200: Change streams (#123)
* DOCSP-45200: Change streams * edits * JB feedback
1 parent 2b72e4c commit 6885e67

File tree

3 files changed

+306
-0
lines changed

3 files changed

+306
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
require 'bundler/inline'
2+
3+
gemfile do
4+
source 'https://rubygems.org'
5+
gem 'mongo'
6+
end
7+
8+
uri = '<connection string>'
9+
10+
Mongo::Client.new(uri) do |client|
11+
# Accesses the database and collection
12+
# start-db-coll
13+
database = client.use('sample_restaurants')
14+
collection = database[:restaurants]
15+
# end-db-coll
16+
17+
# Monitors and prints changes to the "restaurants" collection
18+
# start-open-change-stream
19+
stream = collection.watch
20+
stream.each do |doc|
21+
puts doc
22+
break if doc['operationType'] == 'invalidate'
23+
end
24+
# end-open-change-stream
25+
26+
# Updates a document that has a 'name' value of 'Blarney Castle'
27+
# start-update-for-change-stream
28+
collection.update_one(
29+
{ 'name' => 'Blarney Castle' },
30+
{ '$set' => { 'cuisine' => 'Irish' } }
31+
)
32+
# end-update-for-change-stream
33+
34+
# Passes a pipeline argument to watch to monitor only update operations
35+
# start-change-stream-pipeline
36+
pipeline = [{ '$match' => { 'operationType' => 'update' } }]
37+
stream = collection.watch(pipeline)
38+
stream.each do |doc|
39+
puts doc
40+
break if doc['operationType'] == 'invalidate'
41+
end
42+
# end-change-stream-pipeline
43+
44+
# Passes an options argument to watch to include the post-image of updated documents
45+
# start-change-stream-post-image
46+
options = { full_document: 'updateLookup' }
47+
stream = collection.watch([], options)
48+
stream.each do |doc|
49+
puts doc
50+
break if doc['operationType'] == 'invalidate'
51+
end
52+
# end-change-stream-post-image
53+
end

source/read.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Read Data from MongoDB
2929
Distinct Field Values </read/distinct>
3030
Count Documents </read/count>
3131
Cursors </read/cursors>
32+
Monitor Changes </read/change-streams>
3233

3334
Overview
3435
--------

source/read/change-streams.txt

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
.. _ruby-change-streams:
2+
3+
====================
4+
Monitor Data Changes
5+
====================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 2
11+
:class: singlecol
12+
13+
.. facet::
14+
:name: genre
15+
:values: reference
16+
17+
.. meta::
18+
:keywords: watch, code example
19+
20+
Overview
21+
--------
22+
23+
In this guide, you can learn how to use a **change stream** to monitor real-time
24+
changes to your data. A change stream is a {+mdb-server+} feature that
25+
allows your application to subscribe to data changes on a collection, database,
26+
or deployment.
27+
28+
When using the {+driver-short+}, you can call the ``watch`` method to return a
29+
``Mongo::Collection::View::ChangeStream`` object. Then, you can iterate through
30+
its content to monitor data changes, such as updates, insertions, and deletions.
31+
32+
Sample Data
33+
~~~~~~~~~~~
34+
35+
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
36+
database from the :atlas:`Atlas sample datasets </sample-data>`. To access this collection
37+
from your {+language+} application, create a ``Mongo::Client`` object that connects to
38+
an Atlas cluster and assign the following values to your ``database`` and ``collection``
39+
variables:
40+
41+
.. literalinclude:: /includes/read/change-streams.rb
42+
:language: ruby
43+
:dedent:
44+
:start-after: start-db-coll
45+
:end-before: end-db-coll
46+
47+
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the
48+
:atlas:`Get Started with Atlas </getting-started>` guide.
49+
50+
Open a Change Stream
51+
--------------------
52+
53+
To open a change stream, call the ``watch`` method. The object on which you
54+
call the ``watch`` method determines the scope of events that the change
55+
stream monitors. You can call the ``watch`` method on the following objects
56+
57+
- ``Mongo::Client``: Monitors changes to all collections across all databases
58+
in a deployment, excluding :manual:`system collections </reference/system-collections/>`
59+
or collections in the ``admin``, ``local``, and ``config`` databases
60+
- ``Mongo::Database``: Monitors changes to all collections in one database
61+
- ``Mongo::Collection``: Monitors changes to one collection
62+
63+
The following example opens a change stream on the ``restaurants`` collection
64+
and outputs changes as they occur:
65+
66+
.. literalinclude:: /includes/read/change-streams.rb
67+
:start-after: start-open-change-stream
68+
:end-before: end-open-change-stream
69+
:language: ruby
70+
:dedent:
71+
72+
To begin watching for changes, run the preceding code. Then, in a separate
73+
shell, modify the ``restaurants`` collection. The following example updates
74+
a document that has a ``name`` field value of ``'Blarney Castle'``:
75+
76+
.. _ruby-change-stream-update:
77+
78+
.. literalinclude:: /includes/read/change-streams.rb
79+
:start-after: start-update-for-change-stream
80+
:end-before: end-update-for-change-stream
81+
:language: ruby
82+
:dedent:
83+
84+
When you update the collection, the change stream application prints the change
85+
as it occurs. The printed change event resembles the following output:
86+
87+
.. code-block:: none
88+
:copyable: false
89+
90+
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=>#<...>,
91+
"ns"=>{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=>
92+
{"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=>
93+
{"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
94+
95+
Modify the Change Stream Output
96+
-------------------------------
97+
98+
To modify the change stream output, you can pass pipeline stages in an array as a
99+
parameter to the ``watch`` method. You can include the following stages in the
100+
array:
101+
102+
- ``$addFields`` or ``$set``: Adds new fields to documents
103+
- ``$match``: Filters the documents
104+
- ``$project``: Projects a subset of the document fields
105+
- ``$replaceWith`` or ``$replaceRoot``: Replaces the input document with the
106+
specified document
107+
- ``$redact``: Restricts the contents of the documents
108+
- ``$unset``: Removes fields from documents
109+
110+
The following example passes a pipeline that includes the ``$match`` stage to the
111+
``watch`` method. This instructs the ``watch`` method to output events only
112+
when update operations occur:
113+
114+
.. literalinclude:: /includes/read/change-streams.rb
115+
:start-after: start-change-stream-pipeline
116+
:end-before: end-change-stream-pipeline
117+
:language: ruby
118+
:dedent:
119+
120+
Modify watch Behavior
121+
---------------------
122+
123+
To modify the behavior of the ``watch`` method, you can pass an options hash
124+
as a parameter to ``watch``. The following table describes some of the options that
125+
you can set:
126+
127+
.. list-table::
128+
:widths: 30 70
129+
:header-rows: 1
130+
131+
* - Option
132+
- Description
133+
134+
* - ``full_document``
135+
- | Specifies whether to show the full document after the change, rather
136+
than showing only the changes made to the document. To learn more about
137+
this option, see the :ref:`ruby-change-stream-pre-post-image` section of this
138+
guide.
139+
140+
* - ``full_document_before_change``
141+
- | Specifies whether to show the full document as it was before the change, rather
142+
than showing only the changes made to the document. To learn more about
143+
this option, see :ref:`ruby-change-stream-pre-post-image`.
144+
145+
* - ``resume_after``
146+
- | Specifies the logical starting point for the change stream.
147+
| This option is mutually exclusive with ``start_at_operation_time``.
148+
149+
* - ``start_at_operation_time``
150+
- | Instructs the change stream to only provide changes that occurred at or after
151+
the specified timestamp.
152+
| This option is mutually exclusive with ``resume_after``.
153+
154+
* - ``collation``
155+
- | Sets the collation to use for the change stream cursor.
156+
157+
For a full list of ``watch`` options, see `watch <{+api-root+}/Mongo/Collection.html#watch-instance_method>`__
158+
in the API documentation.
159+
160+
.. _ruby-change-stream-pre-post-image:
161+
162+
Include Pre-Images and Post-Images
163+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
164+
165+
.. important::
166+
167+
You can enable pre-images and post-images on collections only if your
168+
deployment uses MongoDB v6.0 or later.
169+
170+
By default, when you perform an operation on a collection, the corresponding
171+
change event includes only the modified fields and their values before and
172+
after the operation.
173+
174+
You can instruct the ``watch`` method to return the document's **pre-image**, the
175+
full version of the document *before* changes, in addition to the modified fields. To
176+
include the pre-image in the change stream event, pass an options hash to ``watch``
177+
that sets the ``full_document_before_change`` option. You can set this option to the
178+
following string values:
179+
180+
- ``'whenAvailable'``: The change event includes a pre-image
181+
of the modified document for change events. If the pre-image is not available, this
182+
change event field has a ``nil`` value.
183+
- ``'required'``: The change event includes a pre-image
184+
of the modified document for change events. If the pre-image is not available, the
185+
server raises an error.
186+
- ``'off'``: (Default) The change event does not include a pre-image of the modified document.
187+
188+
You can also instruct the ``watch`` method to return the document's **post-image**,
189+
the full version of the document *after* changes, in addition to the modified fields.
190+
To include the post-image in the change stream event, pass an options hash to ``watch``
191+
that sets the ``full_document`` option. You can set this option to the following string
192+
values:
193+
194+
- ``'updateLookup'``: The change event includes a
195+
copy of the entire changed document from some time after the change.
196+
- ``'whenAvailable'``: The change event includes a post-image of the modified
197+
document for change events. If the post-image is not available, this change
198+
event field has a ``nil`` value.
199+
- ``'required'``: The change event includes a post-image of the modified document
200+
for change events. If the post-image is not available, the server raises an error.
201+
- ``'default'``: (Default) The change event does not include a post-image of the modified document.
202+
203+
The following example calls the ``watch`` method on a collection and includes the post-image
204+
of updated documents by setting the ``full_document`` option:
205+
206+
.. literalinclude:: /includes/read/change-streams.rb
207+
:start-after: start-change-stream-post-image
208+
:end-before: end-change-stream-post-image
209+
:language: ruby
210+
:dedent:
211+
212+
With the change stream application running in a separate shell, updating a
213+
document in the ``restaurants`` collection by using the :ref:`preceding update
214+
example <ruby-change-stream-update>` prints a change event resembling the following
215+
output:
216+
217+
.. code-block:: none
218+
:copyable: false
219+
:emphasize-lines: 2-6
220+
221+
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=>
222+
#<...1>, "wallTime"=>..., "fullDocument"=>{"_id"=>BSON::ObjectId('...'),
223+
"address"=>{"building"=>"202-24", "coord"=>[-73.9250442, 40.5595462],
224+
"street"=>"Rockaway Point Boulevard", "zipcode"=>"11697"},
225+
"borough"=>"Queens", "cuisine"=>"Irish", "grades"=>[...],
226+
"name"=>"Blarney Castle", "restaurant_id"=>"40366356"}, "ns"=>
227+
{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=>
228+
{"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=>
229+
{"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
230+
231+
.. tip::
232+
233+
To learn more about pre-images and post-images, see
234+
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
235+
in the {+mdb-server+} manual.
236+
237+
Additional Information
238+
----------------------
239+
240+
To learn more about change streams, see :manual:`Change Streams
241+
</changeStreams>` in the {+mdb-server+} manual.
242+
243+
API Documentation
244+
~~~~~~~~~~~~~~~~~
245+
246+
To learn more about any of the methods or types discussed in this
247+
guide, see the following API documentation:
248+
249+
- `Mongo::Collection::View::ChangeStream <{+api-root+}/Mongo/Collection/View/ChangeStream.html>`__
250+
- `Mongo::Client::watch <{+api-root+}/Mongo/Client.html#watch-instance_method>`__
251+
- `Mongo::Database::watch <{+api-root+}/Mongo/Database.html#watch-instance_method>`__
252+
- `Mongo::Collection::watch <{+api-root+}/Mongo/Collection.html#watch-instance_method>`__

0 commit comments

Comments
 (0)