diff --git a/jstests/change_streams/versions.js b/jstests/change_streams/versions.js index d8dc449745a..7695af61fb7 100644 --- a/jstests/change_streams/versions.js +++ b/jstests/change_streams/versions.js @@ -12,8 +12,7 @@ import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js"; const dbName = jsTestName(); const collName = jsTestName(); -// TODO SERVER-107442: reenable testing of "v2" reader version here. -const validVersions = ["v1", undefined]; +const validVersions = ["v1", "v2", undefined]; const isPreciseShardTargetingEnabled = FeatureFlagUtil.isEnabled(db, "ChangeStreamPreciseShardTargeting"); function testChangeStreamWithVersionAttributeSet(version = undefined) { @@ -34,7 +33,7 @@ function testChangeStreamWithVersionAttributeSet(version = undefined) { // Database-level and all database-change streams are currently not implemented for v2 readers, so we // only add them to the test when it is safe to do so (non-v2 change stream reader and/or feature flag is // disabled). - if (version !== "v2" || !isPreciseShardTargetingEnabled) { + if (version !== "v2" && !isPreciseShardTargetingEnabled) { tests = tests.concat([ {collection: 1}, // Whole-DB change stream {}, // Whole-cluster change stream diff --git a/jstests/sharding/query/change_streams/change_stream_v2_smoke.js b/jstests/sharding/query/change_streams/change_stream_v2_smoke.js new file mode 100644 index 00000000000..27318477386 --- /dev/null +++ b/jstests/sharding/query/change_streams/change_stream_v2_smoke.js @@ -0,0 +1,264 @@ +/** + * Smoke tests for $changeStream v2 in a sharded cluster. + * Tested scenarios include: + * - Opening a change stream on an existing collection, capturing events before, during, and after resharding, until invalidation + * - Opening a change stream on a non-existent collection, waiting for the collection to be created before returning events + * - Opening a change stream in the future, waiting for the start time to be reached before returning events + * + * @tags: [ + * featureFlagChangeStreamPreciseShardTargeting, + * ] + */ +import {ShardingTest} from "jstests/libs/shardingtest.js"; +import {describe, it, before, afterEach, after, beforeEach} from "jstests/libs/mochalite.js"; +import {assertCreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js"; +import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js"; + +describe("$changeStream v2", function () { + let st; + let db; + let coll; + let csTest; + + before(function () { + st = new ShardingTest({ + shards: 3, + mongos: 1, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}, + other: { + enableBalancer: false, + }, + }); + + db = st.s.getDB(jsTestName()); + coll = db.test; + + //Enable sharding on the the test database and ensure that the primary is shard0. 
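+ // Pinning the primary to shard0 keeps the per-shard document counts asserted later in this test deterministic.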
+ assert.commandWorked(db.adminCommand({enableSharding: db.getName(), primaryShard: st.shard0.shardName})); + }); + + afterEach(function () { + csTest.cleanUp(); + assertDropCollection(db, coll.getName()); + }); + + after(function () { + st.stop(); + }); + + function assertCollDataDistribution(expectedCounts) { + for (const [shardConn, expectedCount] of expectedCounts) { + assert.soon( + () => { + const docs = shardConn.getDB(db.getName())[coll.getName()].find().toArray(); + return expectedCount == docs.length; + }, + "Expected " + expectedCount + " documents on " + shardConn, + ); + } + } + + function distributeCollDataOverShards(coll, distributionConfig) { + assert.commandWorked( + st.s.adminCommand({ + split: coll.getFullName(), + middle: distributionConfig.middle, + }), + ); + for (const chunkConfig of distributionConfig.chunks) { + assert.commandWorked( + st.s.adminCommand({ + moveChunk: coll.getFullName(), + find: chunkConfig.find, + to: chunkConfig.to, + _waitForDelete: true, + }), + ); + } + assertCollDataDistribution(distributionConfig.expectedCounts); + } + + it("returns events before and after resharding until invalidation", function () { + // Create and shard a collection and allocate collection to shard set {shard0, shard1}. + assertCreateCollection(db, coll.getName()); + assert.commandWorked(db.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}})); + coll.insertMany([ + {_id: -1, a: -1}, + {_id: 1, a: 1}, + ]); + distributeCollDataOverShards(coll, { + middle: {_id: 0}, + chunks: [ + {find: {_id: -1}, to: st.shard0.shardName}, + {find: {_id: 1}, to: st.shard1.shardName}, + ], + expectedCounts: [ + [st.shard0, 1], + [st.shard1, 1], + [st.shard2, 0], + ], + }); + + // Open a change stream. + csTest = new ChangeStreamTest(db); + const csCursor = csTest.startWatchingChanges({pipeline: [{$changeStream: {version: "v2"}}], collection: coll}); + + // Insert documents into the collection and ensure data distribution. + coll.insertMany([ + {_id: 2, a: 2}, + {_id: 3, a: 3}, + ]); + assertCollDataDistribution([ + [st.shard0, 1], + [st.shard1, 3], + [st.shard2, 0], + ]); + + // Reshard collection and allocate to shards {shard1, shard2}. + assert.commandWorked( + st.s.adminCommand({reshardCollection: coll.getFullName(), key: {a: 1}, numInitialChunks: 1}), + ); + distributeCollDataOverShards(coll, { + middle: {a: 2}, + chunks: [ + {find: {a: 1}, to: st.shard1.shardName}, + {find: {a: 2}, to: st.shard2.shardName}, + ], + expectedCounts: [ + [st.shard0, 0], + [st.shard1, 2], + [st.shard2, 2], + ], + }); + + // Insert documents into the collection and ensure data distribution. + coll.insertMany([ + {_id: 4, a: 4}, + {_id: 5, a: 5}, + ]); + assertCollDataDistribution([ + [st.shard0, 0], + [st.shard1, 2], + [st.shard2, 4], + ]); + + // Drop the collection. + assertDropCollection(db, coll.getName()); + + // Read events until invalidation. 
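+ // The first two inserts happened before resharding (documentKey contains only _id), the last two after resharding (documentKey also contains the new shard key 'a'), followed by the collection drop and the invalidate event.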
+ csTest.assertNextChangesEqual({ + cursor: csCursor, + expectedChanges: [ + { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 2}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 3}, + fullDocument: {_id: 3, a: 3}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 4, a: 4}, + fullDocument: {_id: 4, a: 4}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 5, a: 5}, + fullDocument: {_id: 5, a: 5}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "drop", + }, + { + operationType: "invalidate", + }, + ], + }); + }); + + it("can be opened on non-existing collection and returns events once it is created", function () { + // Open the change stream just after the invalidation cluster time. + csTest = new ChangeStreamTest(db); + const csCursor = csTest.startWatchingChanges({ + pipeline: [{$changeStream: {version: "v2", showExpandedEvents: true}}], + collection: coll, + }); + + // Create an unsplittable collection. + assert.commandWorked(db.runCommand({createUnsplittableCollection: "test", dataShard: st.shard1.shardName})); + + // Insert some documents. + coll.insertMany([ + {_id: 1, a: 1}, + {_id: 2, a: 2}, + ]); + + // Read events. + csTest.assertNextChangesEqual({ + cursor: csCursor, + expectedChanges: [ + { + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "create", + }, + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 2}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + ], + }); + }); + + it("returns events after start time when opened in the future", function () { + // Create collection. + assertCreateCollection(db, coll.getName()); + + // Open a change stream on 'coll' 3 seconds in the future. + const testStartTime = db.adminCommand({hello: 1}).$clusterTime.clusterTime; + testStartTime.t += 3; + csTest = new ChangeStreamTest(db); + let csCursor = csTest.startWatchingChanges({ + pipeline: [{$changeStream: {version: "v2", startAtOperationTime: testStartTime}}], + collection: coll, + }); + + // Expect the cursor to not return any events, yet to not be closed. + csCursor = csTest.assertNoChange(csCursor); + assert.neq(csCursor.id, 0, "cursor was closed unexpectedly"); + + // Wait until we are past the start time. + sleep(5000); + + // Insert a document in collection. + assert.commandWorked(coll.insert({_id: 1, a: 1})); + + // Read event. 
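+ // The insert is the only write performed after the start time and should be the first event returned.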
+ csTest.assertNextChangesEqual({ + cursor: csCursor, + expectedChanges: [ + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + ], + }); + }); +}); diff --git a/src/mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.cpp b/src/mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.cpp index 2c46ec7da09..b1ec35edd71 100644 --- a/src/mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.cpp +++ b/src/mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.cpp @@ -192,7 +192,9 @@ public: void closeCursorsOnDataShards(const stdx::unordered_set& shardIds) override { closeCursors(shardIds, false /* isConfigServer */); - _currentlyTargetedDataShards.erase(shardIds.begin(), shardIds.end()); + for (const auto& shardId : shardIds) { + _currentlyTargetedDataShards.erase(shardId); + } } void closeCursorOnConfigServer(OperationContext* opCtx) override { diff --git a/src/mongo/db/pipeline/BUILD.bazel b/src/mongo/db/pipeline/BUILD.bazel index 20b28e116cb..e5fa7f25d2d 100644 --- a/src/mongo/db/pipeline/BUILD.bazel +++ b/src/mongo/db/pipeline/BUILD.bazel @@ -1590,6 +1590,7 @@ mongo_cc_unit_test( "sequential_document_cache_test.cpp", "serverless_aggregation_context_fixture.cpp", "serverless_aggregation_context_fixture.h", + "sharded_agg_helpers_test.cpp", "sharded_union_test.cpp", "skip_and_limit_test.cpp", "tee_buffer_test.cpp", diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index a648c411aba..3bff8cc7dfe 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -51,6 +51,7 @@ #include "mongo/util/namespace_string_util.h" #include "mongo/util/str.h" +#include #include #include #include @@ -315,6 +316,8 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // can be overriden for specific event types below. bool requireUUID = true; + bool shouldAddOperationDescriptionField = _changeStreamSpec.getShowExpandedEvents(); + MutableDocument doc; switch (opType) { @@ -503,10 +506,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // Check for dynamic events that were specified via the 'supportedEvents' change stream // parameter. // This also checks for some hard-coded sharding-related events. - if (auto result = handleSupportedEvent(o2Field)) { + if (const auto& result = handleSupportedEvent(o2Field)) { // Apply returned event name and operationDescription. - operationType = result->first; - operationDescription = result->second; + operationType = result->opType; + operationDescription = result->opDescription; + shouldAddOperationDescriptionField |= !result->isBuiltInEvent; // Check if the 'reshardingUUID' field needs to be added to the event. if (kOpsWithReshardingUUIDs.contains(operationType)) { @@ -600,7 +604,10 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // The event may have a documentKey OR an operationDescription, but not both. We already // validated this while creating the resume token. 
doc.addField(DocumentSourceChangeStream::kDocumentKeyField, std::move(documentKey)); - if (_changeStreamSpec.getShowExpandedEvents() && !operationDescription.missing()) { + + // Control events must be emitted with the corresponding 'operationDescription' field, + // regardless of change stream being opened in 'showExpandedEvents' mode or not. + if (shouldAddOperationDescriptionField && !operationDescription.missing()) { doc.addField(DocumentSourceChangeStream::kOperationDescriptionField, std::move(operationDescription)); } @@ -626,13 +633,18 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum return doc.freeze(); } -boost::optional> +boost::optional ChangeStreamDefaultEventTransformation::handleSupportedEvent(const Document& o2Field) const { for (auto&& supportedEvent : _supportedEvents) { if (auto lookup = o2Field[supportedEvent]; !lookup.missing()) { // Known event. - return std::make_pair(supportedEvent, - Value{copyDocExceptFields(o2Field, {supportedEvent})}); + const bool isBuiltInEvent = + std::find(kBuiltInNoopEvents.begin(), kBuiltInNoopEvents.end(), supportedEvent) != + kBuiltInNoopEvents.end(); + return ChangeStreamDefaultEventTransformation::SupportedEventResult{ + supportedEvent, + Value{copyDocExceptFields(o2Field, {supportedEvent})}, + isBuiltInEvent}; } } return boost::none; diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h index 7f975610896..89550cddc3a 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.h +++ b/src/mongo/db/pipeline/change_stream_event_transform.h @@ -90,6 +90,12 @@ protected: * The event builder class to be used for oplog entries with no special behavior. */ class ChangeStreamDefaultEventTransformation final : public ChangeStreamEventTransformation { + struct SupportedEventResult { + StringData opType; + Value opDescription; + bool isBuiltInEvent; + }; + public: ChangeStreamDefaultEventTransformation(const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec); @@ -104,11 +110,11 @@ public: private: /** * Checks the 'o2Field' value of an oplog entry has any field name that is contained in - * '_supportedEvents'. If so, it returns the name of the field and the value mapped to the field - * in the oplog entry. Otherwise returns 'boost::none'. + * '_supportedEvents'. If so, it returns the name of the field, the value mapped to the field + * in the oplog entry, as well as whether the event is a builtin event. Otherwise returns + * 'boost::none'. */ - boost::optional> handleSupportedEvent( - const Document& o2Field) const; + boost::optional handleSupportedEvent(const Document& o2Field) const; /** * Build the '_supportedEvents' container from the 'supportedEvents' change stream parameter. 
diff --git a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp index 09f108f8a74..baecf9d0e90 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp @@ -62,10 +62,32 @@ namespace mongo { namespace { using namespace change_stream_test_helper; -Document applyTransformation(const repl::OplogEntry& oplogEntry, NamespaceString ns = nss) { +repl::MutableOplogEntry buildMovePrimaryOplogEntry(OperationContext* opCtx, + const DatabaseName& dbName, + const ShardId& oldPrimary, + const ShardId& newPrimary) { + repl::MutableOplogEntry oplogEntry; + const auto dbNameStr = + DatabaseNameUtil::serialize(dbName, SerializationContext::stateDefault()); + + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(NamespaceString(dbName)); + oplogEntry.setObject(BSON("msg" << BSON("movePrimary" << dbNameStr))); + oplogEntry.setObject2( + BSON("movePrimary" << dbNameStr << "from" << oldPrimary << "to" << newPrimary)); + oplogEntry.setOpTime(repl::OpTime(kDefaultTs, 0)); + oplogEntry.setWallClockTime(Date_t()); + + return oplogEntry; +} + +Document applyTransformation(const repl::OplogEntry& oplogEntry, + NamespaceString ns = nss, + const std::vector& supportedEvents = {}) { const auto oplogDoc = Document(oplogEntry.getEntry().toBSON()); DocumentSourceChangeStreamSpec spec; spec.setStartAtOperationTime(kDefaultTs); + spec.setSupportedEvents(supportedEvents); spec.setShowExpandedEvents(true); ChangeStreamEventTransformer transformer(make_intrusive(ns), spec); @@ -283,5 +305,68 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) { ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc); } +TEST(ChangeStreamEventTransformTest, + Given_NoopOplogEntry_When_CallingTransform_Then_FieldsAreNotCopied) { + const NamespaceString nss = + NamespaceString::createNamespaceString_forTest(boost::none, "testDB.coll.name"); + + // Create a noop oplog entry that represents a 'shardCollection' event. + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.toString_forTest()))); + oplogEntry.setObject2(BSON("shardCollection" << nss.toString_forTest() << "key" + << BSON("x" << 1) << "unique" << false)); + oplogEntry.setOpTime(repl::OpTime(kDefaultTs, 0)); + oplogEntry.setWallClockTime(Date_t()); + + // Expect fields from the oplog entry to be present in the 'operationDescription' field. 
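+ // The 'key' and 'unique' values must appear only inside 'operationDescription' and must not be copied into top-level fields of the event.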
+ Document expectedDoc{ + {DocumentSourceChangeStream::kIdField, + makeResumeToken(kDefaultTs, + Value(), + Document{{"key", Document{{"x", 1}}}, {"unique", false}}, + DocumentSourceChangeStream::kShardCollectionOpType)}, + {DocumentSourceChangeStream::kOperationTypeField, + DocumentSourceChangeStream::kShardCollectionOpType}, + {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, + {DocumentSourceChangeStream::kWallTimeField, Date_t()}, + {DocumentSourceChangeStream::kNamespaceField, + Document{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, + {DocumentSourceChangeStream::kOperationDescriptionField, + Document{{"key", Document{{"x", 1}}}, {"unique", false}}}, + }; + + repl::OplogEntry immutableEntry(oplogEntry.toBSON()); + ASSERT_DOCUMENT_EQ(applyTransformation(immutableEntry, nss), expectedDoc); +} + +TEST( + ChangeStreamEventTransformTest, + Given_NoopOplogEntryWhichIsNotBuiltIn_When_CallingTransform_Then_OperationDescriptionIsPresent) { + const NamespaceString nss = + NamespaceString::createNamespaceString_forTest(boost::none, "testDB.coll.name"); + auto serviceContext = std::make_unique(); + auto opCtx = serviceContext->makeOperationContext(); + auto oplogEntry = buildMovePrimaryOplogEntry( + opCtx.get(), nss.dbName(), ShardId("oldPrimary"), ShardId("newPrimary")); + auto opDescription = Document{{ + {"from"_sd, "oldPrimary"_sd}, + {"to"_sd, "newPrimary"_sd}, + }}; + + Document expectedDoc{ + {DocumentSourceChangeStream::kIdField, + makeResumeToken(kDefaultTs, Value(), opDescription, "movePrimary")}, + {DocumentSourceChangeStream::kOperationTypeField, "movePrimary"_sd}, + {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, + {DocumentSourceChangeStream::kWallTimeField, Date_t()}, + {DocumentSourceChangeStream::kNamespaceField, Document{{"db", nss.db_forTest()}}}, + {DocumentSourceChangeStream::kOperationDescriptionField, opDescription}}; + + repl::OplogEntry immutableEntry(oplogEntry.toBSON()); + ASSERT_DOCUMENT_EQ(applyTransformation(immutableEntry, nss, {"movePrimary"}), expectedDoc); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index dfaa90e707f..b5cdf4d881f 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -441,7 +441,10 @@ std::unique_ptr buildInternalOpFilter( // Noop change events that are only applicable when merging results on router: // - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks. - if (expCtx->getInRouter() || expCtx->getNeedsMerge()) { + // Do not emit 'migrateChunkToNewShard' event for change streams version 2, as it is not needed + // for handling topology changes. + // TODO: SERVER-111727 Stop emitting migrateChunkToNewShard change event. + if (!expCtx->isChangeStreamV2() && (expCtx->getInRouter() || expCtx->getNeedsMerge())) { internalOpTypes.push_back("migrateChunkToNewShard"_sd); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index de8f3b36728..005599aa636 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -384,7 +384,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( // We can only run on a replica set, or through mongoS. Confirm that this is the case. 
auto replCoord = repl::ReplicationCoordinator::get(expCtx->getOperationContext()); uassert(40573, - "The $changeStream stage is only supported on replica sets", + "The $changeStream stage is only supported on replica sets or mongos", expCtx->getInRouter() || (replCoord && replCoord->getSettings().isReplSet())); // We will not validate user specified options when we are not expecting to execute queries, diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 39a16c8217b..b92fa7ccfbc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -3312,7 +3312,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformSingle {DSChangeStream::kOperationTypeField, "eventType1"_sd}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}}; + {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, Document{operationDescription}}}; auto stage = exec::agg::MockStage::createForTest({Document{entry.getEntry().toBSON()}}, getExpCtx()); @@ -3371,7 +3372,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformMultip {DSChangeStream::kOperationTypeField, "eventType1"_sd}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}}; + {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, Document{operationDescriptionEvent1}}}; Document expectedDoc2{ {DSChangeStream::kIdField, @@ -3379,7 +3381,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformMultip {DSChangeStream::kOperationTypeField, "eventType2"_sd}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}}; + {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, Document{operationDescriptionEvent2}}}; std::deque docs; docs.push_back(Document{entry1.getEntry().toBSON()}); diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index a936796db0f..6c88486dc5f 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -800,6 +800,14 @@ public: _params.changeStreamSpec = std::move(changeStreamSpec); } + bool isChangeStreamV2() const { + if (const auto& spec = _params.changeStreamSpec) { + return spec->getVersion() == ChangeStreamReaderVersionEnum::kV2; + } + + return false; + } + const BSONObj& getOriginalAggregateCommand() const { return _params.originalAggregateCommand; } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 0ab9de8ef2e..073844eb0cc 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -168,38 +168,6 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } -/** - * Open a $changeStream cursor on the 'config.shards' collection to watch 
for new shards. - */ -RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr& expCtx, - Timestamp startMonitoringAtTime) { - const auto& configShard = - Grid::get(expCtx->getOperationContext())->shardRegistry()->getConfigShard(); - // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}} - AggregateCommandRequest aggReq( - NamespaceString::kConfigsvrShardsNamespace, - {BSON(DocumentSourceChangeStream::kStageName - << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName - << startMonitoringAtTime - << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); - aggregation_request_helper::setFromRouter( - VersionContext::getDecoration(expCtx->getOperationContext()), aggReq, true); - aggReq.setNeedsMerge(true); - - SimpleCursorOptions cursor; - cursor.setBatchSize(0); - aggReq.setCursor(cursor); - setReadWriteConcern(expCtx->getOperationContext(), aggReq, true, !expCtx->getExplain()); - auto configCursor = establishCursors(expCtx->getOperationContext(), - expCtx->getMongoProcessInterface()->taskExecutor, - aggReq.getNamespace(), - ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, - {{configShard->getId(), aggReq.toBSON()}}, - false); - invariant(configCursor.size() == 1); - return std::move(*configCursor.begin()); -} - BSONObj genericTransformForShards(MutableDocument&& cmdForShards, const boost::intrusive_ptr& expCtx, boost::optional explainVerbosity, @@ -330,38 +298,6 @@ std::vector establishShardCursors( } } -std::set getTargetedShards(boost::intrusive_ptr expCtx, - PipelineDataSource pipelineDataSource, - bool mustRunOnAllShards, - const boost::optional& cri, - const BSONObj shardQuery, - const BSONObj collation, - const boost::optional& mergeShardId) { - if (mustRunOnAllShards) { - // The pipeline begins with a stage which must be run on all shards. - auto shardIds = Grid::get(expCtx->getOperationContext()) - ->shardRegistry() - ->getAllShardIds(expCtx->getOperationContext()); - return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())}; - } else if (pipelineDataSource == PipelineDataSource::kGeneratesOwnDataOnce) { - if (mergeShardId) { - return {*mergeShardId}; - } - - // The output collection is nonexistent or sharded, so we cannot determine a single - // mergeShardId. Designate the dbPrimary shard to run the shards part. 
- const auto dbInfo = - uassertStatusOK(Grid::get(expCtx->getOperationContext()) - ->catalogCache() - ->getDatabase(expCtx->getOperationContext(), - expCtx->getNamespaceString().dbName())); - return {dbInfo->getPrimary()}; - } - - tassert(8361100, "Need CollectionRoutingInfo to target sharded query", cri); - return getTargetedShardsForQuery(expCtx, *cri, shardQuery, collation); -} - bool stageCanRunInParallel(const boost::intrusive_ptr& stage, const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) { if (stage->distributedPlanLogic()) { @@ -684,8 +620,63 @@ std::unique_ptr tryAttachCursorSourceForLocalRead( } return nullptr; } + +std::set getTargetedShardsForAllShardsRequest(OperationContext* opCtx) { + auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); + return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())}; +} + +std::set getTargetedShardsForPipelineGeteratingOwnDataOnce( + const boost::intrusive_ptr& expCtx, boost::optional mergeShardId) { + if (mergeShardId) { + return std::set{*mergeShardId}; + } + + // The output collection is nonexistent or sharded, so we cannot determine a + // single mergeShardId. Designate the dbPrimary shard to run the shards part. + auto* opCtx = expCtx->getOperationContext(); + const auto dbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase( + opCtx, expCtx->getNamespaceString().dbName())); + return std::set{dbInfo->getPrimary()}; +} + +std::set getTargetedShardsForChangeStream( + const boost::intrusive_ptr& expCtx) { + // If we are running change stream of version v2, the shard targeting will be determined via the + // ChangeStreamHandleTopologyChangeV2, therefore we return an empty shard set. + if (expCtx->isChangeStreamV2()) { + return {}; + } + + return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext()); +} } // namespace +std::set getTargetedShards(boost::intrusive_ptr expCtx, + PipelineDataSource pipelineDataSource, + const boost::optional& cri, + const BSONObj& shardQuery, + const BSONObj& collation, + const boost::optional& mergeShardId) { + switch (pipelineDataSource) { + case PipelineDataSource::kGeneratesOwnDataOnce: + return getTargetedShardsForPipelineGeteratingOwnDataOnce(expCtx, mergeShardId); + + case PipelineDataSource::kChangeStream: + return getTargetedShardsForChangeStream(expCtx); + + case PipelineDataSource::kNormal: + if (expCtx->getNamespaceString().isCollectionlessAggregateNS()) { + return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext()); + } + + tassert(8361100, "Need CollectionRoutingInfo to target sharded query", cri); + return getTargetedShardsForQuery(expCtx, *cri, shardQuery, collation); + } + + MONGO_UNREACHABLE_TASSERT(10744200); +} + std::unique_ptr runPipelineDirectlyOnSingleShard( const boost::intrusive_ptr& expCtx, AggregateCommandRequest request, @@ -938,18 +929,13 @@ TargetingResults targetPipeline(const boost::intrusive_ptr& e boost::optional mergeShardId = pipeline->needsSpecificShardMerger(); const bool mustRunOnAllShards = checkIfMustRunOnAllShards(expCtx->getNamespaceString(), pipelineDataSource); - std::set shardIds = getTargetedShards(expCtx, - pipelineDataSource, - mustRunOnAllShards, - cri, - shardQuery, - shardTargetingCollation, - mergeShardId); + std::set shardIds = getTargetedShards( + expCtx, pipelineDataSource, cri, shardQuery, shardTargetingCollation, mergeShardId); - bool targetAllHosts = pipeline->needsAllShardHosts(); // Don't need to split the pipeline if we are only targeting a single 
shard, unless: // - The pipeline contains one or more stages which must always merge on router. // - The pipeline requires the merge to be performed on a specific shard that is not targeted. + const bool targetAllHosts = pipeline->needsAllShardHosts(); const bool needsSplit = (shardIds.size() > 1u) || needsRouterMerge || targetAllHosts || (mergeShardId && *(shardIds.begin()) != mergeShardId); @@ -975,14 +961,10 @@ TargetingResults targetPipeline(const boost::intrusive_ptr& e Grid::get(expCtx->getOperationContext()) ->shardRegistry() ->reload(expCtx->getOperationContext()); + // Rebuild the set of shards as the shard registry might have changed. - shardIds = getTargetedShards(expCtx, - pipelineDataSource, - mustRunOnAllShards, - cri, - shardQuery, - shardTargetingCollation, - mergeShardId); + shardIds = getTargetedShards( + expCtx, pipelineDataSource, cri, shardQuery, shardTargetingCollation, mergeShardId); // Check that no shard has been removed since the change stream open time to detect a // possible event loss. It is important to execute it after retrieving the most recent list @@ -1180,6 +1162,25 @@ DispatchShardPipelineResults dispatchTargetedShardPipeline( std::move(readConcern), boost::none, requestQueryStatsFromRemotes)); + + // Shard targeting for change streams v2 is performed in ChangeStreamHandleTopologyChangeV2 + // stage. Here we early exit with empty DispatchShardPipelineResults. + if (expCtx->isChangeStreamV2()) { + tassert(10744202, + "set of targeted shards should be empty, as shard targeting is handled in " + "ChangeStreamHandleTopologyChangeV2 stage", + shardCount == 0); + + return DispatchShardPipelineResults{std::move(mergeShardId), + {}, + {}, + std::move(splitPipelines), + std::move(pipeline), + targetedCommand, + shardCount, + exchangeSpec}; + } + // If there were no shards when we began execution, we wouldn't have run this aggregation in the // first place. Here, we double-check that the shards have not been removed mid-operation. uassert(ErrorCodes::ShardNotFound, @@ -1234,11 +1235,11 @@ DispatchShardPipelineResults dispatchTargetedShardPipeline( << ") and we were not targeting each mongod in each shard", targetAllHosts || cursors.size() % shardCount == 0); - // For $changeStream, we must open an extra cursor on the 'config.shards' collection, so + // For $changeStream v1, we must open an extra cursor on the 'config.shards' collection, so // that we can monitor for the addition of new shards inline with real events. - if (pipelineDataSource == PipelineDataSource::kChangeStream && - !expCtx->getNamespaceString().isEqualDb(NamespaceString::kConfigsvrShardsNamespace)) { - cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime)); + if (auto&& cursor = openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx, pipelineDataSource, shardRegistryReloadTime)) { + cursors.emplace_back(std::move(*cursor)); } } @@ -1291,8 +1292,11 @@ DispatchShardPipelineResults dispatchShardPipeline( shardIds.erase(shard); } - // Return if we don't need to establish any cursors. - if (shardIds.empty()) { + // Early exit if we don't need to establish any cursors, unless it's change stream v2. + // Shard targeting for change stream v2 is not determined here, but by + // ChangeStreamHandleTopologyChangeV2 stage. In order to ensure this we need to dispatch + // the merge pipeline on mongos with $mergeCursors stage. 
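+ // For change stream v2 we fall through here: dispatchTargetedShardPipeline() returns early with no remote cursors, and the router-side merge pipeline later receives a $mergeCursors stage with an empty set of remotes.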
+ if (shardIds.empty() && !expCtx->isChangeStreamV2()) { tassert(7958303, "Expected no merge shard id when shardIds are empty", !targeting.mergeShardId.has_value()); @@ -1938,5 +1942,54 @@ std::unique_ptr finalizeAndMaybePreparePipelineForExecution( }); } +boost::optional openChangeStreamCursorOnConfigsvrIfNeeded( + const boost::intrusive_ptr& expCtx, + PipelineDataSource pipelineDataSource, + Timestamp startMonitoringAtTime) { + // Do not open change stream cursors on configsvr for non change stream pipelines. + if (pipelineDataSource != PipelineDataSource::kChangeStream) { + return {}; + } + + // Do not open change stream on configsvr for v2 change streams as shard targeting will be + // determined via the ChangeStreamHandleTopologyChangeV2 stage. + if (expCtx->isChangeStreamV2()) { + return {}; + } + + // Do not open change stream on configsvr if we are running a change stream on configsvr already + // over 'config.shards' collection. + if (expCtx->getNamespaceString().isEqualDb(NamespaceString::kConfigsvrShardsNamespace)) { + return {}; + } + + const auto& configShard = + Grid::get(expCtx->getOperationContext())->shardRegistry()->getConfigShard(); + + // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}} + AggregateCommandRequest aggReq( + NamespaceString::kConfigsvrShardsNamespace, + {BSON(DocumentSourceChangeStream::kStageName + << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName + << startMonitoringAtTime + << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); + aggregation_request_helper::setFromRouter( + VersionContext::getDecoration(expCtx->getOperationContext()), aggReq, true); + aggReq.setNeedsMerge(true); + + SimpleCursorOptions cursor; + cursor.setBatchSize(0); + aggReq.setCursor(cursor); + setReadWriteConcern(expCtx->getOperationContext(), aggReq, true, !expCtx->getExplain()); + auto configCursor = establishCursors(expCtx->getOperationContext(), + expCtx->getMongoProcessInterface()->taskExecutor, + aggReq.getNamespace(), + ReadPreferenceSetting{ReadPreference::SecondaryPreferred}, + {{configShard->getId(), aggReq.toBSON()}}, + false); + tassert(10744201, "A single cursor over configsvr should be opened", configCursor.size() == 1); + return std::move(*configCursor.begin()); +} + } // namespace sharded_agg_helpers } // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 8504a0843ff..cf45513d821 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -189,6 +189,15 @@ void partitionAndAddMergeCursorsSource(Pipeline* pipeline, */ BSONObj targetShardsForExplain(Pipeline* ownedPipeline); +/** + * Returns a set of targeted shards responsible for answering the 'shardQuery'. + */ +std::set getTargetedShards(boost::intrusive_ptr expCtx, + PipelineDataSource pipelineDataSource, + const boost::optional& cri, + const BSONObj& shardQuery, + const BSONObj& collation, + const boost::optional& mergeShardId); void mergeExplainOutputFromShards(const std::vector& shardResponses, BSONObjBuilder* result); @@ -292,5 +301,16 @@ std::unique_ptr runPipelineDirectlyOnSingleShard( ShardId shardId, bool requestQueryStatsFromRemotes); +/** + * Opens a $changeStream cursor on the 'config.shards' collection to watch for new shards if: + * - 'pipelineDataSource' is kChangeStream + * - change stream is not of version v2 + * - change stream is not already running over 'config.shards' collection. 
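+ * Returns boost::none if any of these conditions is not met.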
+ */ +boost::optional openChangeStreamCursorOnConfigsvrIfNeeded( + const boost::intrusive_ptr& expCtx, + PipelineDataSource pipelineDataSource, + Timestamp startMonitoringAtTime); + } // namespace sharded_agg_helpers } // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_agg_helpers_test.cpp b/src/mongo/db/pipeline/sharded_agg_helpers_test.cpp new file mode 100644 index 00000000000..414dcec6bdb --- /dev/null +++ b/src/mongo/db/pipeline/sharded_agg_helpers_test.cpp @@ -0,0 +1,250 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/sharded_agg_helpers.h" + +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/sharding_environment/shard_id.h" +#include "mongo/s/query/exec/sharded_agg_test_fixture.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +#include +#include +#include +#include + +namespace mongo::sharded_agg_helpers { +namespace { + +class ShardedAggHelpersFixture : public ShardedAggTestFixture { +public: + std::set setupShards(size_t n) { + std::set allShardIds; + for (auto&& shard : setupNShards(n)) { + allShardIds.insert(shard.getName()); + } + return allShardIds; + } + + void setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum version) { + auto&& spec = expCtx()->getChangeStreamSpec().value_or(DocumentSourceChangeStreamSpec()); + spec.setVersion(version); + expCtx()->setChangeStreamSpec(spec); + } +}; + +TEST_F( + ShardedAggHelpersFixture, + Given_GeneratesOwnDataOnceAndMergeShardId_When_CallingGetTargetedShards_Then_ReturnsMergeShardId) { + for (auto&& shardId : setupShards(3)) { + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kGeneratesOwnDataOnce, + boost::none /* cri */, + BSONObj(), + BSONObj(), + shardId), + std::set{shardId}); + } +} + +TEST_F( + ShardedAggHelpersFixture, + Given_GeneratesOwnDataOnceAndNoMergeShardId_When_CallingGetTargetedShards_Then_ReturnsShardIdFromTheGetDatabaseCall) { + auto allShardIds = setupShards(3); + ShardId shardId("1"); + + // Mock ShardId returned by the GetDatabase call for the given namespace. 
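+ // getTargetedShards() resolves the db primary through the catalog cache, so the config server response is mocked on a separate thread.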
+ auto future = launchAsync([&] { expectGetDatabase(kTestAggregateNss, shardId.toString()); }); + + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kGeneratesOwnDataOnce, + boost::none /* cri */, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + std::set{shardId}); + + future.default_timed_get(); +} + +TEST_F(ShardedAggHelpersFixture, + Given_ChangeStreamV1_WhenCallingGetTargetedShards_Then_ReturnsAllShards) { + std::set allShardIds = setupShards(3); + setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV1); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kChangeStream, + boost::none /* cri */, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + allShardIds); +} + +TEST_F(ShardedAggHelpersFixture, + Given_ChangeStreamV2_WhenCallingGetTargetedShards_Then_ReturnsNoShards) { + std::set allShardIds = setupShards(3); + setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV2); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kChangeStream, + boost::none /* cri */, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + std::set{}); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_NormalDataSourceAndCollectionlessNss_When_CallingGetTargetedShards_Then_ReturnsAllShards) { + std::set allShardIds = setupShards(3); + auto nss = NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kMdbTesting); + expCtx()->setNamespaceString(nss); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kNormal, + boost::none /* cri */, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + allShardIds); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_NormalDataSourceAndUnshardedCollection_When_CallingGetTargetedShards_Then_ReturnsDbPrimaryShard) { + std::set allShardIds = setupShards(3); + + expCtx()->setNamespaceString(kTestAggregateNss); + auto cri = makeUnshardedCollectionRoutingInfo(kTestAggregateNss); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kNormal, + cri, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + std::set{ShardId("0")}); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_NormalDataSourceAndUntrackedCollection_When_CallingGetTargetedShards_Then_ReturnsDbPrimaryShard) { + std::set allShardIds = setupShards(3); + + expCtx()->setNamespaceString(kTestAggregateNss); + auto cri = makeUntrackedCollectionRoutingInfo(kTestAggregateNss); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kNormal, + cri, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + std::set{ShardId("0")}); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_NormalDataSourceAndCriWithRoutingInfo_When_CallingGetTargetedShards_Then_ReturnsRelevantShards) { + std::set allShardIds = setupShards(3); + auto nss = kTestAggregateNss; + expCtx()->setNamespaceString(nss); + + loadRoutingTableWithTwoChunksAndTwoShards(nss); + std::set expectedShardIds{ShardId("0"), ShardId("1")}; + + auto catalogCache = Grid::get(getServiceContext())->catalogCache(); + const auto cri = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss)); + ASSERT_EQ(getTargetedShards(expCtx(), + PipelineDataSource::kNormal, + cri, + BSONObj(), + BSONObj(), + boost::none /* mergeShardId */), + expectedShardIds); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_NonChangeStreamPipeline_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) { + ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx(), PipelineDataSource::kGeneratesOwnDataOnce, Timestamp()), + boost::none); 
+ ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx(), PipelineDataSource::kNormal, Timestamp()), + boost::none); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_ChangeStreamV2Pipeline_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) { + setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV2); + ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx(), PipelineDataSource::kChangeStream, Timestamp()), + boost::none); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_ChangeStreamPipelineOverConfigShardsNss_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) { + expCtx()->setNamespaceString(NamespaceString::kConfigsvrShardsNamespace); + ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx(), PipelineDataSource::kChangeStream, Timestamp()), + boost::none); +} + +TEST_F( + ShardedAggHelpersFixture, + Given_ChangeStreamPipelineOverRegularNss_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsCursorOverConfigsvr) { + expCtx()->setNamespaceString(kTestAggregateNss); + + auto future = launchAsync([&] { + auto&& configsvrCursor = openChangeStreamCursorOnConfigsvrIfNeeded( + expCtx(), PipelineDataSource::kChangeStream, Timestamp()); + ASSERT_EQ(configsvrCursor->getCursorResponse().getCursorId(), CursorId(123)); + }); + + // Mock response for shard refresh. + expectGetShards({ShardType("1", "1")}); + + // Mock response from the agg request to the configsvr. + onCommand([this](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(NamespaceString::kConfigsvrShardsNamespace.coll(), + request.cmdObj.firstElement().valueStringData()); + + CursorResponse cursorResponse( + NamespaceString::kConfigsvrShardsNamespace, CursorId(123), {}); + return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +} // namespace +} // namespace mongo::sharded_agg_helpers diff --git a/src/mongo/s/change_streams/change_stream_db_absent_state_event_handler.cpp b/src/mongo/s/change_streams/change_stream_db_absent_state_event_handler.cpp index e08c20afe63..62233ab6bf9 100644 --- a/src/mongo/s/change_streams/change_stream_db_absent_state_event_handler.cpp +++ b/src/mongo/s/change_streams/change_stream_db_absent_state_event_handler.cpp @@ -30,7 +30,6 @@ #include "mongo/s/change_streams/change_stream_db_absent_state_event_handler.h" #include "mongo/bson/timestamp.h" -#include "mongo/s/change_streams/collection_change_stream_db_present_state_event_handler.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/assert_util.h" @@ -69,8 +68,7 @@ ShardTargeterDecision ChangeStreamShardTargeterDbAbsentStateEventHandler::handle readerCtx.openCursorsOnDataShards(clusterTime + 1, shardSet); // Since the database is now present, change the state event handler. 
- ctx.setEventHandler( - std::make_unique()); + ctx.setEventHandler(buildDbPresentStateEventHandler()); return ShardTargeterDecision::kContinue; } diff --git a/src/mongo/s/change_streams/change_stream_db_present_state_event_handler.cpp b/src/mongo/s/change_streams/change_stream_db_present_state_event_handler.cpp index 99e68a53cfb..e75711cb738 100644 --- a/src/mongo/s/change_streams/change_stream_db_present_state_event_handler.cpp +++ b/src/mongo/s/change_streams/change_stream_db_present_state_event_handler.cpp @@ -31,8 +31,6 @@ #include "mongo/util/assert_util.h" -#include - namespace mongo { namespace { void updateActiveShardCursors(Timestamp atClusterTime, @@ -40,22 +38,22 @@ void updateActiveShardCursors(Timestamp atClusterTime, ChangeStreamReaderContext& readerCtx) { const auto& currentActiveShardSet = readerCtx.getCurrentlyTargetedDataShards(); stdx::unordered_set shardsToCloseCursors; - std::set_difference(currentActiveShardSet.begin(), - currentActiveShardSet.end(), - newActiveShardSet.begin(), - newActiveShardSet.end(), - std::inserter(shardsToCloseCursors, shardsToCloseCursors.begin())); + for (const auto& currentActiveShard : currentActiveShardSet) { + if (!newActiveShardSet.contains(currentActiveShard)) { + shardsToCloseCursors.insert(currentActiveShard); + } + } if (!shardsToCloseCursors.empty()) { readerCtx.closeCursorsOnDataShards(shardsToCloseCursors); } stdx::unordered_set shardsToOpenCursors; - std::set_difference(newActiveShardSet.begin(), - newActiveShardSet.end(), - currentActiveShardSet.begin(), - currentActiveShardSet.end(), - std::inserter(shardsToOpenCursors, shardsToOpenCursors.begin())); + for (const auto& newActiveShard : newActiveShardSet) { + if (!currentActiveShardSet.contains(newActiveShard)) { + shardsToOpenCursors.insert(newActiveShard); + } + } if (!shardsToOpenCursors.empty()) { readerCtx.openCursorsOnDataShards(atClusterTime, shardsToOpenCursors); diff --git a/src/mongo/s/change_streams/collection_change_stream_db_present_state_event_handler_test.cpp b/src/mongo/s/change_streams/collection_change_stream_db_present_state_event_handler_test.cpp index 7face99c2db..a7ecf6dc228 100644 --- a/src/mongo/s/change_streams/collection_change_stream_db_present_state_event_handler_test.cpp +++ b/src/mongo/s/change_streams/collection_change_stream_db_present_state_event_handler_test.cpp @@ -214,14 +214,15 @@ TEST_F( CollectionDbPresentStateEventHandlerFixture, Given_NamespacePlacementChangedControlEventWithShards_When_HandleEventIsCalled_Then_CursorsAreUpdated) { Timestamp clusterTime(60, 10); - Timestamp committedAt(60, 10); - NamespacePlacementChangedControlEvent event{clusterTime, committedAt, makeTestNss()}; + NamespacePlacementChangedControlEvent event{clusterTime, makeTestNss()}; ShardId shardA("shardA"); ShardId shardB("shardB"); - readerCtx().currentlyTargetedShards = {shardA}; + ShardId shardC("shardC"); + ShardId shardD("shardD"); + readerCtx().currentlyTargetedShards = {shardA, shardD}; - std::vector shards = {shardB}; + std::vector shards = {shardA, shardB, shardC}; stdx::unordered_set shardSet(shards.begin(), shards.end()); std::vector responses{ {clusterTime, HistoricalPlacement(shards, HistoricalPlacementStatus::OK)}}; @@ -234,11 +235,11 @@ TEST_F( ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls.size(), 1); ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls[0].atClusterTime, clusterTime + 1); ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls[0].shardSet, - stdx::unordered_set{shardB}); + (stdx::unordered_set{shardB, shardC})); 
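+ // shardA remains targeted throughout, so only the cursor on shardD is expected to be closed.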
ASSERT_EQ(readerCtx().closeCursorsOnDataShardsCalls.size(), 1); ASSERT_EQ(readerCtx().closeCursorsOnDataShardsCalls[0].shardSet, - stdx::unordered_set{shardA}); + (stdx::unordered_set{shardD})); } TEST_F( diff --git a/src/mongo/s/change_streams/collection_change_stream_shard_targeter_impl_test.cpp b/src/mongo/s/change_streams/collection_change_stream_shard_targeter_impl_test.cpp index 2c9d912b644..0f94a93ebf5 100644 --- a/src/mongo/s/change_streams/collection_change_stream_shard_targeter_impl_test.cpp +++ b/src/mongo/s/change_streams/collection_change_stream_shard_targeter_impl_test.cpp @@ -159,10 +159,11 @@ TEST_F(CollectionChangeStreamShardTargeterImplFixture, auto& eventHandlerMock = dynamic_cast(*targeter().getEventHandler()); - Document moveChunkEvent = Document(BSON( - "operationType" << MoveChunkControlEvent::opType << "clusterTime" << Timestamp() << "donor" - << "shardA" << "recipient" << "shardB" - << "allCollectionChunksMigratedFromDonor" << true)); + Document moveChunkEvent = Document( + BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << Timestamp() + << "operationDescription" + << BSON("donor" << "shardA" << "recipient" << "shardB" + << "allCollectionChunksMigratedFromDonor" << true))); auto controlEvent = parseControlEvent(moveChunkEvent); targeter().handleEvent(opCtx(), moveChunkEvent, readerCtx()); ASSERT_EQ(eventHandlerMock.calls.size(), 1); diff --git a/src/mongo/s/change_streams/control_events.cpp b/src/mongo/s/change_streams/control_events.cpp index 4a0c9390bf2..cfe8db6dbe1 100644 --- a/src/mongo/s/change_streams/control_events.cpp +++ b/src/mongo/s/change_streams/control_events.cpp @@ -48,6 +48,7 @@ static constexpr StringData kAllCollectionChunksMigratedFromDonorField = "allCollectionChunksMigratedFromDonor"_sd; static constexpr StringData kFromField = "from"_sd; static constexpr StringData kToField = "to"_sd; +static constexpr StringData kOperationDescriptionField = "operationDescription"_sd; Value assertFieldType(const Document& document, StringData fieldName, BSONType expectedType) { auto val = document[fieldName]; @@ -62,22 +63,27 @@ Value assertFieldType(const Document& document, StringData fieldName, BSONType e } // namespace MoveChunkControlEvent MoveChunkControlEvent::createFromDocument(const Document& event) { + auto opDescription = + assertFieldType(event, kOperationDescriptionField, BSONType::object).getDocument(); Timestamp clusterTime = assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp(); - ShardId fromShard = assertFieldType(event, kDonorField, BSONType::string).getString(); - ShardId toShard = assertFieldType(event, kRecipientField, BSONType::string).getString(); + ShardId fromShard = assertFieldType(opDescription, kDonorField, BSONType::string).getString(); + ShardId toShard = assertFieldType(opDescription, kRecipientField, BSONType::string).getString(); bool allCollectionChunksMigratedFromDonor = - assertFieldType(event, kAllCollectionChunksMigratedFromDonorField, BSONType::boolean) + assertFieldType( + opDescription, kAllCollectionChunksMigratedFromDonorField, BSONType::boolean) .getBool(); return MoveChunkControlEvent{ clusterTime, fromShard, toShard, allCollectionChunksMigratedFromDonor}; } MovePrimaryControlEvent MovePrimaryControlEvent::createFromDocument(const Document& event) { + auto opDescription = + assertFieldType(event, kOperationDescriptionField, BSONType::object).getDocument(); Timestamp clusterTime = assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp(); - ShardId 
fromShard = assertFieldType(event, kFromField, BSONType::string).getString(); - ShardId toShard = assertFieldType(event, kToField, BSONType::string).getString(); + ShardId fromShard = assertFieldType(opDescription, kFromField, BSONType::string).getString(); + ShardId toShard = assertFieldType(opDescription, kToField, BSONType::string).getString(); return MovePrimaryControlEvent{clusterTime, fromShard, toShard}; } @@ -85,13 +91,11 @@ NamespacePlacementChangedControlEvent NamespacePlacementChangedControlEvent::cre const Document& event) { Timestamp clusterTime = assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp(); - Timestamp committedAt = - assertFieldType(event, kCommittedAtField, BSONType::timestamp).getTimestamp(); auto nsField = assertFieldType(event, kNamespaceField, BSONType::object).getDocument(); auto nssSpec = NamespaceSpec::parse(nsField.toBson(), IDLParserContext("NamespacePlacementChangedControlEvent")); NamespaceString nss = NamespaceStringUtil::deserialize(*nssSpec.getDb(), *nssSpec.getColl()); - return NamespacePlacementChangedControlEvent{clusterTime, committedAt, nss}; + return NamespacePlacementChangedControlEvent{clusterTime, nss}; } DatabaseCreatedControlEvent DatabaseCreatedControlEvent::createFromDocument(const Document& event) { diff --git a/src/mongo/s/change_streams/control_events.h b/src/mongo/s/change_streams/control_events.h index 147441f1551..6059a5eb1ec 100644 --- a/src/mongo/s/change_streams/control_events.h +++ b/src/mongo/s/change_streams/control_events.h @@ -82,7 +82,6 @@ struct NamespacePlacementChangedControlEvent { bool operator==(const NamespacePlacementChangedControlEvent& other) const = default; Timestamp clusterTime; - Timestamp committedAt; NamespaceString nss; }; diff --git a/src/mongo/s/change_streams/control_events_test.cpp b/src/mongo/s/change_streams/control_events_test.cpp index 02bb84060c1..52ffdc8d6c2 100644 --- a/src/mongo/s/change_streams/control_events_test.cpp +++ b/src/mongo/s/change_streams/control_events_test.cpp @@ -49,10 +49,11 @@ TEST( Timestamp ts; ShardId donorShard("fromShard"); ShardId recipientShard("toShard"); - Document event = - Document(BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << ts - << "donor" << donorShard << "recipient" << recipientShard - << "allCollectionChunksMigratedFromDonor" << true)); + Document event = Document( + BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << ts + << "operationDescription" + << BSON("donor" << donorShard << "recipient" << recipientShard + << "allCollectionChunksMigratedFromDonor" << true))); ControlEvent expectedControlEvent = MoveChunkControlEvent{ts, donorShard, recipientShard, true}; ASSERT_EQ(parseControlEvent(event), expectedControlEvent); @@ -73,7 +74,8 @@ TEST( ShardId recipientShard("toShard"); Document event = Document(BSON("operationType" << MovePrimaryControlEvent::opType << "clusterTime" << ts - << "from" << donorShard << "to" << recipientShard)); + << "operationDescription" + << BSON("from" << donorShard << "to" << recipientShard))); ControlEvent expectedControlEvent = MovePrimaryControlEvent{ts, donorShard, recipientShard}; ASSERT_EQ(parseControlEvent(event), expectedControlEvent); @@ -90,7 +92,6 @@ TEST( ControlEventTest, GivenValidNamespacePlacementChangedControlEventAsDocument_WhenCallingParseControlEvent_ThenParsingIsSuccessful) { Timestamp ts; - Timestamp committedAt; auto nss = NamespaceString::kDefaultInitialSyncIdNamespace; auto nssSpec = [&]() { @@ -100,11 +101,11 @@ TEST( return nssSpec; 
}(); - Document event = Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType - << "clusterTime" << ts << "committedAt" - << committedAt << "ns" << nssSpec.toBSON())); + Document event = + Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType + << "clusterTime" << ts << "ns" << nssSpec.toBSON())); - ControlEvent expectedControlEvent = NamespacePlacementChangedControlEvent{ts, committedAt, nss}; + ControlEvent expectedControlEvent = NamespacePlacementChangedControlEvent{ts, nss}; ASSERT_EQ(parseControlEvent(event), expectedControlEvent); } diff --git a/src/mongo/s/change_streams/historical_placement_fetcher_impl.cpp b/src/mongo/s/change_streams/historical_placement_fetcher_impl.cpp index 3c9a5fa5097..71f80cc5956 100644 --- a/src/mongo/s/change_streams/historical_placement_fetcher_impl.cpp +++ b/src/mongo/s/change_streams/historical_placement_fetcher_impl.cpp @@ -45,7 +45,7 @@ HistoricalPlacement HistoricalPlacementFetcherImpl::fetch( // string. const auto targetWholeCluster = !nss.has_value() || nss->isEmpty(); ConfigsvrGetHistoricalPlacement request( - targetWholeCluster ? nss.value() : NamespaceString::kEmpty, atClusterTime); + targetWholeCluster ? NamespaceString::kEmpty : nss.value(), atClusterTime); request.setTargetWholeCluster(targetWholeCluster); request.setCheckIfPointInTimeIsInFuture(checkIfPointInTimeIsInFuture); diff --git a/src/mongo/s/query/exec/router_stage_pipeline.cpp b/src/mongo/s/query/exec/router_stage_pipeline.cpp index f5d742e9864..57de1941e01 100644 --- a/src/mongo/s/query/exec/router_stage_pipeline.cpp +++ b/src/mongo/s/query/exec/router_stage_pipeline.cpp @@ -126,6 +126,12 @@ BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) { } bool RouterStagePipeline::remotesExhausted() const { + // Change stream pipelines can never be exhausted. Instead invalidation event may be sent, + // closing the stream. + if (_mergePipeline->getContext()->isTailableAwaitData()) { + return false; + } + return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted(); } diff --git a/src/mongo/s/query/planner/cluster_aggregation_planner.cpp b/src/mongo/s/query/planner/cluster_aggregation_planner.cpp index 893f0f2693b..37739967867 100644 --- a/src/mongo/s/query/planner/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/planner/cluster_aggregation_planner.cpp @@ -844,6 +844,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, bool eligibleForSampling, bool requestQueryStatsFromRemotes) { auto expCtx = targeter.pipeline->getContext(); + const bool isChangeStreamV2Pipeline = expCtx->isChangeStreamV2(); // If not, split the pipeline as necessary and dispatch to the relevant shards. auto shardDispatchResults = @@ -859,8 +860,9 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, // Check for valid usage of SEARCH_META. We wait until after we've dispatched pipelines to the // shards in the event that we need to resolve any views. // TODO PM-1966: We can resume doing this at parse time once views are tracked in the catalog. + // Change stream v2 does not have a shard pipeline. 
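+ // Skip the SEARCH_META validation for it, as dispatchTargetedShardPipeline() returned early without targeting any shards.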
auto svcCtx = opCtx->getServiceContext(); - if (svcCtx) { + if (svcCtx && !isChangeStreamV2Pipeline) { if (shardDispatchResults.pipelineForSingleShard) { search_helpers::assertSearchMetaAccessValid( shardDispatchResults.pipelineForSingleShard->getSources(), expCtx.get()); @@ -882,9 +884,39 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, std::move(shardDispatchResults), expCtx, result); } - // If this isn't an explain, then we must have established cursors on at least one - // shard. - invariant(shardDispatchResults.remoteCursors.size() > 0); + // Change stream v2 pipeline does not target any shards, yet needs to have a $mergeCursors stage + // attached to its merge pipeline running on mongos. + if (isChangeStreamV2Pipeline) { + tassert(10744203, + "change stream v2 should not target any shards", + shardDispatchResults.remoteCursors.empty()); + + auto mongosPipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); + tassert(10744204, + "tried to dispatch merge pipeline but there was no merge portion of the split " + "pipeline", + mongosPipeline); + + // Add $mergeCursors stage to the merge pipeline with an empty set of remote shards. Shard + // cursors will be later added by the ChangeStreamHandleTopologyChangeV2 stage. + sharded_agg_helpers::partitionAndAddMergeCursorsSource( + mongosPipeline.get(), + {} /* cursors */, + shardDispatchResults.splitPipeline->shardCursorsSortSpec, + requestQueryStatsFromRemotes); + return runPipelineOnMongoS(namespaces, + batchSize, + std::move(mongosPipeline), + result, + privileges, + requestQueryStatsFromRemotes); + } + + // If this isn't an explain or change stream v2, then we must have established cursors on at + // least one shard. + tassert(10744205, + "aggregate must have established cursors on at least one shard", + shardDispatchResults.remoteCursors.size() > 0); // If we sent the entire pipeline to a single shard, store the remote cursor and return. if (!shardDispatchResults.splitPipeline) {
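With these changes, the v2 reader can be exercised end to end by passing version: "v2" in the $changeStream spec. A minimal shell-style sketch, modelled on the jstests above and assuming the featureFlagChangeStreamPreciseShardTargeting feature flag is enabled; the 'orders' collection and the assertions are illustrative, not part of this patch:

// Open a v2 change stream via an explicit aggregate, as the tests above do.
const csCursor = db.orders.aggregate([{$changeStream: {version: "v2"}}]);

// Perform a write and read back the corresponding event.
assert.commandWorked(db.orders.insert({_id: 1, a: 1}));
assert.soon(() => csCursor.hasNext());
const event = csCursor.next();
assert.eq(event.operationType, "insert");
assert.docEq({_id: 1, a: 1}, event.fullDocument);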