SERVER-107442 Integrate system elements and smoke test change stream on a collection in Strict mode (#42089)

GitOrigin-RevId: 7ec89c35412e6ef703e5c549ec5ea6b4b8eb0ae1
This commit is contained in:
Denis Grebennicov 2025-10-09 14:22:13 +02:00 committed by MongoDB Bot
parent ac1aac1b63
commit fac2bb5f89
24 changed files with 901 additions and 155 deletions

View File

@ -12,8 +12,7 @@ import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
const dbName = jsTestName();
const collName = jsTestName();
// TODO SERVER-107442: reenable testing of "v2" reader version here.
const validVersions = ["v1", undefined];
const validVersions = ["v1", "v2", undefined];
const isPreciseShardTargetingEnabled = FeatureFlagUtil.isEnabled(db, "ChangeStreamPreciseShardTargeting");
function testChangeStreamWithVersionAttributeSet(version = undefined) {
@ -34,7 +33,7 @@ function testChangeStreamWithVersionAttributeSet(version = undefined) {
// Database-level and all-databases (whole-cluster) change streams are currently not implemented for
// v2 readers, so we only add them to the test when it is safe to do so (non-v2 change stream reader
// and/or the feature flag is disabled).
if (version !== "v2" || !isPreciseShardTargetingEnabled) {
if (version !== "v2" && !isPreciseShardTargetingEnabled) {
tests = tests.concat([
{collection: 1}, // Whole-DB change stream
{}, // Whole-cluster change stream
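For reference, the three change stream scopes exercised here map onto the following shell calls (a minimal sketch; the collection name is illustrative):

const collCursor = db.getCollection("coll").watch();  // collection-level change stream
const dbCursor = db.watch();                          // whole-DB change stream
const clusterCursor = db.getMongo().watch();          // whole-cluster change stream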

View File

@ -0,0 +1,264 @@
/**
* Smoke tests for $changeStream v2 in a sharded cluster.
* Tested scenarios include:
* - Opening a change stream on an existing collection, capturing events before, during, and after resharding, until invalidation
* - Opening a change stream on a non-existent collection, waiting for the collection to be created before returning events
* - Opening a change stream in the future, waiting for the start time to be reached before returning events
*
* @tags: [
* featureFlagChangeStreamPreciseShardTargeting,
* ]
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {describe, it, before, afterEach, after, beforeEach} from "jstests/libs/mochalite.js";
import {assertCreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
describe("$changeStream v2", function () {
let st;
let db;
let coll;
let csTest;
before(function () {
st = new ShardingTest({
shards: 3,
mongos: 1,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
other: {
enableBalancer: false,
},
});
db = st.s.getDB(jsTestName());
coll = db.test;
// Enable sharding on the test database and ensure that the primary is shard0.
assert.commandWorked(db.adminCommand({enableSharding: db.getName(), primaryShard: st.shard0.shardName}));
});
afterEach(function () {
csTest.cleanUp();
assertDropCollection(db, coll.getName());
});
after(function () {
st.stop();
});
function assertCollDataDistribution(expectedCounts) {
for (const [shardConn, expectedCount] of expectedCounts) {
assert.soon(
() => {
const docs = shardConn.getDB(db.getName())[coll.getName()].find().toArray();
return expectedCount == docs.length;
},
"Expected " + expectedCount + " documents on " + shardConn,
);
}
}
function distributeCollDataOverShards(coll, distributionConfig) {
assert.commandWorked(
st.s.adminCommand({
split: coll.getFullName(),
middle: distributionConfig.middle,
}),
);
for (const chunkConfig of distributionConfig.chunks) {
assert.commandWorked(
st.s.adminCommand({
moveChunk: coll.getFullName(),
find: chunkConfig.find,
to: chunkConfig.to,
_waitForDelete: true,
}),
);
}
assertCollDataDistribution(distributionConfig.expectedCounts);
}
it("returns events before and after resharding until invalidation", function () {
// Create and shard a collection, and allocate it to the shard set {shard0, shard1}.
assertCreateCollection(db, coll.getName());
assert.commandWorked(db.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
coll.insertMany([
{_id: -1, a: -1},
{_id: 1, a: 1},
]);
distributeCollDataOverShards(coll, {
middle: {_id: 0},
chunks: [
{find: {_id: -1}, to: st.shard0.shardName},
{find: {_id: 1}, to: st.shard1.shardName},
],
expectedCounts: [
[st.shard0, 1],
[st.shard1, 1],
[st.shard2, 0],
],
});
// Open a change stream.
csTest = new ChangeStreamTest(db);
const csCursor = csTest.startWatchingChanges({pipeline: [{$changeStream: {version: "v2"}}], collection: coll});
// Insert documents into the collection and ensure data distribution.
coll.insertMany([
{_id: 2, a: 2},
{_id: 3, a: 3},
]);
assertCollDataDistribution([
[st.shard0, 1],
[st.shard1, 3],
[st.shard2, 0],
]);
// Reshard the collection and allocate it to shards {shard1, shard2}.
assert.commandWorked(
st.s.adminCommand({reshardCollection: coll.getFullName(), key: {a: 1}, numInitialChunks: 1}),
);
distributeCollDataOverShards(coll, {
middle: {a: 2},
chunks: [
{find: {a: 1}, to: st.shard1.shardName},
{find: {a: 2}, to: st.shard2.shardName},
],
expectedCounts: [
[st.shard0, 0],
[st.shard1, 2],
[st.shard2, 2],
],
});
// Insert documents into the collection and ensure data distribution.
coll.insertMany([
{_id: 4, a: 4},
{_id: 5, a: 5},
]);
assertCollDataDistribution([
[st.shard0, 0],
[st.shard1, 2],
[st.shard2, 4],
]);
// Drop the collection.
assertDropCollection(db, coll.getName());
// Read events until invalidation.
csTest.assertNextChangesEqual({
cursor: csCursor,
expectedChanges: [
{
documentKey: {_id: 2},
fullDocument: {_id: 2, a: 2},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
documentKey: {_id: 3},
fullDocument: {_id: 3, a: 3},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
documentKey: {_id: 4, a: 4},
fullDocument: {_id: 4, a: 4},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
documentKey: {_id: 5, a: 5},
fullDocument: {_id: 5, a: 5},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
ns: {db: db.getName(), coll: coll.getName()},
operationType: "drop",
},
{
operationType: "invalidate",
},
],
});
});
it("can be opened on non-existing collection and returns events once it is created", function () {
// Open the change stream just after the invalidation cluster time.
csTest = new ChangeStreamTest(db);
const csCursor = csTest.startWatchingChanges({
pipeline: [{$changeStream: {version: "v2", showExpandedEvents: true}}],
collection: coll,
});
// Create an unsplittable collection.
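// ('createUnsplittableCollection' is an internal test command that creates the collection as an
// unsharded, single-chunk collection placed on the given 'dataShard'.)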
assert.commandWorked(db.runCommand({createUnsplittableCollection: "test", dataShard: st.shard1.shardName}));
// Insert some documents.
coll.insertMany([
{_id: 1, a: 1},
{_id: 2, a: 2},
]);
// Read events.
csTest.assertNextChangesEqual({
cursor: csCursor,
expectedChanges: [
{
ns: {db: db.getName(), coll: coll.getName()},
operationType: "create",
},
{
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 1},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
documentKey: {_id: 2},
fullDocument: {_id: 2, a: 2},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
],
});
});
it("returns events after start time when opened in the future", function () {
// Create collection.
assertCreateCollection(db, coll.getName());
// Open a change stream on 'coll' 3 seconds in the future.
const testStartTime = db.adminCommand({hello: 1}).$clusterTime.clusterTime;
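// ('$clusterTime.clusterTime' is a BSON Timestamp; its 't' component counts seconds, so bumping
// it by 3 moves the start time roughly 3 seconds into the future.)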
testStartTime.t += 3;
csTest = new ChangeStreamTest(db);
let csCursor = csTest.startWatchingChanges({
pipeline: [{$changeStream: {version: "v2", startAtOperationTime: testStartTime}}],
collection: coll,
});
// Expect the cursor to return no events, yet remain open.
csCursor = csTest.assertNoChange(csCursor);
assert.neq(csCursor.id, 0, "cursor was closed unexpectedly");
// Wait until we are past the start time.
sleep(5000);
// Insert a document into the collection.
assert.commandWorked(coll.insert({_id: 1, a: 1}));
// Read event.
csTest.assertNextChangesEqual({
cursor: csCursor,
expectedChanges: [
{
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 1},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
],
});
});
});

View File

@ -192,7 +192,9 @@ public:
void closeCursorsOnDataShards(const stdx::unordered_set<ShardId>& shardIds) override {
closeCursors(shardIds, false /* isConfigServer */);
_currentlyTargetedDataShards.erase(shardIds.begin(), shardIds.end());
for (const auto& shardId : shardIds) {
_currentlyTargetedDataShards.erase(shardId);
}
}
void closeCursorOnConfigServer(OperationContext* opCtx) override {

View File

@ -1590,6 +1590,7 @@ mongo_cc_unit_test(
"sequential_document_cache_test.cpp",
"serverless_aggregation_context_fixture.cpp",
"serverless_aggregation_context_fixture.h",
"sharded_agg_helpers_test.cpp",
"sharded_union_test.cpp",
"skip_and_limit_test.cpp",
"tee_buffer_test.cpp",

View File

@ -51,6 +51,7 @@
#include "mongo/util/namespace_string_util.h"
#include "mongo/util/str.h"
#include <algorithm>
#include <array>
#include <cstddef>
#include <initializer_list>
@ -315,6 +316,8 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// can be overridden for specific event types below.
bool requireUUID = true;
bool shouldAddOperationDescriptionField = _changeStreamSpec.getShowExpandedEvents();
MutableDocument doc;
switch (opType) {
@ -503,10 +506,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Check for dynamic events that were specified via the 'supportedEvents' change stream
// parameter.
// This also checks for some hard-coded sharding-related events.
if (auto result = handleSupportedEvent(o2Field)) {
if (const auto& result = handleSupportedEvent(o2Field)) {
// Apply returned event name and operationDescription.
operationType = result->first;
operationDescription = result->second;
operationType = result->opType;
operationDescription = result->opDescription;
shouldAddOperationDescriptionField |= !result->isBuiltInEvent;
// Check if the 'reshardingUUID' field needs to be added to the event.
if (kOpsWithReshardingUUIDs.contains(operationType)) {
@ -600,7 +604,10 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// The event may have a documentKey OR an operationDescription, but not both. We already
// validated this while creating the resume token.
doc.addField(DocumentSourceChangeStream::kDocumentKeyField, std::move(documentKey));
if (_changeStreamSpec.getShowExpandedEvents() && !operationDescription.missing()) {
// Control events must be emitted with the corresponding 'operationDescription' field,
// regardless of whether the change stream was opened in 'showExpandedEvents' mode.
if (shouldAddOperationDescriptionField && !operationDescription.missing()) {
doc.addField(DocumentSourceChangeStream::kOperationDescriptionField,
std::move(operationDescription));
}
@ -626,13 +633,18 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
return doc.freeze();
}
boost::optional<std::pair<StringData, Value>>
boost::optional<ChangeStreamDefaultEventTransformation::SupportedEventResult>
ChangeStreamDefaultEventTransformation::handleSupportedEvent(const Document& o2Field) const {
for (auto&& supportedEvent : _supportedEvents) {
if (auto lookup = o2Field[supportedEvent]; !lookup.missing()) {
// Known event.
return std::make_pair(supportedEvent,
Value{copyDocExceptFields(o2Field, {supportedEvent})});
const bool isBuiltInEvent =
std::find(kBuiltInNoopEvents.begin(), kBuiltInNoopEvents.end(), supportedEvent) !=
kBuiltInNoopEvents.end();
return ChangeStreamDefaultEventTransformation::SupportedEventResult{
supportedEvent,
Value{copyDocExceptFields(o2Field, {supportedEvent})},
isBuiltInEvent};
}
}
return boost::none;

View File

@ -90,6 +90,12 @@ protected:
* The event builder class to be used for oplog entries with no special behavior.
*/
class ChangeStreamDefaultEventTransformation final : public ChangeStreamEventTransformation {
struct SupportedEventResult {
StringData opType;
Value opDescription;
bool isBuiltInEvent;
};
public:
ChangeStreamDefaultEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
@ -104,11 +110,11 @@ public:
private:
/**
* Checks whether the 'o2Field' value of an oplog entry has any field name that is contained in
* '_supportedEvents'. If so, it returns the name of the field and the value mapped to the field
* in the oplog entry. Otherwise returns 'boost::none'.
* '_supportedEvents'. If so, it returns the name of the field, the value mapped to the field
* in the oplog entry, as well as whether the event is a built-in event. Otherwise returns
* 'boost::none'.
*/
boost::optional<std::pair<StringData, Value>> handleSupportedEvent(
const Document& o2Field) const;
boost::optional<SupportedEventResult> handleSupportedEvent(const Document& o2Field) const;
/**
* Build the '_supportedEvents' container from the 'supportedEvents' change stream parameter.

View File

@ -62,10 +62,32 @@ namespace mongo {
namespace {
using namespace change_stream_test_helper;
Document applyTransformation(const repl::OplogEntry& oplogEntry, NamespaceString ns = nss) {
repl::MutableOplogEntry buildMovePrimaryOplogEntry(OperationContext* opCtx,
const DatabaseName& dbName,
const ShardId& oldPrimary,
const ShardId& newPrimary) {
repl::MutableOplogEntry oplogEntry;
const auto dbNameStr =
DatabaseNameUtil::serialize(dbName, SerializationContext::stateDefault());
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setNss(NamespaceString(dbName));
oplogEntry.setObject(BSON("msg" << BSON("movePrimary" << dbNameStr)));
oplogEntry.setObject2(
BSON("movePrimary" << dbNameStr << "from" << oldPrimary << "to" << newPrimary));
oplogEntry.setOpTime(repl::OpTime(kDefaultTs, 0));
oplogEntry.setWallClockTime(Date_t());
return oplogEntry;
}
Document applyTransformation(const repl::OplogEntry& oplogEntry,
NamespaceString ns = nss,
const std::vector<std::string>& supportedEvents = {}) {
const auto oplogDoc = Document(oplogEntry.getEntry().toBSON());
DocumentSourceChangeStreamSpec spec;
spec.setStartAtOperationTime(kDefaultTs);
spec.setSupportedEvents(supportedEvents);
spec.setShowExpandedEvents(true);
ChangeStreamEventTransformer transformer(make_intrusive<ExpressionContextForTest>(ns), spec);
@ -283,5 +305,68 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) {
ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc);
}
TEST(ChangeStreamEventTransformTest,
Given_NoopOplogEntry_When_CallingTransform_Then_FieldsAreNotCopied) {
const NamespaceString nss =
NamespaceString::createNamespaceString_forTest(boost::none, "testDB.coll.name");
// Create a noop oplog entry that represents a 'shardCollection' event.
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setNss(nss);
oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.toString_forTest())));
oplogEntry.setObject2(BSON("shardCollection" << nss.toString_forTest() << "key"
<< BSON("x" << 1) << "unique" << false));
oplogEntry.setOpTime(repl::OpTime(kDefaultTs, 0));
oplogEntry.setWallClockTime(Date_t());
// Expect fields from the oplog entry to be present in the 'operationDescription' field.
Document expectedDoc{
{DocumentSourceChangeStream::kIdField,
makeResumeToken(kDefaultTs,
Value(),
Document{{"key", Document{{"x", 1}}}, {"unique", false}},
DocumentSourceChangeStream::kShardCollectionOpType)},
{DocumentSourceChangeStream::kOperationTypeField,
DocumentSourceChangeStream::kShardCollectionOpType},
{DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
{DocumentSourceChangeStream::kWallTimeField, Date_t()},
{DocumentSourceChangeStream::kNamespaceField,
Document{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DocumentSourceChangeStream::kOperationDescriptionField,
Document{{"key", Document{{"x", 1}}}, {"unique", false}}},
};
repl::OplogEntry immutableEntry(oplogEntry.toBSON());
ASSERT_DOCUMENT_EQ(applyTransformation(immutableEntry, nss), expectedDoc);
}
TEST(
ChangeStreamEventTransformTest,
Given_NoopOplogEntryWhichIsNotBuiltIn_When_CallingTransform_Then_OperationDescriptionIsPresent) {
const NamespaceString nss =
NamespaceString::createNamespaceString_forTest(boost::none, "testDB.coll.name");
auto serviceContext = std::make_unique<QueryTestServiceContext>();
auto opCtx = serviceContext->makeOperationContext();
auto oplogEntry = buildMovePrimaryOplogEntry(
opCtx.get(), nss.dbName(), ShardId("oldPrimary"), ShardId("newPrimary"));
auto opDescription = Document{{
{"from"_sd, "oldPrimary"_sd},
{"to"_sd, "newPrimary"_sd},
}};
Document expectedDoc{
{DocumentSourceChangeStream::kIdField,
makeResumeToken(kDefaultTs, Value(), opDescription, "movePrimary")},
{DocumentSourceChangeStream::kOperationTypeField, "movePrimary"_sd},
{DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
{DocumentSourceChangeStream::kWallTimeField, Date_t()},
{DocumentSourceChangeStream::kNamespaceField, Document{{"db", nss.db_forTest()}}},
{DocumentSourceChangeStream::kOperationDescriptionField, opDescription}};
repl::OplogEntry immutableEntry(oplogEntry.toBSON());
ASSERT_DOCUMENT_EQ(applyTransformation(immutableEntry, nss, {"movePrimary"}), expectedDoc);
}
} // namespace
} // namespace mongo

View File

@ -441,7 +441,10 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
// Noop change events that are only applicable when merging results on router:
// - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks.
if (expCtx->getInRouter() || expCtx->getNeedsMerge()) {
// Do not emit the 'migrateChunkToNewShard' event for v2 change streams, as it is not needed
// for handling topology changes.
// TODO: SERVER-111727 Stop emitting the migrateChunkToNewShard change event.
if (!expCtx->isChangeStreamV2() && (expCtx->getInRouter() || expCtx->getNeedsMerge())) {
internalOpTypes.push_back("migrateChunkToNewShard"_sd);
}

View File

@ -384,7 +384,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
// We can only run on a replica set, or through mongoS. Confirm that this is the case.
auto replCoord = repl::ReplicationCoordinator::get(expCtx->getOperationContext());
uassert(40573,
"The $changeStream stage is only supported on replica sets",
"The $changeStream stage is only supported on replica sets or mongos",
expCtx->getInRouter() || (replCoord && replCoord->getSettings().isReplSet()));
// We will not validate user specified options when we are not expecting to execute queries,

View File

@ -3312,7 +3312,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformSingle
{DSChangeStream::kOperationTypeField, "eventType1"_sd},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}};
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kOperationDescriptionField, Document{operationDescription}}};
auto stage =
exec::agg::MockStage::createForTest({Document{entry.getEntry().toBSON()}}, getExpCtx());
@ -3371,7 +3372,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformMultip
{DSChangeStream::kOperationTypeField, "eventType1"_sd},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}};
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kOperationDescriptionField, Document{operationDescriptionEvent1}}};
Document expectedDoc2{
{DSChangeStream::kIdField,
@ -3379,7 +3381,8 @@ TEST_F(ChangeStreamStageTest, DocumentSourceChangeStreamTransformTransformMultip
{DSChangeStream::kOperationTypeField, "eventType2"_sd},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}};
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kOperationDescriptionField, Document{operationDescriptionEvent2}}};
std::deque<exec::agg::GetNextResult> docs;
docs.push_back(Document{entry1.getEntry().toBSON()});

View File

@ -800,6 +800,14 @@ public:
_params.changeStreamSpec = std::move(changeStreamSpec);
}
bool isChangeStreamV2() const {
if (const auto& spec = _params.changeStreamSpec) {
return spec->getVersion() == ChangeStreamReaderVersionEnum::kV2;
}
return false;
}
const BSONObj& getOriginalAggregateCommand() const {
return _params.originalAggregateCommand;
}

View File

@ -168,38 +168,6 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
return explainCommandBuilder.freeze();
}
/**
* Open a $changeStream cursor on the 'config.shards' collection to watch for new shards.
*/
RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startMonitoringAtTime) {
const auto& configShard =
Grid::get(expCtx->getOperationContext())->shardRegistry()->getConfigShard();
// Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}}
AggregateCommandRequest aggReq(
NamespaceString::kConfigsvrShardsNamespace,
{BSON(DocumentSourceChangeStream::kStageName
<< BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
<< startMonitoringAtTime
<< DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
aggregation_request_helper::setFromRouter(
VersionContext::getDecoration(expCtx->getOperationContext()), aggReq, true);
aggReq.setNeedsMerge(true);
SimpleCursorOptions cursor;
cursor.setBatchSize(0);
aggReq.setCursor(cursor);
setReadWriteConcern(expCtx->getOperationContext(), aggReq, true, !expCtx->getExplain());
auto configCursor = establishCursors(expCtx->getOperationContext(),
expCtx->getMongoProcessInterface()->taskExecutor,
aggReq.getNamespace(),
ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
{{configShard->getId(), aggReq.toBSON()}},
false);
invariant(configCursor.size() == 1);
return std::move(*configCursor.begin());
}
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
@ -330,38 +298,6 @@ std::vector<RemoteCursor> establishShardCursors(
}
}
std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx,
PipelineDataSource pipelineDataSource,
bool mustRunOnAllShards,
const boost::optional<CollectionRoutingInfo>& cri,
const BSONObj shardQuery,
const BSONObj collation,
const boost::optional<ShardId>& mergeShardId) {
if (mustRunOnAllShards) {
// The pipeline begins with a stage which must be run on all shards.
auto shardIds = Grid::get(expCtx->getOperationContext())
->shardRegistry()
->getAllShardIds(expCtx->getOperationContext());
return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())};
} else if (pipelineDataSource == PipelineDataSource::kGeneratesOwnDataOnce) {
if (mergeShardId) {
return {*mergeShardId};
}
// The output collection is nonexistent or sharded, so we cannot determine a single
// mergeShardId. Designate the dbPrimary shard to run the shards part.
const auto dbInfo =
uassertStatusOK(Grid::get(expCtx->getOperationContext())
->catalogCache()
->getDatabase(expCtx->getOperationContext(),
expCtx->getNamespaceString().dbName()));
return {dbInfo->getPrimary()};
}
tassert(8361100, "Need CollectionRoutingInfo to target sharded query", cri);
return getTargetedShardsForQuery(expCtx, *cri, shardQuery, collation);
}
bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage,
const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) {
if (stage->distributedPlanLogic()) {
@ -684,8 +620,63 @@ std::unique_ptr<Pipeline> tryAttachCursorSourceForLocalRead(
}
return nullptr;
}
std::set<ShardId> getTargetedShardsForAllShardsRequest(OperationContext* opCtx) {
auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())};
}
std::set<ShardId> getTargetedShardsForPipelineGeneratingOwnDataOnce(
const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<ShardId> mergeShardId) {
if (mergeShardId) {
return std::set<ShardId>{*mergeShardId};
}
// The output collection is nonexistent or sharded, so we cannot determine a
// single mergeShardId. Designate the dbPrimary shard to run the shards part.
auto* opCtx = expCtx->getOperationContext();
const auto dbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(
opCtx, expCtx->getNamespaceString().dbName()));
return std::set<ShardId>{dbInfo->getPrimary()};
}
std::set<ShardId> getTargetedShardsForChangeStream(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
// If we are running a v2 change stream, shard targeting is determined via the
// ChangeStreamHandleTopologyChangeV2 stage, therefore we return an empty shard set.
if (expCtx->isChangeStreamV2()) {
return {};
}
return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext());
}
} // namespace
std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx,
PipelineDataSource pipelineDataSource,
const boost::optional<CollectionRoutingInfo>& cri,
const BSONObj& shardQuery,
const BSONObj& collation,
const boost::optional<ShardId>& mergeShardId) {
switch (pipelineDataSource) {
case PipelineDataSource::kGeneratesOwnDataOnce:
return getTargetedShardsForPipelineGeneratingOwnDataOnce(expCtx, mergeShardId);
case PipelineDataSource::kChangeStream:
return getTargetedShardsForChangeStream(expCtx);
case PipelineDataSource::kNormal:
if (expCtx->getNamespaceString().isCollectionlessAggregateNS()) {
return getTargetedShardsForAllShardsRequest(expCtx->getOperationContext());
}
tassert(8361100, "Need CollectionRoutingInfo to target sharded query", cri);
return getTargetedShardsForQuery(expCtx, *cri, shardQuery, collation);
}
MONGO_UNREACHABLE_TASSERT(10744200);
}
std::unique_ptr<Pipeline> runPipelineDirectlyOnSingleShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
AggregateCommandRequest request,
@ -938,18 +929,13 @@ TargetingResults targetPipeline(const boost::intrusive_ptr<ExpressionContext>& e
boost::optional<ShardId> mergeShardId = pipeline->needsSpecificShardMerger();
const bool mustRunOnAllShards =
checkIfMustRunOnAllShards(expCtx->getNamespaceString(), pipelineDataSource);
std::set<ShardId> shardIds = getTargetedShards(expCtx,
pipelineDataSource,
mustRunOnAllShards,
cri,
shardQuery,
shardTargetingCollation,
mergeShardId);
std::set<ShardId> shardIds = getTargetedShards(
expCtx, pipelineDataSource, cri, shardQuery, shardTargetingCollation, mergeShardId);
bool targetAllHosts = pipeline->needsAllShardHosts();
// Don't need to split the pipeline if we are only targeting a single shard, unless:
// - The pipeline contains one or more stages which must always merge on router.
// - The pipeline requires the merge to be performed on a specific shard that is not targeted.
const bool targetAllHosts = pipeline->needsAllShardHosts();
const bool needsSplit = (shardIds.size() > 1u) || needsRouterMerge || targetAllHosts ||
(mergeShardId && *(shardIds.begin()) != mergeShardId);
@ -975,14 +961,10 @@ TargetingResults targetPipeline(const boost::intrusive_ptr<ExpressionContext>& e
Grid::get(expCtx->getOperationContext())
->shardRegistry()
->reload(expCtx->getOperationContext());
// Rebuild the set of shards as the shard registry might have changed.
shardIds = getTargetedShards(expCtx,
pipelineDataSource,
mustRunOnAllShards,
cri,
shardQuery,
shardTargetingCollation,
mergeShardId);
shardIds = getTargetedShards(
expCtx, pipelineDataSource, cri, shardQuery, shardTargetingCollation, mergeShardId);
// Check that no shard has been removed since the change stream open time to detect a
// possible event loss. It is important to execute it after retrieving the most recent list
@ -1180,6 +1162,25 @@ DispatchShardPipelineResults dispatchTargetedShardPipeline(
std::move(readConcern),
boost::none,
requestQueryStatsFromRemotes));
// Shard targeting for v2 change streams is performed in the ChangeStreamHandleTopologyChangeV2
// stage. Here we exit early with empty DispatchShardPipelineResults.
if (expCtx->isChangeStreamV2()) {
tassert(10744202,
"set of targeted shards should be empty, as shard targeting is handled in "
"ChangeStreamHandleTopologyChangeV2 stage",
shardCount == 0);
return DispatchShardPipelineResults{std::move(mergeShardId),
{},
{},
std::move(splitPipelines),
std::move(pipeline),
targetedCommand,
shardCount,
exchangeSpec};
}
// If there were no shards when we began execution, we wouldn't have run this aggregation in the
// first place. Here, we double-check that the shards have not been removed mid-operation.
uassert(ErrorCodes::ShardNotFound,
@ -1234,11 +1235,11 @@ DispatchShardPipelineResults dispatchTargetedShardPipeline(
<< ") and we were not targeting each mongod in each shard",
targetAllHosts || cursors.size() % shardCount == 0);
// For $changeStream, we must open an extra cursor on the 'config.shards' collection, so
// For $changeStream v1, we must open an extra cursor on the 'config.shards' collection, so
// that we can monitor for the addition of new shards inline with real events.
if (pipelineDataSource == PipelineDataSource::kChangeStream &&
!expCtx->getNamespaceString().isEqualDb(NamespaceString::kConfigsvrShardsNamespace)) {
cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime));
if (auto&& cursor = openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx, pipelineDataSource, shardRegistryReloadTime)) {
cursors.emplace_back(std::move(*cursor));
}
}
@ -1291,8 +1292,11 @@ DispatchShardPipelineResults dispatchShardPipeline(
shardIds.erase(shard);
}
// Return if we don't need to establish any cursors.
if (shardIds.empty()) {
// Exit early if we don't need to establish any cursors, unless this is a v2 change stream.
// Shard targeting for v2 change streams is not determined here, but by the
// ChangeStreamHandleTopologyChangeV2 stage. To ensure this, we need to dispatch
// the merge pipeline on mongos with a $mergeCursors stage.
if (shardIds.empty() && !expCtx->isChangeStreamV2()) {
tassert(7958303,
"Expected no merge shard id when shardIds are empty",
!targeting.mergeShardId.has_value());
@ -1938,5 +1942,54 @@ std::unique_ptr<Pipeline> finalizeAndMaybePreparePipelineForExecution(
});
}
boost::optional<RemoteCursor> openChangeStreamCursorOnConfigsvrIfNeeded(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
PipelineDataSource pipelineDataSource,
Timestamp startMonitoringAtTime) {
// Do not open change stream cursors on the configsvr for non-change-stream pipelines.
if (pipelineDataSource != PipelineDataSource::kChangeStream) {
return {};
}
// Do not open a change stream on the configsvr for v2 change streams, as shard targeting will be
// determined via the ChangeStreamHandleTopologyChangeV2 stage.
if (expCtx->isChangeStreamV2()) {
return {};
}
// Do not open a change stream on the configsvr if we are already running a change stream
// over the 'config.shards' collection.
if (expCtx->getNamespaceString().isEqualDb(NamespaceString::kConfigsvrShardsNamespace)) {
return {};
}
const auto& configShard =
Grid::get(expCtx->getOperationContext())->shardRegistry()->getConfigShard();
// Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}}
AggregateCommandRequest aggReq(
NamespaceString::kConfigsvrShardsNamespace,
{BSON(DocumentSourceChangeStream::kStageName
<< BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
<< startMonitoringAtTime
<< DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
aggregation_request_helper::setFromRouter(
VersionContext::getDecoration(expCtx->getOperationContext()), aggReq, true);
aggReq.setNeedsMerge(true);
SimpleCursorOptions cursor;
cursor.setBatchSize(0);
aggReq.setCursor(cursor);
setReadWriteConcern(expCtx->getOperationContext(), aggReq, true, !expCtx->getExplain());
auto configCursor = establishCursors(expCtx->getOperationContext(),
expCtx->getMongoProcessInterface()->taskExecutor,
aggReq.getNamespace(),
ReadPreferenceSetting{ReadPreference::SecondaryPreferred},
{{configShard->getId(), aggReq.toBSON()}},
false);
tassert(10744201, "A single cursor over configsvr should be opened", configCursor.size() == 1);
return std::move(*configCursor.begin());
}
} // namespace sharded_agg_helpers
} // namespace mongo

View File

@ -189,6 +189,15 @@ void partitionAndAddMergeCursorsSource(Pipeline* pipeline,
*/
BSONObj targetShardsForExplain(Pipeline* ownedPipeline);
/**
* Returns the set of shards to target for the given pipeline data source and 'shardQuery'.
*/
std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx,
PipelineDataSource pipelineDataSource,
const boost::optional<CollectionRoutingInfo>& cri,
const BSONObj& shardQuery,
const BSONObj& collation,
const boost::optional<ShardId>& mergeShardId);
void mergeExplainOutputFromShards(const std::vector<AsyncRequestsSender::Response>& shardResponses,
BSONObjBuilder* result);
@ -292,5 +301,16 @@ std::unique_ptr<Pipeline> runPipelineDirectlyOnSingleShard(
ShardId shardId,
bool requestQueryStatsFromRemotes);
/**
* Opens a $changeStream cursor on the 'config.shards' collection to watch for new shards if:
* - 'pipelineDataSource' is kChangeStream,
* - the change stream is not a v2 change stream, and
* - the change stream is not already running over the 'config.shards' collection.
*/
boost::optional<RemoteCursor> openChangeStreamCursorOnConfigsvrIfNeeded(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
PipelineDataSource pipelineDataSource,
Timestamp startMonitoringAtTime);
} // namespace sharded_agg_helpers
} // namespace mongo

View File

@ -0,0 +1,250 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/s/query/exec/sharded_agg_test_fixture.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include <boost/move/utility_core.hpp>
#include <boost/none.hpp>
#include <boost/optional/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo::sharded_agg_helpers {
namespace {
class ShardedAggHelpersFixture : public ShardedAggTestFixture {
public:
std::set<ShardId> setupShards(size_t n) {
std::set<ShardId> allShardIds;
for (auto&& shard : setupNShards(n)) {
allShardIds.insert(shard.getName());
}
return allShardIds;
}
void setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum version) {
auto&& spec = expCtx()->getChangeStreamSpec().value_or(DocumentSourceChangeStreamSpec());
spec.setVersion(version);
expCtx()->setChangeStreamSpec(spec);
}
};
TEST_F(
ShardedAggHelpersFixture,
Given_GeneratesOwnDataOnceAndMergeShardId_When_CallingGetTargetedShards_Then_ReturnsMergeShardId) {
for (auto&& shardId : setupShards(3)) {
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kGeneratesOwnDataOnce,
boost::none /* cri */,
BSONObj(),
BSONObj(),
shardId),
std::set<ShardId>{shardId});
}
}
TEST_F(
ShardedAggHelpersFixture,
Given_GeneratesOwnDataOnceAndNoMergeShardId_When_CallingGetTargetedShards_Then_ReturnsShardIdFromTheGetDatabaseCall) {
auto allShardIds = setupShards(3);
ShardId shardId("1");
// Mock ShardId returned by the GetDatabase call for the given namespace.
auto future = launchAsync([&] { expectGetDatabase(kTestAggregateNss, shardId.toString()); });
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kGeneratesOwnDataOnce,
boost::none /* cri */,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
std::set<ShardId>{shardId});
future.default_timed_get();
}
TEST_F(ShardedAggHelpersFixture,
Given_ChangeStreamV1_WhenCallingGetTargetedShards_Then_ReturnsAllShards) {
std::set<ShardId> allShardIds = setupShards(3);
setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV1);
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kChangeStream,
boost::none /* cri */,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
allShardIds);
}
TEST_F(ShardedAggHelpersFixture,
Given_ChangeStreamV2_WhenCallingGetTargetedShards_Then_ReturnsNoShards) {
std::set<ShardId> allShardIds = setupShards(3);
setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV2);
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kChangeStream,
boost::none /* cri */,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
std::set<ShardId>{});
}
TEST_F(
ShardedAggHelpersFixture,
Given_NormalDataSourceAndCollectionlessNss_When_CallingGetTargetedShards_Then_ReturnsAllShards) {
std::set<ShardId> allShardIds = setupShards(3);
auto nss = NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kMdbTesting);
expCtx()->setNamespaceString(nss);
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kNormal,
boost::none /* cri */,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
allShardIds);
}
TEST_F(
ShardedAggHelpersFixture,
Given_NormalDataSourceAndUnshardedCollection_When_CallingGetTargetedShards_Then_ReturnsDbPrimaryShard) {
std::set<ShardId> allShardIds = setupShards(3);
expCtx()->setNamespaceString(kTestAggregateNss);
auto cri = makeUnshardedCollectionRoutingInfo(kTestAggregateNss);
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kNormal,
cri,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
std::set<ShardId>{ShardId("0")});
}
TEST_F(
ShardedAggHelpersFixture,
Given_NormalDataSourceAndUntrackedCollection_When_CallingGetTargetedShards_Then_ReturnsDbPrimaryShard) {
std::set<ShardId> allShardIds = setupShards(3);
expCtx()->setNamespaceString(kTestAggregateNss);
auto cri = makeUntrackedCollectionRoutingInfo(kTestAggregateNss);
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kNormal,
cri,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
std::set<ShardId>{ShardId("0")});
}
TEST_F(
ShardedAggHelpersFixture,
Given_NormalDataSourceAndCriWithRoutingInfo_When_CallingGetTargetedShards_Then_ReturnsRelevantShards) {
std::set<ShardId> allShardIds = setupShards(3);
auto nss = kTestAggregateNss;
expCtx()->setNamespaceString(nss);
loadRoutingTableWithTwoChunksAndTwoShards(nss);
std::set<ShardId> expectedShardIds{ShardId("0"), ShardId("1")};
auto catalogCache = Grid::get(getServiceContext())->catalogCache();
const auto cri =
uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss));
ASSERT_EQ(getTargetedShards(expCtx(),
PipelineDataSource::kNormal,
cri,
BSONObj(),
BSONObj(),
boost::none /* mergeShardId */),
expectedShardIds);
}
TEST_F(
ShardedAggHelpersFixture,
Given_NonChangeStreamPipeline_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) {
ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx(), PipelineDataSource::kGeneratesOwnDataOnce, Timestamp()),
boost::none);
ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx(), PipelineDataSource::kNormal, Timestamp()),
boost::none);
}
TEST_F(
ShardedAggHelpersFixture,
Given_ChangeStreamV2Pipeline_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) {
setChangeStreamVersionToExpCtx(ChangeStreamReaderVersionEnum::kV2);
ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx(), PipelineDataSource::kChangeStream, Timestamp()),
boost::none);
}
TEST_F(
ShardedAggHelpersFixture,
Given_ChangeStreamPipelineOverConfigShardsNss_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsNoRemoteCursor) {
expCtx()->setNamespaceString(NamespaceString::kConfigsvrShardsNamespace);
ASSERT_EQ(openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx(), PipelineDataSource::kChangeStream, Timestamp()),
boost::none);
}
TEST_F(
ShardedAggHelpersFixture,
Given_ChangeStreamPipelineOverRegularNss_When_CallingOpenChangeStreamOnConfigsvr_Then_ReturnsCursorOverConfigsvr) {
expCtx()->setNamespaceString(kTestAggregateNss);
auto future = launchAsync([&] {
auto&& configsvrCursor = openChangeStreamCursorOnConfigsvrIfNeeded(
expCtx(), PipelineDataSource::kChangeStream, Timestamp());
ASSERT_EQ(configsvrCursor->getCursorResponse().getCursorId(), CursorId(123));
});
// Mock response for shard refresh.
expectGetShards({ShardType("1", "1")});
// Mock response from the agg request to the configsvr.
onCommand([this](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(NamespaceString::kConfigsvrShardsNamespace.coll(),
request.cmdObj.firstElement().valueStringData());
CursorResponse cursorResponse(
NamespaceString::kConfigsvrShardsNamespace, CursorId(123), {});
return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
});
future.default_timed_get();
}
} // namespace
} // namespace mongo::sharded_agg_helpers

View File

@ -30,7 +30,6 @@
#include "mongo/s/change_streams/change_stream_db_absent_state_event_handler.h"
#include "mongo/bson/timestamp.h"
#include "mongo/s/change_streams/collection_change_stream_db_present_state_event_handler.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/assert_util.h"
@ -69,8 +68,7 @@ ShardTargeterDecision ChangeStreamShardTargeterDbAbsentStateEventHandler::handle
readerCtx.openCursorsOnDataShards(clusterTime + 1, shardSet);
// Since the database is now present, change the state event handler.
ctx.setEventHandler(
std::make_unique<CollectionChangeStreamShardTargeterDbPresentStateEventHandler>());
ctx.setEventHandler(buildDbPresentStateEventHandler());
return ShardTargeterDecision::kContinue;
}

View File

@ -31,8 +31,6 @@
#include "mongo/util/assert_util.h"
#include <algorithm>
namespace mongo {
namespace {
void updateActiveShardCursors(Timestamp atClusterTime,
@ -40,22 +38,22 @@ void updateActiveShardCursors(Timestamp atClusterTime,
ChangeStreamReaderContext& readerCtx) {
const auto& currentActiveShardSet = readerCtx.getCurrentlyTargetedDataShards();
stdx::unordered_set<ShardId> shardsToCloseCursors;
std::set_difference(currentActiveShardSet.begin(),
currentActiveShardSet.end(),
newActiveShardSet.begin(),
newActiveShardSet.end(),
std::inserter(shardsToCloseCursors, shardsToCloseCursors.begin()));
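// Note: std::set_difference requires sorted input ranges, which the unordered shard sets used
// here do not provide, hence the explicit membership checks below.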
for (const auto& currentActiveShard : currentActiveShardSet) {
if (!newActiveShardSet.contains(currentActiveShard)) {
shardsToCloseCursors.insert(currentActiveShard);
}
}
if (!shardsToCloseCursors.empty()) {
readerCtx.closeCursorsOnDataShards(shardsToCloseCursors);
}
stdx::unordered_set<ShardId> shardsToOpenCursors;
std::set_difference(newActiveShardSet.begin(),
newActiveShardSet.end(),
currentActiveShardSet.begin(),
currentActiveShardSet.end(),
std::inserter(shardsToOpenCursors, shardsToOpenCursors.begin()));
for (const auto& newActiveShard : newActiveShardSet) {
if (!currentActiveShardSet.contains(newActiveShard)) {
shardsToOpenCursors.insert(newActiveShard);
}
}
if (!shardsToOpenCursors.empty()) {
readerCtx.openCursorsOnDataShards(atClusterTime, shardsToOpenCursors);

View File

@ -214,14 +214,15 @@ TEST_F(
CollectionDbPresentStateEventHandlerFixture,
Given_NamespacePlacementChangedControlEventWithShards_When_HandleEventIsCalled_Then_CursorsAreUpdated) {
Timestamp clusterTime(60, 10);
Timestamp committedAt(60, 10);
NamespacePlacementChangedControlEvent event{clusterTime, committedAt, makeTestNss()};
NamespacePlacementChangedControlEvent event{clusterTime, makeTestNss()};
ShardId shardA("shardA");
ShardId shardB("shardB");
readerCtx().currentlyTargetedShards = {shardA};
ShardId shardC("shardC");
ShardId shardD("shardD");
readerCtx().currentlyTargetedShards = {shardA, shardD};
std::vector<ShardId> shards = {shardB};
std::vector<ShardId> shards = {shardA, shardB, shardC};
stdx::unordered_set<ShardId> shardSet(shards.begin(), shards.end());
std::vector<HistoricalPlacementFetcherMock::Response> responses{
{clusterTime, HistoricalPlacement(shards, HistoricalPlacementStatus::OK)}};
@ -234,11 +235,11 @@ TEST_F(
ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls.size(), 1);
ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls[0].atClusterTime, clusterTime + 1);
ASSERT_EQ(readerCtx().openCursorsOnDataShardsCalls[0].shardSet,
stdx::unordered_set<ShardId>{shardB});
(stdx::unordered_set<ShardId>{shardB, shardC}));
ASSERT_EQ(readerCtx().closeCursorsOnDataShardsCalls.size(), 1);
ASSERT_EQ(readerCtx().closeCursorsOnDataShardsCalls[0].shardSet,
stdx::unordered_set<ShardId>{shardA});
(stdx::unordered_set<ShardId>{shardD}));
}
TEST_F(

View File

@ -159,10 +159,11 @@ TEST_F(CollectionChangeStreamShardTargeterImplFixture,
auto& eventHandlerMock =
dynamic_cast<ChangeStreamShardTargeterEventHandlerMock&>(*targeter().getEventHandler());
Document moveChunkEvent = Document(BSON(
"operationType" << MoveChunkControlEvent::opType << "clusterTime" << Timestamp() << "donor"
<< "shardA" << "recipient" << "shardB"
<< "allCollectionChunksMigratedFromDonor" << true));
Document moveChunkEvent = Document(
BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << Timestamp()
<< "operationDescription"
<< BSON("donor" << "shardA" << "recipient" << "shardB"
<< "allCollectionChunksMigratedFromDonor" << true)));
auto controlEvent = parseControlEvent(moveChunkEvent);
targeter().handleEvent(opCtx(), moveChunkEvent, readerCtx());
ASSERT_EQ(eventHandlerMock.calls.size(), 1);

View File

@ -48,6 +48,7 @@ static constexpr StringData kAllCollectionChunksMigratedFromDonorField =
"allCollectionChunksMigratedFromDonor"_sd;
static constexpr StringData kFromField = "from"_sd;
static constexpr StringData kToField = "to"_sd;
static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;
Value assertFieldType(const Document& document, StringData fieldName, BSONType expectedType) {
auto val = document[fieldName];
@ -62,22 +63,27 @@ Value assertFieldType(const Document& document, StringData fieldName, BSONType e
} // namespace
MoveChunkControlEvent MoveChunkControlEvent::createFromDocument(const Document& event) {
auto opDescription =
assertFieldType(event, kOperationDescriptionField, BSONType::object).getDocument();
Timestamp clusterTime =
assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp();
ShardId fromShard = assertFieldType(event, kDonorField, BSONType::string).getString();
ShardId toShard = assertFieldType(event, kRecipientField, BSONType::string).getString();
ShardId fromShard = assertFieldType(opDescription, kDonorField, BSONType::string).getString();
ShardId toShard = assertFieldType(opDescription, kRecipientField, BSONType::string).getString();
bool allCollectionChunksMigratedFromDonor =
assertFieldType(event, kAllCollectionChunksMigratedFromDonorField, BSONType::boolean)
assertFieldType(
opDescription, kAllCollectionChunksMigratedFromDonorField, BSONType::boolean)
.getBool();
return MoveChunkControlEvent{
clusterTime, fromShard, toShard, allCollectionChunksMigratedFromDonor};
}
MovePrimaryControlEvent MovePrimaryControlEvent::createFromDocument(const Document& event) {
auto opDescription =
assertFieldType(event, kOperationDescriptionField, BSONType::object).getDocument();
Timestamp clusterTime =
assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp();
ShardId fromShard = assertFieldType(event, kFromField, BSONType::string).getString();
ShardId toShard = assertFieldType(event, kToField, BSONType::string).getString();
ShardId fromShard = assertFieldType(opDescription, kFromField, BSONType::string).getString();
ShardId toShard = assertFieldType(opDescription, kToField, BSONType::string).getString();
return MovePrimaryControlEvent{clusterTime, fromShard, toShard};
}
@ -85,13 +91,11 @@ NamespacePlacementChangedControlEvent NamespacePlacementChangedControlEvent::cre
const Document& event) {
Timestamp clusterTime =
assertFieldType(event, kClusterTimeField, BSONType::timestamp).getTimestamp();
Timestamp committedAt =
assertFieldType(event, kCommittedAtField, BSONType::timestamp).getTimestamp();
auto nsField = assertFieldType(event, kNamespaceField, BSONType::object).getDocument();
auto nssSpec = NamespaceSpec::parse(nsField.toBson(),
IDLParserContext("NamespacePlacementChangedControlEvent"));
NamespaceString nss = NamespaceStringUtil::deserialize(*nssSpec.getDb(), *nssSpec.getColl());
return NamespacePlacementChangedControlEvent{clusterTime, committedAt, nss};
return NamespacePlacementChangedControlEvent{clusterTime, nss};
}
DatabaseCreatedControlEvent DatabaseCreatedControlEvent::createFromDocument(const Document& event) {

View File

@ -82,7 +82,6 @@ struct NamespacePlacementChangedControlEvent {
bool operator==(const NamespacePlacementChangedControlEvent& other) const = default;
Timestamp clusterTime;
Timestamp committedAt;
NamespaceString nss;
};

View File

@ -49,10 +49,11 @@ TEST(
Timestamp ts;
ShardId donorShard("fromShard");
ShardId recipientShard("toShard");
Document event =
Document(BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << ts
<< "donor" << donorShard << "recipient" << recipientShard
<< "allCollectionChunksMigratedFromDonor" << true));
Document event = Document(
BSON("operationType" << MoveChunkControlEvent::opType << "clusterTime" << ts
<< "operationDescription"
<< BSON("donor" << donorShard << "recipient" << recipientShard
<< "allCollectionChunksMigratedFromDonor" << true)));
ControlEvent expectedControlEvent = MoveChunkControlEvent{ts, donorShard, recipientShard, true};
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
@ -73,7 +74,8 @@ TEST(
ShardId recipientShard("toShard");
Document event =
Document(BSON("operationType" << MovePrimaryControlEvent::opType << "clusterTime" << ts
<< "from" << donorShard << "to" << recipientShard));
<< "operationDescription"
<< BSON("from" << donorShard << "to" << recipientShard)));
ControlEvent expectedControlEvent = MovePrimaryControlEvent{ts, donorShard, recipientShard};
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
@ -90,7 +92,6 @@ TEST(
ControlEventTest,
GivenValidNamespacePlacementChangedControlEventAsDocument_WhenCallingParseControlEvent_ThenParsingIsSuccessful) {
Timestamp ts;
Timestamp committedAt;
auto nss = NamespaceString::kDefaultInitialSyncIdNamespace;
auto nssSpec = [&]() {
@ -100,11 +101,11 @@ TEST(
return nssSpec;
}();
Document event = Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType
<< "clusterTime" << ts << "committedAt"
<< committedAt << "ns" << nssSpec.toBSON()));
Document event =
Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType
<< "clusterTime" << ts << "ns" << nssSpec.toBSON()));
ControlEvent expectedControlEvent = NamespacePlacementChangedControlEvent{ts, committedAt, nss};
ControlEvent expectedControlEvent = NamespacePlacementChangedControlEvent{ts, nss};
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
}

View File

@ -45,7 +45,7 @@ HistoricalPlacement HistoricalPlacementFetcherImpl::fetch(
// string.
const auto targetWholeCluster = !nss.has_value() || nss->isEmpty();
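// When targeting the whole cluster, the request must be constructed with an empty namespace;
// otherwise it carries the concrete namespace being watched.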
ConfigsvrGetHistoricalPlacement request(
targetWholeCluster ? nss.value() : NamespaceString::kEmpty, atClusterTime);
targetWholeCluster ? NamespaceString::kEmpty : nss.value(), atClusterTime);
request.setTargetWholeCluster(targetWholeCluster);
request.setCheckIfPointInTimeIsInFuture(checkIfPointInTimeIsInFuture);

View File

@ -126,6 +126,12 @@ BSONObj RouterStagePipeline::_validateAndConvertToBSON(const Document& event) {
}
bool RouterStagePipeline::remotesExhausted() const {
// Change stream pipelines can never be exhausted. Instead, an invalidation event may be sent,
// closing the stream.
if (_mergePipeline->getContext()->isTailableAwaitData()) {
return false;
}
return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted();
}

View File

@ -844,6 +844,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
bool eligibleForSampling,
bool requestQueryStatsFromRemotes) {
auto expCtx = targeter.pipeline->getContext();
const bool isChangeStreamV2Pipeline = expCtx->isChangeStreamV2();
// If not, split the pipeline as necessary and dispatch to the relevant shards.
auto shardDispatchResults =
@ -859,8 +860,9 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
// Check for valid usage of SEARCH_META. We wait until after we've dispatched pipelines to the
// shards in the event that we need to resolve any views.
// TODO PM-1966: We can resume doing this at parse time once views are tracked in the catalog.
// Change stream v2 does not have a shard pipeline.
auto svcCtx = opCtx->getServiceContext();
if (svcCtx) {
if (svcCtx && !isChangeStreamV2Pipeline) {
if (shardDispatchResults.pipelineForSingleShard) {
search_helpers::assertSearchMetaAccessValid(
shardDispatchResults.pipelineForSingleShard->getSources(), expCtx.get());
@ -882,9 +884,39 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
std::move(shardDispatchResults), expCtx, result);
}
// If this isn't an explain, then we must have established cursors on at least one
// shard.
invariant(shardDispatchResults.remoteCursors.size() > 0);
// The change stream v2 pipeline does not target any shards, yet it needs a $mergeCursors stage
// attached to its merge pipeline running on mongos.
if (isChangeStreamV2Pipeline) {
tassert(10744203,
"change stream v2 should not target any shards",
shardDispatchResults.remoteCursors.empty());
auto mongosPipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
tassert(10744204,
"tried to dispatch merge pipeline but there was no merge portion of the split "
"pipeline",
mongosPipeline);
// Add a $mergeCursors stage to the merge pipeline with an empty set of remote shards. Shard
// cursors will be added later by the ChangeStreamHandleTopologyChangeV2 stage.
sharded_agg_helpers::partitionAndAddMergeCursorsSource(
mongosPipeline.get(),
{} /* cursors */,
shardDispatchResults.splitPipeline->shardCursorsSortSpec,
requestQueryStatsFromRemotes);
return runPipelineOnMongoS(namespaces,
batchSize,
std::move(mongosPipeline),
result,
privileges,
requestQueryStatsFromRemotes);
}
// If this isn't an explain or change stream v2, then we must have established cursors on at
// least one shard.
tassert(10744205,
"aggregate must have established cursors on at least one shard",
shardDispatchResults.remoteCursors.size() > 0);
// If we sent the entire pipeline to a single shard, store the remote cursor and return.
if (!shardDispatchResults.splitPipeline) {