-
Notifications
You must be signed in to change notification settings - Fork 29
DOCSP-45200: Change streams #123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,56 @@ | ||||||||||||||||||||
require 'bundler/inline' | ||||||||||||||||||||
|
||||||||||||||||||||
gemfile do | ||||||||||||||||||||
source 'https://rubygems.org' | ||||||||||||||||||||
gem 'mongo' | ||||||||||||||||||||
end | ||||||||||||||||||||
|
||||||||||||||||||||
uri = '<connection string>' | ||||||||||||||||||||
|
||||||||||||||||||||
Mongo::Client.new(uri) do |client| | ||||||||||||||||||||
# Accesses the database and collection | ||||||||||||||||||||
# start-db-coll | ||||||||||||||||||||
database = client.use('sample_restaurants') | ||||||||||||||||||||
collection = database[:restaurants] | ||||||||||||||||||||
# end-db-coll | ||||||||||||||||||||
|
||||||||||||||||||||
# Monitors and prints changes to the "restaurants" collection | ||||||||||||||||||||
# start-open-change-stream | ||||||||||||||||||||
stream = collection.watch | ||||||||||||||||||||
enum = stream.to_enum | ||||||||||||||||||||
while (doc = enum.next) | ||||||||||||||||||||
puts doc | ||||||||||||||||||||
break if doc['operationType'] == 'invalidate' | ||||||||||||||||||||
end | ||||||||||||||||||||
# end-open-change-stream | ||||||||||||||||||||
|
||||||||||||||||||||
# Updates a document that has a 'name' value of 'Blarney Castle' | ||||||||||||||||||||
# start-update-for-change-stream | ||||||||||||||||||||
collection.update_one( | ||||||||||||||||||||
{ 'name' => 'Blarney Castle' }, | ||||||||||||||||||||
{ '$set' => { 'cuisine' => 'Irish' } } | ||||||||||||||||||||
) | ||||||||||||||||||||
# end-update-for-change-stream | ||||||||||||||||||||
|
||||||||||||||||||||
# Passes a pipeline argument to watch to monitor only update operations | ||||||||||||||||||||
# start-change-stream-pipeline | ||||||||||||||||||||
pipeline = [{ '$match' => { 'operationType' => 'update' } }] | ||||||||||||||||||||
stream = collection.watch(pipeline) | ||||||||||||||||||||
enum = stream.to_enum | ||||||||||||||||||||
while (doc = enum.next) | ||||||||||||||||||||
puts doc | ||||||||||||||||||||
break if doc['operationType'] == 'invalidate' | ||||||||||||||||||||
end | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explicitly converting the stream to an enum is usually a hint that you're doing something the hard way. Prefer
Suggested change
|
||||||||||||||||||||
# end-change-stream-pipeline | ||||||||||||||||||||
|
||||||||||||||||||||
# Passes an options argument to watch to include the post-image of updated documents | ||||||||||||||||||||
# start-change-stream-post-image | ||||||||||||||||||||
options = { full_document: 'updateLookup' } | ||||||||||||||||||||
stream = collection.watch([], options) | ||||||||||||||||||||
enum = stream.to_enum | ||||||||||||||||||||
while (doc = enum.next) | ||||||||||||||||||||
puts doc | ||||||||||||||||||||
break if doc['operationType'] == 'invalidate' | ||||||||||||||||||||
end | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
# end-change-stream-post-image | ||||||||||||||||||||
end |
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,253 @@ | ||||||||||||||||
.. _ruby-change-streams: | ||||||||||||||||
|
||||||||||||||||
==================== | ||||||||||||||||
Monitor Data Changes | ||||||||||||||||
==================== | ||||||||||||||||
|
||||||||||||||||
.. contents:: On this page | ||||||||||||||||
:local: | ||||||||||||||||
:backlinks: none | ||||||||||||||||
:depth: 2 | ||||||||||||||||
:class: singlecol | ||||||||||||||||
|
||||||||||||||||
.. facet:: | ||||||||||||||||
:name: genre | ||||||||||||||||
:values: reference | ||||||||||||||||
|
||||||||||||||||
.. meta:: | ||||||||||||||||
:keywords: watch, code example | ||||||||||||||||
|
||||||||||||||||
Overview | ||||||||||||||||
-------- | ||||||||||||||||
|
||||||||||||||||
In this guide, you can learn how to use a **change stream** to monitor real-time | ||||||||||||||||
changes to your data. A change stream is a {+mdb-server+} feature that | ||||||||||||||||
allows your application to subscribe to data changes on a collection, database, | ||||||||||||||||
or deployment. | ||||||||||||||||
|
||||||||||||||||
When using the {+driver-short+}, you can call the ``watch`` method to return a | ||||||||||||||||
``Mongo::Collection::View::ChangeStream`` object. Then, you can convert the change | ||||||||||||||||
stream to an ``Enumerator`` object and iterate through its content to monitor data | ||||||||||||||||
changes, such as updates, insertions, and deletions. | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no need to convert the change stream to an enumerator, as the stream is already iterable. Maybe something like the following?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this wording, updated! Sorry about that confusion |
||||||||||||||||
|
||||||||||||||||
Sample Data | ||||||||||||||||
~~~~~~~~~~~ | ||||||||||||||||
|
||||||||||||||||
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants`` | ||||||||||||||||
database from the :atlas:`Atlas sample datasets </sample-data>`. To access this collection | ||||||||||||||||
from your {+language+} application, create a ``Mongo::Client`` object that connects to | ||||||||||||||||
an Atlas cluster and assign the following values to your ``database`` and ``collection`` | ||||||||||||||||
variables: | ||||||||||||||||
|
||||||||||||||||
.. literalinclude:: /includes/read/change-streams.rb | ||||||||||||||||
:language: ruby | ||||||||||||||||
:dedent: | ||||||||||||||||
:start-after: start-db-coll | ||||||||||||||||
:end-before: end-db-coll | ||||||||||||||||
|
||||||||||||||||
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the | ||||||||||||||||
:atlas:`Get Started with Atlas </getting-started>` guide. | ||||||||||||||||
|
||||||||||||||||
Open a Change Stream | ||||||||||||||||
-------------------- | ||||||||||||||||
|
||||||||||||||||
To open a change stream, call the ``watch`` method. The object on which you | ||||||||||||||||
call the ``watch`` method determines the scope of events that the change | ||||||||||||||||
stream monitors. You can call the ``watch`` method on the following objects | ||||||||||||||||
|
||||||||||||||||
- ``Mongo::Client``: Monitors changes to all collections across all databases | ||||||||||||||||
in a deployment, excluding :manual:`system collections </reference/system-collections/>` | ||||||||||||||||
or collections in the ``admin``, ``local``, and ``config`` databases | ||||||||||||||||
- ``Mongo::Database``: Monitors changes to all collections in one database | ||||||||||||||||
- ``Mongo::Collection``: Monitors changes to one collection | ||||||||||||||||
|
||||||||||||||||
The following example opens a change stream on the ``restaurants`` collection, | ||||||||||||||||
converts the change stream to an ``Enumerator``, and outputs changes as they occur: | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned, there's no need to convert the stream to an enumerator.
Suggested change
|
||||||||||||||||
|
||||||||||||||||
.. literalinclude:: /includes/read/change-streams.rb | ||||||||||||||||
:start-after: start-open-change-stream | ||||||||||||||||
:end-before: end-open-change-stream | ||||||||||||||||
:language: ruby | ||||||||||||||||
:dedent: | ||||||||||||||||
|
||||||||||||||||
To begin watching for changes, run the preceding code. Then, in a separate | ||||||||||||||||
shell, modify the ``restaurants`` collection. The following example updates | ||||||||||||||||
a document that has a ``name`` field value of ``'Blarney Castle'``: | ||||||||||||||||
|
||||||||||||||||
.. _ruby-change-stream-update: | ||||||||||||||||
|
||||||||||||||||
.. literalinclude:: /includes/read/change-streams.rb | ||||||||||||||||
:start-after: start-update-for-change-stream | ||||||||||||||||
:end-before: end-update-for-change-stream | ||||||||||||||||
:language: ruby | ||||||||||||||||
:dedent: | ||||||||||||||||
|
||||||||||||||||
When you update the collection, the change stream application prints the change | ||||||||||||||||
as it occurs. The printed change event resembles the following output: | ||||||||||||||||
|
||||||||||||||||
.. code-block:: none | ||||||||||||||||
:copyable: false | ||||||||||||||||
|
||||||||||||||||
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=>#<...>, | ||||||||||||||||
"ns"=>{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> | ||||||||||||||||
{"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> | ||||||||||||||||
{"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}} | ||||||||||||||||
|
||||||||||||||||
Modify the Change Stream Output | ||||||||||||||||
------------------------------- | ||||||||||||||||
|
||||||||||||||||
To modify the change stream output, you can pass pipeline stages in an array as a | ||||||||||||||||
parameter to the ``watch`` method. You can include the following stages in the | ||||||||||||||||
array: | ||||||||||||||||
|
||||||||||||||||
- ``$addFields`` or ``$set``: Adds new fields to documents | ||||||||||||||||
- ``$match``: Filters the documents | ||||||||||||||||
- ``$project``: Projects a subset of the document fields | ||||||||||||||||
- ``$replaceWith`` or ``$replaceRoot``: Replaces the input document with the | ||||||||||||||||
specified document | ||||||||||||||||
- ``$redact``: Restricts the contents of the documents | ||||||||||||||||
- ``$unset``: Removes fields from documents | ||||||||||||||||
|
||||||||||||||||
The following example passes a pipeline that includes the ``$match`` stage to the | ||||||||||||||||
``watch`` method. This instructs the ``watch`` method to output events only | ||||||||||||||||
when update operations occur: | ||||||||||||||||
|
||||||||||||||||
.. literalinclude:: /includes/read/change-streams.rb | ||||||||||||||||
:start-after: start-change-stream-pipeline | ||||||||||||||||
:end-before: end-change-stream-pipeline | ||||||||||||||||
:language: ruby | ||||||||||||||||
:dedent: | ||||||||||||||||
|
||||||||||||||||
Modify watch Behavior | ||||||||||||||||
--------------------- | ||||||||||||||||
|
||||||||||||||||
To modify the behavior of the ``watch`` method, you can pass an options hash | ||||||||||||||||
as a parameter to ``watch``. The following table describes some of the options that | ||||||||||||||||
you can set: | ||||||||||||||||
|
||||||||||||||||
.. list-table:: | ||||||||||||||||
:widths: 30 70 | ||||||||||||||||
:header-rows: 1 | ||||||||||||||||
|
||||||||||||||||
* - Option | ||||||||||||||||
- Description | ||||||||||||||||
|
||||||||||||||||
* - ``full_document`` | ||||||||||||||||
- | Specifies whether to show the full document after the change, rather | ||||||||||||||||
than showing only the changes made to the document. To learn more about | ||||||||||||||||
this option, see the :ref:`ruby-change-stream-pre-post-image` section of this | ||||||||||||||||
guide. | ||||||||||||||||
|
||||||||||||||||
* - ``full_document_before_change`` | ||||||||||||||||
- | Specifies whether to show the full document as it was before the change, rather | ||||||||||||||||
than showing only the changes made to the document. To learn more about | ||||||||||||||||
this option, see :ref:`ruby-change-stream-pre-post-image`. | ||||||||||||||||
|
||||||||||||||||
* - ``resume_after`` | ||||||||||||||||
- | Specifies the logical starting point for the change stream. | ||||||||||||||||
| This option is mutually exclusive with ``start_at_operation_time``. | ||||||||||||||||
|
||||||||||||||||
* - ``start_at_operation_time`` | ||||||||||||||||
- | Instructs the change stream to only provide changes that occurred at or after | ||||||||||||||||
the specified timestamp. | ||||||||||||||||
| This option is mutually exclusive with ``resume_after``. | ||||||||||||||||
|
||||||||||||||||
* - ``collation`` | ||||||||||||||||
- | Sets the collation to use for the change stream cursor. | ||||||||||||||||
|
||||||||||||||||
For a full list of ``watch`` options, see `watch <{+api-root+}/Mongo/Collection.html#watch-instance_method>`__ | ||||||||||||||||
in the API documentation. | ||||||||||||||||
|
||||||||||||||||
.. _ruby-change-stream-pre-post-image: | ||||||||||||||||
|
||||||||||||||||
Include Pre-Images and Post-Images | ||||||||||||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||||||||||||||
|
||||||||||||||||
.. important:: | ||||||||||||||||
|
||||||||||||||||
You can enable pre-images and post-images on collections only if your | ||||||||||||||||
deployment uses MongoDB v6.0 or later. | ||||||||||||||||
|
||||||||||||||||
By default, when you perform an operation on a collection, the corresponding | ||||||||||||||||
change event includes only the modified fields and their values before and | ||||||||||||||||
after the operation. | ||||||||||||||||
|
||||||||||||||||
You can instruct the ``watch`` method to return the document's **pre-image**, the | ||||||||||||||||
full version of the document *before* changes, in addition to the modified fields. To | ||||||||||||||||
include the pre-image in the change stream event, pass an options hash to ``watch`` | ||||||||||||||||
that sets the ``full_document_before_change`` option. You can set this option to the | ||||||||||||||||
following string values: | ||||||||||||||||
|
||||||||||||||||
- ``'whenAvailable'``: The change event includes a pre-image | ||||||||||||||||
of the modified document for change events. If the pre-image is not available, this | ||||||||||||||||
change event field has a ``nil`` value. | ||||||||||||||||
- ``'required'``: The change event includes a pre-image | ||||||||||||||||
of the modified document for change events. If the pre-image is not available, the | ||||||||||||||||
server raises an error. | ||||||||||||||||
- ``'off'``: (Default) The change event does not include a pre-image of the modified document. | ||||||||||||||||
|
||||||||||||||||
You can also instruct the ``watch`` method to return the document's **post-image**, | ||||||||||||||||
the full version of the document *after* changes, in addition to the modified fields. | ||||||||||||||||
To include the post-image in the change stream event, pass an options hash to ``watch`` | ||||||||||||||||
that sets the ``full_document`` option. You can set this option to the following string | ||||||||||||||||
values: | ||||||||||||||||
|
||||||||||||||||
- ``'updateLookup'``: The change event includes a | ||||||||||||||||
copy of the entire changed document from some time after the change. | ||||||||||||||||
- ``'whenAvailable'``: The change event includes a post-image of the modified | ||||||||||||||||
document for change events. If the post-image is not available, this change | ||||||||||||||||
event field has a ``nil`` value. | ||||||||||||||||
- ``'required'``: The change event includes a post-image of the modified document | ||||||||||||||||
for change events. If the post-image is not available, the server raises an error. | ||||||||||||||||
- ``'default'``: (Default) The change event does not include a post-image of the modified document. | ||||||||||||||||
|
||||||||||||||||
The following example calls the ``watch`` method on a collection and includes the post-image | ||||||||||||||||
of updated documents by setting the ``full_document`` option: | ||||||||||||||||
|
||||||||||||||||
.. literalinclude:: /includes/read/change-streams.rb | ||||||||||||||||
:start-after: start-change-stream-post-image | ||||||||||||||||
:end-before: end-change-stream-post-image | ||||||||||||||||
:language: ruby | ||||||||||||||||
:dedent: | ||||||||||||||||
|
||||||||||||||||
With the change stream application running in a separate shell, updating a | ||||||||||||||||
document in the ``restaurants`` collection by using the :ref:`preceding update | ||||||||||||||||
example <ruby-change-stream-update>` prints a change event resembling the following | ||||||||||||||||
output: | ||||||||||||||||
|
||||||||||||||||
.. code-block:: none | ||||||||||||||||
:copyable: false | ||||||||||||||||
:emphasize-lines: 2-6 | ||||||||||||||||
|
||||||||||||||||
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=> | ||||||||||||||||
#<...1>, "wallTime"=>..., "fullDocument"=>{"_id"=>BSON::ObjectId('...'), | ||||||||||||||||
"address"=>{"building"=>"202-24", "coord"=>[-73.9250442, 40.5595462], | ||||||||||||||||
"street"=>"Rockaway Point Boulevard", "zipcode"=>"11697"}, | ||||||||||||||||
"borough"=>"Queens", "cuisine"=>"Irish", "grades"=>[...], | ||||||||||||||||
"name"=>"Blarney Castle", "restaurant_id"=>"40366356"}, "ns"=> | ||||||||||||||||
{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> | ||||||||||||||||
{"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> | ||||||||||||||||
{"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}} | ||||||||||||||||
|
||||||||||||||||
.. tip:: | ||||||||||||||||
|
||||||||||||||||
To learn more about pre-images and post-images, see | ||||||||||||||||
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>` | ||||||||||||||||
in the {+mdb-server+} manual. | ||||||||||||||||
|
||||||||||||||||
Additional Information | ||||||||||||||||
---------------------- | ||||||||||||||||
|
||||||||||||||||
To learn more about change streams, see :manual:`Change Streams | ||||||||||||||||
</changeStreams>` in the {+mdb-server+} manual. | ||||||||||||||||
|
||||||||||||||||
API Documentation | ||||||||||||||||
~~~~~~~~~~~~~~~~~ | ||||||||||||||||
|
||||||||||||||||
To learn more about any of the methods or types discussed in this | ||||||||||||||||
guide, see the following API documentation: | ||||||||||||||||
|
||||||||||||||||
- `Mongo::Collection::View::ChangeStream <{+api-root+}/Mongo/Collection/View/ChangeStream.html>`__ | ||||||||||||||||
- `Mongo::Client::watch <{+api-root+}/Mongo/Client.html#watch-instance_method>`__ | ||||||||||||||||
- `Mongo::Database::watch <{+api-root+}/Mongo/Database.html#watch-instance_method>`__ | ||||||||||||||||
- `Mongo::Collection::watch <{+api-root+}/Mongo/Collection.html#watch-instance_method>`__ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
stream#each
will be simpler, and more idiomatic: