SERVER-115275 Add dedicated `LiteParsed` classes for internal `ChangeStream` stages (#45103)

GitOrigin-RevId: fa855b44a9c51bfcdc82308671724770cd753426
This commit is contained in:
Joshua Siegel 2025-12-16 09:49:20 -05:00 committed by MongoDB Bot
parent 02472279a6
commit fd38ec6542
21 changed files with 112 additions and 19 deletions

View File

@ -128,7 +128,7 @@ public:
kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level, isImplicitDefault); kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level, isImplicitDefault);
} }
std::unique_ptr<StageParams> getStageParams() const final { std::unique_ptr<StageParams> getStageParams() const override {
return std::make_unique<ChangeStreamStageParams>(_originalBson); return std::make_unique<ChangeStreamStageParams>(_originalBson);
} }
@ -421,19 +421,15 @@ private:
* ensure that all the necessary authentication and input validation checks are applied while * ensure that all the necessary authentication and input validation checks are applied while
* parsing. * parsing.
*/ */
class LiteParsedDocumentSourceChangeStreamInternal final class DocumentSourceChangeStreamLiteParsedInternalBase
: public DocumentSourceChangeStream::LiteParsed { : public DocumentSourceChangeStream::LiteParsed {
public: protected:
static std::unique_ptr<LiteParsedDocumentSourceChangeStreamInternal> parse( DocumentSourceChangeStreamLiteParsedInternalBase(const BSONElement& spec, NamespaceString nss)
const NamespaceString& nss, const BSONElement& spec, const LiteParserOptions& options) {
return std::make_unique<LiteParsedDocumentSourceChangeStreamInternal>(spec, nss);
}
LiteParsedDocumentSourceChangeStreamInternal(const BSONElement& spec, NamespaceString nss)
: DocumentSourceChangeStream::LiteParsed(spec, std::move(nss)), : DocumentSourceChangeStream::LiteParsed(spec, std::move(nss)),
_privileges({Privilege(ResourcePattern::forClusterResource(_nss.tenantId()), _privileges({Privilege(ResourcePattern::forClusterResource(_nss.tenantId()),
ActionType::internal)}) {} ActionType::internal)}) {}
public:
PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final { PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final {
return _privileges; return _privileges;
} }
@ -442,6 +438,24 @@ private:
const PrivilegeVector _privileges; const PrivilegeVector _privileges;
}; };
template <typename StageParamsT>
class DocumentSourceChangeStreamLiteParsedInternal final
: public DocumentSourceChangeStreamLiteParsedInternalBase {
public:
DocumentSourceChangeStreamLiteParsedInternal(const BSONElement& originalBson,
NamespaceString nss)
: DocumentSourceChangeStreamLiteParsedInternalBase(originalBson, std::move(nss)) {}
static std::unique_ptr<DocumentSourceChangeStreamLiteParsedInternal> parse(
NamespaceString nss, const BSONElement& spec, const LiteParserOptions& options) {
return std::make_unique<DocumentSourceChangeStreamLiteParsedInternal>(spec, std::move(nss));
}
std::unique_ptr<StageParams> getStageParams() const final {
return std::make_unique<StageParamsT>(_originalBson);
}
};
/** /**
* A DocumentSource class for all internal change stream stages. This class is useful for * A DocumentSource class for all internal change stream stages. This class is useful for
* shared logic between all of the internal change stream stages. For internally created match * shared logic between all of the internal change stream stages. For internally created match

View File

@ -41,10 +41,14 @@ namespace mongo {
constexpr StringData DocumentSourceChangeStreamAddPostImage::kStageName; constexpr StringData DocumentSourceChangeStreamAddPostImage::kStageName;
constexpr StringData DocumentSourceChangeStreamAddPostImage::kFullDocumentFieldName; constexpr StringData DocumentSourceChangeStreamAddPostImage::kFullDocumentFieldName;
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamAddPostImage,
ChangeStreamAddPostImageStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPostImage, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPostImage,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamAddPostImageLiteParsed::parse,
DocumentSourceChangeStreamAddPostImage::createFromBson, DocumentSourceChangeStreamAddPostImage::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPostImage, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPostImage,
DocumentSourceChangeStreamAddPostImage::id) DocumentSourceChangeStreamAddPostImage::id)

View File

@ -52,6 +52,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamAddPostImage);
using ChangeStreamAddPostImageLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamAddPostImageStageParams>;
/** /**
* Part of the change stream API machinery used to look up the post-image of a document. Uses the * Part of the change stream API machinery used to look up the post-image of a document. Uses the
* "documentKey" field of the input to look up the new version of the document. * "documentKey" field of the input to look up the new version of the document.

View File

@ -51,10 +51,13 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamAddPreImage, ChangeStreamAddPreImageStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPreImage, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPreImage,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamAddPreImageLiteParsed::parse,
DocumentSourceChangeStreamAddPreImage::createFromBson, DocumentSourceChangeStreamAddPreImage::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPreImage, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPreImage,
DocumentSourceChangeStreamAddPreImage::id) DocumentSourceChangeStreamAddPreImage::id)

View File

@ -54,6 +54,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamAddPreImage);
using ChangeStreamAddPreImageLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamAddPreImageStageParams>;
/** /**
* Part of the change stream API machinery used to look up the pre-image of a document. * Part of the change stream API machinery used to look up the pre-image of a document.
* *

View File

@ -45,10 +45,14 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckInvalidate,
ChangeStreamCheckInvalidateStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckInvalidate, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckInvalidate,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamCheckInvalidateLiteParsed::parse,
DocumentSourceChangeStreamCheckInvalidate::createFromBson, DocumentSourceChangeStreamCheckInvalidate::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckInvalidate, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckInvalidate,
DocumentSourceChangeStreamCheckInvalidate::id) DocumentSourceChangeStreamCheckInvalidate::id)

View File

@ -53,6 +53,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckInvalidate);
using ChangeStreamCheckInvalidateLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamCheckInvalidateStageParams>;
/** /**
* This stage is used internally for change stream notifications to artificially generate an * This stage is used internally for change stream notifications to artificially generate an
* "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop

View File

@ -49,10 +49,14 @@ using boost::intrusive_ptr;
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckResumability,
ChangeStreamCheckResumabilityStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamCheckResumabilityLiteParsed::parse,
DocumentSourceChangeStreamCheckResumability::createFromBson, DocumentSourceChangeStreamCheckResumability::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckResumability, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckResumability,
DocumentSourceChangeStreamCheckResumability::id) DocumentSourceChangeStreamCheckResumability::id)

View File

@ -50,6 +50,11 @@
#include <boost/smart_ptr/intrusive_ptr.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckResumability);
using ChangeStreamCheckResumabilityLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamCheckResumabilityStageParams>;
/** /**
* This stage checks whether or not the oplog has enough history to resume the stream, and consumes * This stage checks whether or not the oplog has enough history to resume the stream, and consumes
* all events up to the given resume point. It is deployed on all shards when resuming a stream on * all events up to the given resume point. It is deployed on all shards when resuming a stream on

View File

@ -40,10 +40,14 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckTopologyChange,
ChangeStreamCheckTopologyChangeStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckTopologyChange, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckTopologyChange,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamCheckTopologyChangeLiteParsed::parse,
DocumentSourceChangeStreamCheckTopologyChange::createFromBson, DocumentSourceChangeStreamCheckTopologyChange::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckTopologyChange, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckTopologyChange,
DocumentSourceChangeStreamCheckTopologyChange::id) DocumentSourceChangeStreamCheckTopologyChange::id)

View File

@ -48,6 +48,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckTopologyChange);
using ChangeStreamCheckTopologyChangeLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamCheckTopologyChangeStageParams>;
/** /**
* This stage detects change stream topology changes in the form of 'kNewShardDetectedOpType' events * This stage detects change stream topology changes in the form of 'kNewShardDetectedOpType' events
* and forwards them directly to the executor via an exception. Using an exception bypasses the rest * and forwards them directly to the executor via an exception. Using an exception bypasses the rest

View File

@ -38,10 +38,14 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamHandleTopologyChange,
ChangeStreamHandleTopologyChangeStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamHandleTopologyChange, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamHandleTopologyChange,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamHandleTopologyChangeLiteParsed::parse,
DocumentSourceChangeStreamHandleTopologyChange::createFromBson, DocumentSourceChangeStreamHandleTopologyChange::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamHandleTopologyChange, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamHandleTopologyChange,
DocumentSourceChangeStreamHandleTopologyChange::id) DocumentSourceChangeStreamHandleTopologyChange::id)

View File

@ -49,6 +49,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamHandleTopologyChange);
using ChangeStreamHandleTopologyChangeLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamHandleTopologyChangeStageParams>;
/** /**
* An internal stage used as part of the change streams infrastructure to listen for an event * An internal stage used as part of the change streams infrastructure to listen for an event
* signaling that a new shard now has potentially matching data. For example, this stage will * signaling that a new shard now has potentially matching data. For example, this stage will

View File

@ -48,8 +48,11 @@ using boost::intrusive_ptr;
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamInjectControlEvents,
ChangeStreamInjectControlEventsStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamInjectControlEvents, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamInjectControlEvents,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamInjectControlEventsLiteParsed::parse,
DocumentSourceChangeStreamInjectControlEvents::createFromBson, DocumentSourceChangeStreamInjectControlEvents::createFromBson,
true); true);

View File

@ -47,6 +47,11 @@
#include <boost/smart_ptr/intrusive_ptr.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamInjectControlEvents);
using ChangeStreamInjectControlEventsLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamInjectControlEventsStageParams>;
/** /**
* This pipeline stage can turn specific events into so-called 'control events'. It can either be * This pipeline stage can turn specific events into so-called 'control events'. It can either be
* configured to turn certain event types into control events, by setting the control events flag on * configured to turn certain event types into control events, by setting the control events flag on

View File

@ -51,11 +51,13 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamOplogMatch, ChangeStreamOplogMatchStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamOplogMatch, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamOplogMatch,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamOplogMatchLiteParsed::parse,
DocumentSourceChangeStreamOplogMatch::createFromBson, DocumentSourceChangeStreamOplogMatch::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamOplogMatch, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamOplogMatch,
DocumentSourceChangeStreamOplogMatch::id) DocumentSourceChangeStreamOplogMatch::id)

View File

@ -35,6 +35,7 @@
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/exec/document_value/value.h" #include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/expression_context.h"
@ -48,6 +49,11 @@
#include <boost/smart_ptr/intrusive_ptr.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamOplogMatch);
using ChangeStreamOplogMatchLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamOplogMatchStageParams>;
/** /**
* A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied * A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied
* on the oplog. The stage requires itself to be the first stage in the pipeline. * on the oplog. The stage requires itself to be the first stage in the pipeline.

View File

@ -48,10 +48,13 @@ namespace {
const StringDataSet kFieldsToRemoveForQueryShapeSerialization = {"version", "supportedEvents"}; const StringDataSet kFieldsToRemoveForQueryShapeSerialization = {"version", "supportedEvents"};
} // namespace } // namespace
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamTransform, ChangeStreamTransformStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamTransformLiteParsed::parse,
DocumentSourceChangeStreamTransform::createFromBson, DocumentSourceChangeStreamTransform::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamTransform, DocumentSourceChangeStreamTransform::id) ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamTransform, DocumentSourceChangeStreamTransform::id)
boost::intrusive_ptr<DocumentSourceChangeStreamTransform> boost::intrusive_ptr<DocumentSourceChangeStreamTransform>

View File

@ -53,6 +53,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamTransform);
using ChangeStreamTransformLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamTransformStageParams>;
class DocumentSourceChangeStreamTransform final : public DocumentSourceInternalChangeStreamStage { class DocumentSourceChangeStreamTransform final : public DocumentSourceInternalChangeStreamStage {
public: public:
static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd; static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd;

View File

@ -54,10 +54,14 @@
namespace mongo { namespace mongo {
ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamUnwindTransaction,
ChangeStreamUnwindTransactionStageParams::id);
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction, REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction,
LiteParsedDocumentSourceChangeStreamInternal::parse, ChangeStreamUnwindTransactionLiteParsed::parse,
DocumentSourceChangeStreamUnwindTransaction::createFromBson, DocumentSourceChangeStreamUnwindTransaction::createFromBson,
true); true);
ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamUnwindTransaction, ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamUnwindTransaction,
DocumentSourceChangeStreamUnwindTransaction::id) DocumentSourceChangeStreamUnwindTransaction::id)

View File

@ -56,6 +56,10 @@
namespace mongo { namespace mongo {
DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamUnwindTransaction);
using ChangeStreamUnwindTransactionLiteParsed =
DocumentSourceChangeStreamLiteParsedInternal<ChangeStreamUnwindTransactionStageParams>;
/** /**
* This stage keeps track of applyOps oplog entries that represent transactions and "unwinds" them * This stage keeps track of applyOps oplog entries that represent transactions and "unwinds" them
* whenever an oplog entry commits a transaction. When the stage observes an applyOps or commit * whenever an oplog entry commits a transaction. When the stage observes an applyOps or commit