Change Streams
To use Change Streams, we switch to the collection (use <collection>
) then open a cursor on a collection with:
let cursor = db['collection-name'].watch()
- we can also open it on a whole database with
db.watch()
We poll for change events using:
cursor.hasNext()
- returns booleancursor.next()
- return the next change event (or error)cursor.tryNext()
- returns the next change event (or null)
Change Event:
_id
- change eventId (not the_id
of the document that was changeed)- this is used as a
ResumeToken
- this is used as a
operationType
- insert, delete, update, replace etc.fullDocument
- the latest version of an entire document of insert, delete, update, and replace events.- if there are multiple changes on the same document over a period of time but none of those change events got processed in the intervening period, the
fullDocument
will still show the latest data, as opposed to the data we'd have as a result of any specific operation.
- if there are multiple changes on the same document over a period of time but none of those change events got processed in the intervening period, the
updateDescription
- fields that were updated or removed and the values of those fields after the change.
Change Streams are filterable, which is accomplished by defining an aggregation pipeline.
Change Streams are resumable, which is accomplished by passing either ResumeTokens or timestamps (resumeAfter
) when configuring the cursor.
- A strategy is to store ResumeTokens in some data layer (e.g. another collection of the same Mongo database) so that the consumer of the stream knows from where to start polling.
- ex. if we are using Lambda to consume events, the Lambda will have to know from where to resume polling, since Lambdas are ephemeral.
Cook
All commands can be run from mongo
shell
let cursor = db.collection.myCollection.watch()
cursor.next()