# Change Streams Overview
Disclaimer: unless explicitly stated otherwise, the following documentation describes how change
streams are implemented on the current master branch. Implementation details for previous versions
may vary slightly.
Change streams are a convenient way for an application to monitor changes made to the data in a
deployment.
The events produced by change streams are called "change events". The event data is produced from
the oplog(s) of the deployment.
The events that are emitted by change streams include
- DML events: emitted for operations that insert, update, replace, or delete individual documents.
- DDL events: emitted for operations that create, drop, or modify collections, databases, or views.
- Data placement events: emitted for operations that define or modify the placement of data inside
a sharded cluster.
- Cluster topology events: emitted for operations that add or remove shards in a sharded cluster.
Which exact event types are emitted by a change stream depends on the change stream configuration
and the deployment type.
Change streams are mainly used by customer applications and tools to keep track of changes to the
data in a deployment, in order to relay these updates to external systems.
Some of MongoDB's own tools and components are also based on change streams, e.g. _mongosync_ (C2C),
Atlas Search, Atlas Stream Processing, and the resharding process.
The component that opens a change stream and pulls events from it is called the "consumer".
## Change Stream Guarantees
Change streams provide the following guarantees:
- Ordering: change streams deliver events in the order they originally occurred within the target
namespace (e.g., collection, database, or entire cluster). The order is based on the sequence in
which the operations were applied to the oplog.
In a sharded cluster, the events from multiple oplogs will be merged deterministically into a
single, ordered stream of change events.
- Durability: change streams are based on the internal oplog, which is part of the deployment's
replication mechanism. Change streams only deliver events after they have been committed to a
majority of nodes and durably persisted, ensuring they will not be rolled back.
- Resumability: change stream consumption can be interrupted by transient errors (e.g. network
issues, node failures, application errors), but it can be resumed from the exact point where
consumption stopped. This is made possible by the resume token (`_id` field) that accompanies
every change event and acts as a bookmark. It allows the consumer to continue processing changes
from the last known position without missing events.
### Change Stream Scopes
Change streams can be opened on different levels that control which events are emitted:
- collection-level change streams: these change streams only include events related to a specific
collection namespace (e.g. `testDB.testCollection`).
- database-level change streams: such change streams include events related to all collections in a
specific database (e.g. `testDB`).
- all-cluster change streams: these change streams include events for all databases and collections
in a replica set or sharded cluster deployment.
Change streams cannot be opened on views.
Change streams respect the read permissions of the consumer. A consumer can only open change streams
on collections or databases that they have access to.
## Opening a Change Stream
Change streams can be opened against replica set and sharded cluster deployments. They cannot be
opened against standalone _mongod_ instances, as there is no oplog to generate the events from in
standalone mode.
In replica set deployments, the change stream can be opened directly on any replica set member of
the deployment.
In sharded cluster deployments, the change stream must be opened against any of the deployment's
_mongos_ processes.
A change stream is opened by executing an `aggregate` command with a pipeline that contains at least
the `$changeStream` pipeline stage.
_mongosh_, the "jstest shell" (_mongo_) and many drivers provide simpler "watch" wrappers for this.
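For example, a minimal collection-level change stream via the _mongosh_ `watch` wrapper might look
like the following sketch (the namespace and the polling loop are illustrative):
```js
// Illustrative sketch: open a change stream via the "watch" wrapper and poll it.
const cursor = db.getSiblingDB("testDB").testCollection.watch();
while (!cursor.isClosed()) {
  // tryNext() returns null when no event is currently available.
  const event = cursor.tryNext();
  if (event !== null) {
    printjson(event);
  }
}
```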
### Opening a Collection-Level Change Stream
To open a collection-level change stream on a specific collection (e.g., `testDB.testCollection`),
the following _mongosh_ command can be used:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {},
    },
  ],
  cursor: {},
});
```
### Opening a Database-Level Change Stream
To open a database-level change stream on a specific database (e.g., `testDB`), use the following
command in _mongosh_:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: 1,
  pipeline: [
    {
      $changeStream: {},
    },
  ],
  cursor: {},
});
```
The `aggregate` parameter must be set to `1` for database-level change streams, and the command must
be executed inside the desired database.
The internal namespace that is used by database-level change streams is `<dbName>.$cmd.aggregate`
(where `<dbName>` is the actual name of the database).
### Opening an All-Cluster Change Stream
All-cluster change streams can only be opened on the `admin` database and also need the
`allChangesForCluster` flag to be set to `true` in order to work. The following _mongosh_ command
can be used to open an all-cluster change stream:
```js
db.adminCommand({
  aggregate: 1,
  pipeline: [
    {
      $changeStream: {
        allChangesForCluster: true,
      },
    },
  ],
  cursor: {},
});
```
The internal namespace that is used by all-cluster change streams is always `admin.$cmd.aggregate`.
### Change Stream Start Time
When opening a change stream without specifying an explicit point in time, the change stream will be
opened using the current time, and will report only change events that happened after that point
in time.
The current time here is
- the time of the latest majority-committed operation for replica set change streams, or
- the value of the cluster's vector clock for sharded cluster change streams.
To open a change stream at a specific point in time instead of using the current time, the parameter
`startAtOperationTime` can be set in the initial change stream request. The `startAtOperationTime`
parameter is specified as a logical timestamp.
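For example, the following sketch opens a collection-level change stream at an explicit logical
timestamp (the timestamp value is illustrative):
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {
        // Start reporting events from this logical timestamp (illustrative value).
        startAtOperationTime: Timestamp(1700000000, 1),
      },
    },
  ],
  cursor: {},
});
```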
### Resume Tokens
Change streams allow the consumer to resume the change stream after an error occurred.
To support resumability, change streams report a resume token in the `_id` field of every emitted
event.
To resume a change stream after an error occurred, the resume token of a previously consumed event
can be passed in one of the parameters `resumeAfter` or `startAfter` when opening a new change
stream.
The `resumeAfter` parameter cannot be used with resume tokens that were emitted by an "invalidate"
event. The `startAfter` parameter can be used even with invalidate events.
When specifying an explicit start point for a change stream, only one of the parameters
`resumeAfter`, `startAfter` and `startAtOperationTime` can be used. Using more than one of them when
opening a change stream will return an error.
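A minimal resume sketch, assuming `lastEvent` holds the last change event that was fully processed
before the error:
```js
// Reopen the change stream right after the last processed event ("lastEvent"
// is assumed to hold the last change event consumed before the error).
// Use startAfter instead of resumeAfter to resume past an "invalidate" event.
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {
        resumeAfter: lastEvent._id,
      },
    },
  ],
  cursor: {},
});
```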
Resume tokens are not "portable" in the sense that they can only be used to resume a change stream
that is opened with the same settings and pipeline stages as the change stream that produced the
original resume token.
For example, resuming with a different `$match` expression than the one used by the change stream
that produced the resume token may lead to different events being returned, so the event referenced
by the original resume token may not be found in the new change stream.
The resume tokens that are emitted by change streams are string values that contain a hexadecimal
encoding of the internal resume token data.
The internal resume token data contains
- the cluster time of an event.
- the version of the resume token format.
- the type of the token (event token or high watermark token).
- the internal position inside the transaction, if the event was part of a transaction.
- a flag stating if the resume token is for an "invalidate" event.
- the collection UUID.
- an event identifier / event description.
Resume tokens are serialized and deserialized by the [ResumeToken](https://github.com/mongodb/mongo/blob/6d182bc73acdf2270320eba611538f6619b627bc/src/mongo/db/pipeline/resume_token.h#L147)
class. The resume token internal data is stored in [ResumeTokenData](https://github.com/mongodb/mongo/blob/6d182bc73acdf2270320eba611538f6619b627bc/src/mongo/db/pipeline/resume_token.h#L50).
Resume tokens are versioned. Currently only version 2 is supported; future MongoDB versions may
introduce new resume token versions. Client applications should treat resume tokens as opaque
identifiers and should not make any assumptions about the format or internals of resume tokens,
nor should they rely on their internal implementation details.
### Change Stream Cursors
When opening a change stream on a replica set, a cursor will be established on the targeted replica
set node. The change stream cursor is "tailable" and will remain open until it is explicitly closed
by the consumer or the change stream runs into an error. Also, unused cursors are eventually
garbage-collected after a period of inactivity.
When opening a change stream on a sharded cluster, the targeted _mongos_ instance will open the
required cursors on the relevant shards of the cluster and also on the config server. The _mongos_
instance will also automatically open additional cursors in case new shards are added to the
cluster. All this is abstracted away from the consumer of the change stream, who only sees a single
cursor and interacts with _mongos_, which handles the complexity of managing the underlying shard
cursors.
If a change stream cursor can be successfully established, the cursor id is returned to the
consumer. The consumer can then use the cursor id to pull change events from the change stream by
issuing follow-up `getMore` commands to this cursor.
If a change stream cursor cannot be successfully opened, the initial `aggregate` command will
return an error, and the returned cursor id will be `0`. In this case, no events can be consumed
from the change stream, and the consumer needs to resolve the error.
## Consuming Events from a Change Stream
To fetch events from a previously opened change stream, the consumer can send a `getMore` request
using the cursor id that was established by the initial `aggregate` command, e.g.
```js
// For a collection-level change stream on "testDB.testCollection"
db.getSiblingDB("testDB").runCommand({
  getMore: cursorId,
  collection: "testCollection",
});

// For a database-level change stream on "testDB"
db.getSiblingDB("testDB").runCommand({
  getMore: cursorId,
  collection: "$cmd.aggregate",
});

// For an all-cluster change stream:
db.adminCommand({
  getMore: cursorId,
  collection: "$cmd.aggregate",
});
```
Responses can be further controlled by using the following optional parameters in the `getMore`
request:
- `batchSize`: maximum number of change events to return in the response.
- `maxTimeMS`: maximum server-side waiting time for producing events.
The `getMore` command will fill the response with up to `batchSize` results if that many events are
available. A response can also contain fewer events than the specified `batchSize`.
Regardless of the specified batch size, the maximum response size limit of 16MB will be honored in
order to prevent responses from getting too large.
A change stream response is returned to the consumer when
- `batchSize` events have been accumulated in the response, or
- at least one event has been accumulated in the response, but adding the next event to the response
would make it exceed the 16MB size limit.
In case the change stream cursor has reached the end of the oplog and there are currently no events
to return, the response will be returned immediately if it already contains at least one event.
If the response is empty, the change stream will wait for at most `maxTimeMS` for new oplog entries
to arrive.
If no new oplog entries arrive within `maxTimeMS`, an empty response will be returned. If new oplog
entries arrive within `maxTimeMS` and at least one of them matches the change stream's filter, the
matching event will be returned immediately. If oplog entries arrive but do not match the change
stream's filter, the change stream will wait for matching oplog entries until `maxTimeMS` is fully
expired.
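Putting this together, a consumer loop for a collection-level change stream might look like the
following sketch (the `batchSize` and `maxTimeMS` values are illustrative):
```js
// Open the change stream and remember the cursor id.
const res = db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [{$changeStream: {}}],
  cursor: {},
});
res.cursor.firstBatch.forEach((event) => printjson(event));

// Pull events until the cursor is closed (cursor id 0).
let cursorId = res.cursor.id;
while (cursorId != 0) {
  const more = db.getSiblingDB("testDB").runCommand({
    getMore: cursorId,
    collection: "testCollection",
    batchSize: 100,   // return at most 100 events per response
    maxTimeMS: 1000,  // wait at most 1 second if no events are available
  });
  more.cursor.nextBatch.forEach((event) => printjson(event));
  cursorId = more.cursor.id;
}
```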
### Generic Event Layout
In general, the returned change stream events have the following fields:
- `_id`: resume token for the event. This is **not** the same as the document id. The resume token
can be used to open a new change stream starting at the very same event.
- `operationType`: type of the change stream event.
- `clusterTime`: logical timestamp of when the event originally occurred.
- `wallTime`: wall-clock date/time of when the event originally occurred.
- `ns`: namespace inside which the event occurred.
The following generic fields are added for change streams that were opened with the
`showExpandedEvents` flag:
- `collectionUUID`: UUID of the collection for which the event occurred, if applicable.
- `operationDescription`: populated for DDL events.
Most other fields are event type-specific, so they are only present for specific events.
A few such fields include:
- `documentKey`: the `_id` value of the affected document, populated for DML events. May contain the
shard key values for sharded collections.
- `fullDocument`: the full document for "insert" and "replace" events. Will also be populated for
"update" events if the change stream is opened with the `fullDocument` parameter set to any other
value than `default`.
- `updateDescription` / `rawUpdateDescription`: contains details for "update" events.
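As an illustration, an "insert" event might look like this (all values are made up):
```js
{
  _id: { _data: "8264..." },  // resume token (hex-encoded, made-up value)
  operationType: "insert",
  clusterTime: Timestamp(1700000000, 1),
  wallTime: ISODate("2024-01-01T00:00:00.000Z"),
  ns: { db: "testDB", coll: "testCollection" },
  documentKey: { _id: 1 },
  fullDocument: { _id: 1, name: "example" }
}
```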
The majority of change stream event fields are emitted by the `ChangeStreamDefaultEventTransformation`
object [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/change_stream_event_transform.cpp#L289). This object is called by the `ChangeStreamEventTransform`
stage [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_transform_stage.cpp#L75).
A custom `$project` stage in the change stream pipeline can be used to suppress certain fields.
### Large Events
Emitted change events can get large, especially if they contain pre- or post-images. In this case
the events can exceed the maximum BSON object size of 16MB, which can lead to `BSONObjectTooLarge`
errors when trying to process these change stream events.
To split large change stream events into multiple smaller chunks, change stream consumers can add
a `$changeStreamSplitLargeEvent` stage as the last step of their change stream pipeline, e.g.
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {},
    },
    {
      $changeStreamSplitLargeEvent: {},
    },
  ],
  cursor: {},
});
```
The splitting is performed by the `ChangeStreamSplitLargeEventStage` stage [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_split_large_event_stage.cpp#L72),
using [this helper function](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/change_stream_split_event_helpers.cpp#L63).
The change stream consumer is responsible for assembling the split event fragments into a single
event later.
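Each fragment carries a `splitEvent` field of the form `{fragment: <n>, of: <m>}` that identifies
its position in the sequence. A minimal reassembly sketch, assuming `fragments` holds all fragments
of a single event in order and that top-level fields are never split across fragments:
```js
// Merge the fields of all fragments of one split event back into a single
// event document. Note that each fragment carries its own resume token in
// "_id"; the merged event ends up with the last fragment's token.
function reassemble(fragments) {
  const event = {};
  for (const fragment of fragments) {
    for (const key of Object.keys(fragment)) {
      if (key !== "splitEvent") {
        event[key] = fragment[key];
      }
    }
  }
  return event;
}
```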
#### Change Stream Invalidate Events
Collection-level and database-level change streams can return so-called "invalidate" events that
close the change stream cursor in specific situations:
- in collection-level change streams, the change stream is invalidated in the following situations:
  - the target collection is dropped
  - the target collection is renamed
  - the parent database of the target collection is dropped
- in database-level change streams, the change stream is invalidated if the target database is
dropped.
In case a change stream gets invalidated by any of the above situations, it will emit a special
"invalidate" event to inform the consumer that further processing is not possible.
There are no "invalidate" events in all-cluster change streams.
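An "invalidate" event only carries the generic fields, e.g. (values made up):
```js
{
  _id: { _data: "8264..." },  // resume token; usable with startAfter, not resumeAfter
  operationType: "invalidate",
  clusterTime: Timestamp(1700000000, 2),
  wallTime: ISODate("2024-01-01T00:00:01.000Z")
}
```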
Issuing of change stream invalidate events is implemented in the `ChangeStreamCheckInvalidateStage`
[here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_check_invalidate_stage.cpp#L134).
## Change Stream Parameters
The behavior of change streams can be controlled via various parameters that can be passed with the
initial `aggregate` command used to open the change stream.
The parameters are defined in an [IDL file](https://github.com/mongodb/mongo/blob/12ca4325fb5c5eca38d77b75bb570cc043398144/src/mongo/db/pipeline/document_source_change_stream.idl#L83).
The parameters that are provided when opening the change stream are automatically validated using
mechanisms provided by the IDL framework. Additional validation of the change stream parameters is
performed [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream.cpp#L388). Invalid change stream parameters are immediately rejected
with appropriate errors.
### `fullDocument`
The `fullDocument` change stream parameter controls what value should be returned inside the
`fullDocument` field for change stream DML "update" events.
The following values are possible:
- `default`: the `fullDocument` field will only be populated for "insert" and "replace" events.
- `updateLookup`: the `fullDocument` field will be populated with the _current_ version of the
document, identified by the document's `_id` value. Note that the current version of the document
may not be the same version of the document that was present when the "update" change event was
originally recorded. If no document can be found by the lookup, the `fullDocument` field will
contain `null`.
- `whenAvailable`: the `fullDocument` field will be populated with the post-image for the event.
The post-image is generated on the fly by taking the stored pre-image and applying the delta update
from the event on top of it. If no post-image is available, the `fullDocument` field will contain
`null`.
- `required`: populates the `fullDocument` field with the post-image for the event. Post-images are
generated in the same way as in `whenAvailable`. If no post-image can be generated, this will
abort the change stream with a `NoMatchingDocument` error.
The latter two options require pre-images to be enabled for the target collection(s).
Post-images are added by the `ChangeStreamAddPostImage` stage [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_add_post_image_stage.cpp#L84).
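For example, a sketch that requests post-images for "update" events when they are available:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {
        // Populate "fullDocument" for "update" events from the stored pre-image
        // plus the update delta; fall back to null if no post-image is available.
        fullDocument: "whenAvailable",
      },
    },
  ],
  cursor: {},
});
```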
### `fullDocumentBeforeChange`
The `fullDocumentBeforeChange` change stream parameter controls what value should be returned inside
the `fullDocumentBeforeChange` field for change stream DML events ("update", "replace", "delete").
The following values are possible:
- `off` (default): the `fullDocumentBeforeChange` field will always be omitted.
- `whenAvailable`: the field will be populated with the pre-image of the document modified by the
current change event, if available. If no pre-image is available, the `fullDocumentBeforeChange`
field will contain `null`.
- `required`: populates the `fullDocumentBeforeChange` field with the stored pre-image for the event
if it exists. If no pre-image is available, aborts the change stream with a `NoMatchingDocument`
error.
Pre-images are added by the `ChangeStreamAddPreImage` stage [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_add_pre_image_stage.cpp#L68).
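A sketch that enables pre-image recording on the target collection and then opens a change stream
that returns pre-images when available:
```js
// Enable pre-image recording for the collection, so that pre-images get
// stored in the pre-images system collection.
db.getSiblingDB("testDB").runCommand({
  collMod: "testCollection",
  changeStreamPreAndPostImages: {enabled: true},
});

// Open a change stream that returns pre-images when available.
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {
        fullDocumentBeforeChange: "whenAvailable",
      },
    },
  ],
  cursor: {},
});
```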
### Change Stream Flags
There are also numerous flags that control the behavior of change streams. The most important flag
parameters are:
#### `showExpandedEvents` (public)
The `showExpandedEvents` flag can be used to make a change stream return both additional event types
and additional fields.
The flag defaults to `false`. In this mode, change streams will only return DML events and no DDL
events.
When setting `showExpandedEvents` to `true`, change streams will also emit events for various DDL
operations.
In addition, setting `showExpandedEvents` will make change streams return the additional fields
`collectionUUID` (for various change stream event types) and `updateDescription.disambiguatedPaths`
(for update events).
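As an illustration, with `showExpandedEvents: true` a collection creation might produce an event
like the following (all values are made up):
```js
{
  _id: { _data: "8264..." },
  operationType: "create",
  clusterTime: Timestamp(1700000000, 3),
  wallTime: ISODate("2024-01-01T00:00:02.000Z"),
  ns: { db: "testDB", coll: "newCollection" },
  collectionUUID: UUID("01234567-89ab-cdef-0123-456789abcdef"),  // added by showExpandedEvents
  operationDescription: { idIndex: { v: 2, key: { _id: 1 }, name: "_id_" } }
}
```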
#### `matchCollectionUUIDForUpdateLookup` (public)
The `matchCollectionUUIDForUpdateLookup` field can be used to ensure that "updateLookup" operations
are performed on the correct collection in case multiple collections with the same name have existed
over time.
This is relevant because change streams can be opened retroactively on collections that were
already dropped and may have been recreated with the same name but different contents afterwards.
The flag defaults to `false`. In this case, "updateLookup" operations will not verify that the
looked-up document is actually from the same collection "generation" as the change event the
document was looked up for.
If set to `true`, "updateLookup" operations will compare the collection UUID of the change event
with the UUID of the collection. If there is a UUID mismatch, the returned `fullDocument` field of
the event will be set to `null`.
#### `allChangesForCluster` (public)
This flag must be set when opening an all-cluster change stream. It will normally be set
automatically by drivers when opening an all-cluster change stream.
#### `showSystemEvents` (internal)
The `showSystemEvents` flag can be used to make change streams return events for collections inside
the `system` namespace. Such events are not emitted by default; setting `showSystemEvents` to `true`
will include events related to system collections in the change stream.
The flag defaults to `false` and is internal.
#### `showMigrationEvents` (internal)
The `showMigrationEvents` flag can be used to make change streams return DML events that are
happening during chunk migrations. If set to `true`, insert and delete events related to chunk
migrations will be reported as if they were regular events.
The flag defaults to `false` and is internal.
#### `showCommitTimestamp` (internal)
The `showCommitTimestamp` flag can be used to include the transaction commit timestamp inside DML
events that were part of a prepared transaction.
The flag defaults to `true` and is internal. It is used by the resharding process.
#### `showRawUpdateDescription` (internal)
The `showRawUpdateDescription` flag can be used to make change streams emit the raw, internal format
used for "update" oplog entries.
If set to `true`, emitted change stream "update" events will contain a `rawUpdateDescription` field.
The default is `false`. In this case, emitted change stream "update" events will contain the regular
`updateDescription` field.
#### `allowToRunOnConfigDB` (internal)
The `allowToRunOnConfigDB` flag is an internal flag that can be used to open a change stream on the
config server in a sharded cluster. It is used internally by `mongos` to open a cursor on the config
server to keep track of shard additions and removals in the deployment.
## Differences Between Replica Set and Sharded Cluster Change Streams
When a change stream is opened against a replica set, the consumer opens the change stream directly
on a replica set node, which can then return matching events immediately from the node's own oplog.
The events are already correctly ordered, and the latency is determined by the node's replication
lag and by how far the change stream has advanced towards the end of the node's oplog.
Opening a change stream on a sharded cluster works differently. Here, the consumer opens the change
stream against a _mongos_ instance. The _mongos_ instance will then use the cluster's topology
information to open the cursors on the config server and the data shards on behalf of the consumer.
Because of the ordering guarantee provided by change streams, _mongos_ must wait until all cursors
have either responded with events or run into a timeout and reported that no more events are
currently available for them.
This is why change streams in a sharded cluster can have higher latency than change streams in
replica sets.
For sharded cluster change streams, the merging of the multiple streams of change events from the
different cursors is performed by the [`AsyncResultsMerger`](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/s/query/exec/async_results_merger.h#L98).
## Change Stream Pipeline Building
A change stream pipeline issued by a consumer contains the `$changeStream` meta stage.
This stage is expanded internally into multiple `DocumentSource`s [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/change_stream_pipeline_helpers.cpp#L172).
The change stream `DocumentSource`s are located in the `src/mongo/db/pipeline` directory [here](https://github.com/mongodb/mongo/tree/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline), among other `DocumentSource`s that
are not related to change streams.
The `DocumentSource`s are only used for pipeline building and optimization, but they are converted
into execution `Stage`s later when the change stream is executed.
These `Stage`s are located in the `src/mongo/db/exec/agg` directory [here](https://github.com/mongodb/mongo/tree/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg).
### Replica Set Pipelines
On a replica set, the `$changeStream` stage is expanded into the following internal stages:
- `$_internalChangeStreamOplogMatch`
- `$_internalChangeStreamUnwindTransaction`
- `$_internalChangeStreamTransform`
- `$_internalChangeStreamCheckInvalidate` (only used for collection-level and database-level change
streams)
- `$_internalChangeStreamCheckResumability`
- `$_internalChangeStreamAddPreImage` (only used if `fullDocumentBeforeChange` is not set to `off`)
- `$_internalChangeStreamAddPostImage` (only used if `fullDocument` is not set to `default`)
- `$_internalChangeStreamEnsureResumeTokenPresent` (only used if the change stream resume token is
not a high water mark token)
Additionally, the change stream pipeline on replica sets will contain a final `$match` stage to
filter out all non-DML change events in case `showExpandedEvents` is not set.
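The expanded stages can be inspected by running the change stream aggregation with `explain`, e.g.
(a sketch; the exact output shape varies by version):
```js
// The "stages" array in the explain output lists the expanded
// $_internalChangeStream* stages.
db.getSiblingDB("testDB").runCommand({
  explain: {
    aggregate: "testCollection",
    pipeline: [{$changeStream: {}}],
    cursor: {},
  },
});
```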
### Sharded Cluster Pipelines
For sharded cluster change streams, _mongos_ will first expand the `$changeStream` stage into the
following internal stages:
- `$_internalChangeStreamOplogMatch`
- `$_internalChangeStreamUnwindTransaction`
- `$_internalChangeStreamTransform`
- `$_internalChangeStreamCheckInvalidate` (only used for collection-level and database-level change
streams)
- `$_internalChangeStreamCheckResumability`
- `$_internalChangeStreamCheckTopologyChange`
- `$_internalChangeStreamAddPreImage` (only used if `fullDocumentBeforeChange` is not set to `off`)
- `$_internalChangeStreamAddPostImage` (only used if `fullDocument` is not set to `default`)
- `$_internalChangeStreamHandleTopologyChange`
- `$_internalChangeStreamEnsureResumeTokenPresent` (only used if the change stream resume token is
not a high water mark token)
Additionally, the change stream pipeline on a sharded cluster will contain a final `$match` stage to
filter out all non-DML change events in case `showExpandedEvents` is not set.
After building the initial pipeline stages, _mongos_ will split the pipeline into two parts:
- a part that is executed on data shards ("shard pipeline") and
- a part that is executed on _mongos_ ("merge pipeline").
The pipeline split point is above the `$_internalChangeStreamHandleTopologyChange` stage.
_mongos_ will also add a `$mergeCursors` stage that aggregates the responses from different shards
and the config server into a single, sorted stream.
#### Data Shard Pipeline
The shard pipeline will look like this:
- `$_internalChangeStreamOplogMatch`
- `$_internalChangeStreamUnwindTransaction`
- `$_internalChangeStreamTransform`
- `$_internalChangeStreamCheckInvalidate` (only used for collection-level and database-level change
streams)
- `$_internalChangeStreamCheckResumability`
- `$_internalChangeStreamCheckTopologyChange`
- `$_internalChangeStreamAddPreImage` (only used if `fullDocumentBeforeChange` is not set to `off`)
- `$_internalChangeStreamAddPostImage` (only used if `fullDocument` is not set to `default`)
#### Mongos Merge Pipeline
The merge pipeline on _mongos_ will look like this:
- `$mergeCursors`
- `$_internalChangeStreamHandleTopologyChange`
- `$_internalChangeStreamEnsureResumeTokenPresent` (only used if the change stream resume token is
not a high water mark token)
### Details of Individual Pipeline Stages
#### `$_internalChangeStreamOplogMatch`
This stage is responsible for reading data from the oplog and filtering out irrelevant events.
The `DocumentSourceChangeStreamOplogMatch` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h#L54).
The oplog filter for the stage is built [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp#L76).
There is no `Stage` equivalent for `DocumentSourceChangeStreamOplogMatch`, as it will be turned into
a `$cursor` stage for execution.
#### `$_internalChangeStreamUnwindTransaction`
This stage is responsible for "unwinding" (expanding) multiple operations that are contained in an
"applyOps" oplog entry into individual events.
The `DocumentSourceChangeStreamUnwindTransaction` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h#L66).
The `ChangeStreamUnwindTransactionStage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_unwind_transaction_stage.cpp#L89).
#### `$_internalChangeStreamTransform`
This stage is responsible for converting oplog entries into change events. It will build a change
event document for every oplog entry that enters this stage.
Event fields are added based on the change stream configuration.
The `DocumentSourceChangeStreamTransform` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_transform.h#L55).
The `ChangeStreamTransformStage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_transform_stage.cpp#L75).
The actual event transformation happens inside `ChangeStreamDefaultEventTransformation` [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/change_stream_event_transform.cpp#L289).
#### `$_internalChangeStreamCheckInvalidate`
This stage is responsible for creating change stream "invalidate" events and is only added for
collection-level and database-level change streams.
The `DocumentSourceChangeStreamCheckInvalidate` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h#L60).
The `ChangeStreamCheckInvalidate` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_check_invalidate_stage.cpp#L106).
When an invalidating operation is encountered, the stage will first emit an "invalidate" event, and
then throw a `ChangeStreamInvalidated` exception on the next call. The [`ChangeStreamInvalidatedInfo`](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/change_stream_invalidation_info.h#L46)
exception type contains the error code `ChangeStreamInvalidated`.
#### `$_internalChangeStreamCheckResumability`
This stage checks if the oplog has enough history to resume the change stream, and consumes all
events up to the given resume point. If no data for the resume point can be found in the oplog
anymore, it will throw a `ChangeStreamHistoryLost` error.
The `DocumentSourceChangeStreamCheckResumability` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h#L73).
The `ChangeStreamCheckResumabilityStage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_check_resumability_stage.cpp#L68).
#### `$_internalChangeStreamAddPreImage`
This stage is responsible for adding pre-image data to "update", "replace" and "delete" events. It
is only added to change stream pipelines if the `fullDocumentBeforeChange` parameter is not set to
`off`.
If enabled, the stage relies on the pre-images stored in the dedicated pre-images system collection.
The `DocumentSourceChangeStreamAddPreImage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h#L62).
The `ChangeStreamAddPreImageStage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_add_pre_image_stage.cpp#L68).
#### `$_internalChangeStreamAddPostImage`
This stage is responsible for adding post-image data to "update" events. It is only added to change
stream pipelines if the `fullDocument` parameter is not set to `default`.
If `fullDocument` is set to `updateLookup`, the stage will perform a lookup for the current version
of a document that was updated by an "update" event, and store it in the `fullDocument` field of
the "update" event if present. The lookup is performed using the `_id` value of the document from
the change event. As the lookup is executed at a different point in time than when the change event
was recorded, it is possible that the lookup finds a different version of the document than the one
that was active when the change event was recorded. This can happen if the document was updated
again between the change event and the lookup. The lookup may also find no document at all if the
document was deleted after the "update" event, but before the lookup.
In case the lookup cannot find a document with the requested `_id`, it will populate the
`fullDocument` field with a value of `null`.
If `fullDocument` is set to `whenAvailable` or `required`, the stage will make use of the stored
pre-image of the document in the dedicated pre-images system collection. It will fetch the
pre-image, apply the delta that is stored in the "update" change event on top of it, and store the
result in the `fullDocument` field.
The `DocumentSourceChangeStreamAddPostImage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h#L58).
The `ChangeStreamAddPostImageStage` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_add_post_image_stage.cpp#L84).
#### `$_internalChangeStreamEnsureResumeTokenPresent`
This stage is used by change streams to ensure that the resume token that was specified as part of
the change stream parameters is actually in the stream. The stage is only present if the change
stream resume token is not a high water mark token. If the resume token cannot be found in the
stream, it will throw a `ChangeStreamFatalError`.
The `DocumentSourceChangeStreamEnsureResumeTokenPresent` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h#L50).
The `ChangeStreamEnsureResumeTokenPresent` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_ensure_resume_token_present_stage.cpp#L67).
#### `$_internalChangeStreamCheckTopologyChange`
This stage is only present in sharded cluster change streams and is always part of a data shard
pipeline. The stage detects change stream topology changes in the form of `kNewShardDetectedOpType`
events and forwards these events directly to the outermost executor layer via an exception. Using an
exception bypasses the rest of the pipeline, ensuring that the event cannot be filtered out or
modified by user-specified stages and that it will ultimately be available to the _mongos_.
The `DocumentSourceChangeStreamCheckTopologyChange` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h#L61).
The `ChangeStreamCheckTopologyChange` code is [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_check_topology_change_stage.cpp#L63).
The exception thrown by the stage is caught by the `PlanExecutorPipeline` [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/plan_executor_pipeline.cpp#L149).
#### `$_internalChangeStreamHandleTopologyChange`
This stage is only present in sharded cluster change streams and is always part of the _mongos_
merge pipeline. The stage is responsible for opening additional cursors to shards that have been
added to the cluster. It will handle the `kNewShardDetectedOpType` events produced by the
`$_internalChangeStreamCheckTopologyChange` stage.
The `DocumentSourceChangeStreamHandleTopologyChange` code can be found [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h#L58).
The `ChangeStreamHandleTopologyChangeStage` code can be found [here](https://github.com/mongodb/mongo/blob/546ffe8f1c6a3996cff1e7b3cc65431d257289dd/src/mongo/db/exec/agg/change_stream_handle_topology_change_stage.cpp#L133).
## Missing documentation (to be completed)
- How user-defined match expressions are handled, rewritten, and pushed down.
- Changes to pipeline building and behavior due to SPM-1941.
- Mention `$_passthroughToShard` and how it works.