diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index fea41883467..132a3391dbd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -128,7 +128,7 @@ public: kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level, isImplicitDefault); } - std::unique_ptr getStageParams() const final { + std::unique_ptr getStageParams() const override { return std::make_unique(_originalBson); } @@ -421,19 +421,15 @@ private: * ensure that all the necessary authentication and input validation checks are applied while * parsing. */ -class LiteParsedDocumentSourceChangeStreamInternal final +class DocumentSourceChangeStreamLiteParsedInternalBase : public DocumentSourceChangeStream::LiteParsed { -public: - static std::unique_ptr parse( - const NamespaceString& nss, const BSONElement& spec, const LiteParserOptions& options) { - return std::make_unique(spec, nss); - } - - LiteParsedDocumentSourceChangeStreamInternal(const BSONElement& spec, NamespaceString nss) +protected: + DocumentSourceChangeStreamLiteParsedInternalBase(const BSONElement& spec, NamespaceString nss) : DocumentSourceChangeStream::LiteParsed(spec, std::move(nss)), _privileges({Privilege(ResourcePattern::forClusterResource(_nss.tenantId()), ActionType::internal)}) {} +public: PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final { return _privileges; } @@ -442,6 +438,24 @@ private: const PrivilegeVector _privileges; }; +template +class DocumentSourceChangeStreamLiteParsedInternal final + : public DocumentSourceChangeStreamLiteParsedInternalBase { +public: + DocumentSourceChangeStreamLiteParsedInternal(const BSONElement& originalBson, + NamespaceString nss) + : DocumentSourceChangeStreamLiteParsedInternalBase(originalBson, std::move(nss)) {} + + static std::unique_ptr parse( + NamespaceString nss, const BSONElement& spec, const LiteParserOptions& options) { + return std::make_unique(spec, std::move(nss)); + } + + std::unique_ptr getStageParams() const final { + return std::make_unique(_originalBson); + } +}; + /** * A DocumentSource class for all internal change stream stages. This class is useful for * shared logic between all of the internal change stream stages. For internally created match diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp index ae98169f2a0..bb5fc7a1ec5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp @@ -41,10 +41,14 @@ namespace mongo { constexpr StringData DocumentSourceChangeStreamAddPostImage::kStageName; constexpr StringData DocumentSourceChangeStreamAddPostImage::kFullDocumentFieldName; +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamAddPostImage, + ChangeStreamAddPostImageStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPostImage, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamAddPostImageLiteParsed::parse, DocumentSourceChangeStreamAddPostImage::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPostImage, DocumentSourceChangeStreamAddPostImage::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h index b95fa1753ce..5605a8a289e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h @@ -52,6 +52,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamAddPostImage); +using ChangeStreamAddPostImageLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * Part of the change stream API machinery used to look up the post-image of a document. Uses the * "documentKey" field of the input to look up the new version of the document. diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp index 757f9c3e1ad..132f028f004 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp @@ -51,10 +51,13 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamAddPreImage, ChangeStreamAddPreImageStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPreImage, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamAddPreImageLiteParsed::parse, DocumentSourceChangeStreamAddPreImage::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamAddPreImage, DocumentSourceChangeStreamAddPreImage::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h index 3db8615ebf4..e6e6954e3d1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h @@ -54,6 +54,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamAddPreImage); +using ChangeStreamAddPreImageLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * Part of the change stream API machinery used to look up the pre-image of a document. * diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index 6a71e5676e2..e48dd3e6f19 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -45,10 +45,14 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckInvalidate, + ChangeStreamCheckInvalidateStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckInvalidate, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamCheckInvalidateLiteParsed::parse, DocumentSourceChangeStreamCheckInvalidate::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckInvalidate, DocumentSourceChangeStreamCheckInvalidate::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h index b3e5e30621c..29f9510d5c8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h @@ -53,6 +53,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckInvalidate); +using ChangeStreamCheckInvalidateLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * This stage is used internally for change stream notifications to artificially generate an * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index 6efc86d390f..e24d665c89c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -49,10 +49,14 @@ using boost::intrusive_ptr; namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckResumability, + ChangeStreamCheckResumabilityStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamCheckResumabilityLiteParsed::parse, DocumentSourceChangeStreamCheckResumability::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckResumability, DocumentSourceChangeStreamCheckResumability::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h index 0fd08ddc38a..7157a825496 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h @@ -50,6 +50,11 @@ #include namespace mongo { + +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckResumability); +using ChangeStreamCheckResumabilityLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * This stage checks whether or not the oplog has enough history to resume the stream, and consumes * all events up to the given resume point. It is deployed on all shards when resuming a stream on diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp index 605c9b85b30..7797ce5d61b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp @@ -40,10 +40,14 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamCheckTopologyChange, + ChangeStreamCheckTopologyChangeStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckTopologyChange, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamCheckTopologyChangeLiteParsed::parse, DocumentSourceChangeStreamCheckTopologyChange::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamCheckTopologyChange, DocumentSourceChangeStreamCheckTopologyChange::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h index cf0081b1ff8..6d9a6eeb3c5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h @@ -48,6 +48,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamCheckTopologyChange); +using ChangeStreamCheckTopologyChangeLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * This stage detects change stream topology changes in the form of 'kNewShardDetectedOpType' events * and forwards them directly to the executor via an exception. Using an exception bypasses the rest diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 50fd5533a0f..c629c1f734b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -38,10 +38,14 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamHandleTopologyChange, + ChangeStreamHandleTopologyChangeStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamHandleTopologyChange, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamHandleTopologyChangeLiteParsed::parse, DocumentSourceChangeStreamHandleTopologyChange::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamHandleTopologyChange, DocumentSourceChangeStreamHandleTopologyChange::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h index de7eda7f314..fc87b2bc7cd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h @@ -49,6 +49,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamHandleTopologyChange); +using ChangeStreamHandleTopologyChangeLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * An internal stage used as part of the change streams infrastructure to listen for an event * signaling that a new shard now has potentially matching data. For example, this stage will diff --git a/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.cpp b/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.cpp index 10a2d324967..c031b4691bc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.cpp @@ -48,8 +48,11 @@ using boost::intrusive_ptr; namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamInjectControlEvents, + ChangeStreamInjectControlEventsStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamInjectControlEvents, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamInjectControlEventsLiteParsed::parse, DocumentSourceChangeStreamInjectControlEvents::createFromBson, true); diff --git a/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.h b/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.h index 6ed117b3810..2896e927040 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.h +++ b/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.h @@ -47,6 +47,11 @@ #include namespace mongo { + +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamInjectControlEvents); +using ChangeStreamInjectControlEventsLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * This pipeline stage can turn specific events into so-called 'control events'. It can either be * configured to turn certain event types into control events, by setting the control events flag on diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index dd0d97a7233..6f6a43b7074 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -51,11 +51,13 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamOplogMatch, ChangeStreamOplogMatchStageParams::id); REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamOplogMatch, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamOplogMatchLiteParsed::parse, DocumentSourceChangeStreamOplogMatch::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamOplogMatch, DocumentSourceChangeStreamOplogMatch::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h index d129d924f60..86e9323b114 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h @@ -35,6 +35,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/expression_context.h" @@ -48,6 +49,11 @@ #include namespace mongo { + +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamOplogMatch); +using ChangeStreamOplogMatchLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied * on the oplog. The stage requires itself to be the first stage in the pipeline. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index beb2a913edf..e556d29a093 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -48,10 +48,13 @@ namespace { const StringDataSet kFieldsToRemoveForQueryShapeSerialization = {"version", "supportedEvents"}; } // namespace +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamTransform, ChangeStreamTransformStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamTransformLiteParsed::parse, DocumentSourceChangeStreamTransform::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamTransform, DocumentSourceChangeStreamTransform::id) boost::intrusive_ptr diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index d234972645c..bdd3b389b06 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -53,6 +53,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamTransform); +using ChangeStreamTransformLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + class DocumentSourceChangeStreamTransform final : public DocumentSourceInternalChangeStreamStage { public: static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index f3ed5fa4236..c5a72fb3aa4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -54,10 +54,14 @@ namespace mongo { +ALLOCATE_STAGE_PARAMS_ID(_internalChangeStreamUnwindTransaction, + ChangeStreamUnwindTransactionStageParams::id); + REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction, - LiteParsedDocumentSourceChangeStreamInternal::parse, + ChangeStreamUnwindTransactionLiteParsed::parse, DocumentSourceChangeStreamUnwindTransaction::createFromBson, true); + ALLOCATE_DOCUMENT_SOURCE_ID(_internalChangeStreamUnwindTransaction, DocumentSourceChangeStreamUnwindTransaction::id) diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h index 20b977d0b71..f86562f957d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h @@ -56,6 +56,10 @@ namespace mongo { +DECLARE_STAGE_PARAMS_DERIVED_DEFAULT(ChangeStreamUnwindTransaction); +using ChangeStreamUnwindTransactionLiteParsed = + DocumentSourceChangeStreamLiteParsedInternal; + /** * This stage keeps track of applyOps oplog entries that represent transactions and "unwinds" them * whenever an oplog entry commits a transaction. When the stage observes an applyOps or commit