SERVER-113286 Generate NamespacePlacementChanged on addition of first shard in the sharded cluster (#44492)

GitOrigin-RevId: 27587db826a4d55611183cc4308be04af342b790
This commit is contained in:
ppolato 2025-12-02 11:32:47 +01:00 committed by MongoDB Bot
parent b61c049777
commit 6284daf6e3
9 changed files with 102 additions and 25 deletions

View File

@ -13,6 +13,21 @@ const st = new ShardingTest({
config: {nodes: 1}, config: {nodes: 1},
}); });
// A 'namespacePlacementChanged' op log entry is also generated by the first replica set joining the sharded cluster during the fixture setup
// with a 'o2.committedAt' timestamp matching the topology time.
const initialPlacementHistoryResetTimestamp = (() => {
const retrievedEvents = [st.rs0, st.rs1].reduce((accumulator, rs) => {
const primary = rs.getPrimary();
const matchingEvents = primary
.getCollection("local.oplog.rs")
.find({"ns": "", "o.msg.namespacePlacementChanged": ""})
.toArray();
return accumulator.concat(matchingEvents);
}, []);
assert.eq(retrievedEvents.length, 1, "Expected to find exactly one 'namespacePlacementChanged' across the cluster");
return retrievedEvents[0].o2.committedAt;
})();
const configPrimary = st.configRS.getPrimary(); const configPrimary = st.configRS.getPrimary();
// Identifier for the config.placementHistory special documents holding metadata about the operational boundaries set by the initialization. // Identifier for the config.placementHistory special documents holding metadata about the operational boundaries set by the initialization.
@ -77,7 +92,11 @@ function launchAndPauseResetPlacementHistory() {
const primary = rs.getPrimary(); const primary = rs.getPrimary();
const placementHistoryChangedNotifications = primary const placementHistoryChangedNotifications = primary
.getCollection("local.oplog.rs") .getCollection("local.oplog.rs")
.find({"ns": "", "o2.namespacePlacementChanged": 1}) .find({
"ns": "",
"o2.namespacePlacementChanged": 1,
"o2.committedAt": {$gt: initialPlacementHistoryResetTimestamp},
})
.toArray(); .toArray();
assert.eq(placementHistoryChangedNotifications.length, 1); assert.eq(placementHistoryChangedNotifications.length, 1);
const entry = placementHistoryChangedNotifications[0]; const entry = placementHistoryChangedNotifications[0];
@ -121,7 +140,11 @@ function launchAndPauseResetPlacementHistory() {
const primary = rs.getPrimary(); const primary = rs.getPrimary();
const placementHistoryChangedNotifications = primary const placementHistoryChangedNotifications = primary
.getCollection("local.oplog.rs") .getCollection("local.oplog.rs")
.find({"ns": "", "o2.namespacePlacementChanged": 1}) .find({
"ns": "",
"o2.namespacePlacementChanged": 1,
"o2.committedAt": {$gt: initialPlacementHistoryResetTimestamp},
})
.sort({ts: -1}) .sort({ts: -1})
.toArray(); .toArray();

View File

@ -43,6 +43,31 @@ describe("Activity of the addShard commit within config.placementHistory", funct
assert.sameMembers(fallbackResponseDescriptor.shards, fallbackResponse); assert.sameMembers(fallbackResponseDescriptor.shards, fallbackResponse);
assert(timestampCmp(fallbackResponseDescriptor.timestamp, Timestamp(0, 1)) === 0); assert(timestampCmp(fallbackResponseDescriptor.timestamp, Timestamp(0, 1)) === 0);
}; };
this.verifyPostCommitNotificationOnAddedReplicaSet = function (replicaSet, expectedCommitTimeValue) {
const namespacePlacementChangedFilter = {op: "n", ns: "", o: {msg: {namespacePlacementChanged: ""}}};
const matchingOpEntries = replicaSet
.getPrimary()
.getCollection("local.oplog.rs")
.find(namespacePlacementChangedFilter)
.toArray();
if (expectedCommitTimeValue === null) {
assert.eq(0, matchingOpEntries.length);
return;
}
assert.eq(1, matchingOpEntries.length);
let placementChangedNotification = matchingOpEntries[0];
const expectedNotificationDetails = {
namespacePlacementChanged: 1,
ns: {},
committedAt: expectedCommitTimeValue,
};
assert.docEq(placementChangedNotification.o2, expectedNotificationDetails);
assert(timestampCmp(expectedCommitTimeValue, placementChangedNotification.ts) < 0);
};
}); });
beforeEach(() => { beforeEach(() => {
@ -67,7 +92,7 @@ describe("Activity of the addShard commit within config.placementHistory", funct
assert.eq(0, this.st.config.placementHistory.countDocuments({})); assert.eq(0, this.st.config.placementHistory.countDocuments({}));
}); });
it("addShard generates initialization metadata when a first empty shard is added", () => { it("addShard generates initialization metadata and a post-commit notification when a first empty shard is added", () => {
const firstShardName = "firstShard"; const firstShardName = "firstShard";
const firstShardRS = this.spinNewReplicaSet(firstShardName); const firstShardRS = this.spinNewReplicaSet(firstShardName);
@ -75,30 +100,35 @@ describe("Activity of the addShard commit within config.placementHistory", funct
const firstShardCreationTime = this.getTopologyTimeOf(firstShardName); const firstShardCreationTime = this.getTopologyTimeOf(firstShardName);
this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]);
this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime);
}); });
it("transitionFromDedicatedConfigServer generates initialization metadata", () => { it("transitionFromDedicatedConfigServer generates initialization metadata and a post-commit notification", () => {
const configShardName = "config"; const configShardName = "config";
assert.commandWorked(this.st.s.adminCommand({transitionFromDedicatedConfigServer: 1})); assert.commandWorked(this.st.s.adminCommand({transitionFromDedicatedConfigServer: 1}));
const firstShardCreationTime = this.getTopologyTimeOf(configShardName); const firstShardCreationTime = this.getTopologyTimeOf(configShardName);
this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [configShardName]); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [configShardName]);
this.verifyPostCommitNotificationOnAddedReplicaSet(this.st.configRS, firstShardCreationTime);
}); });
it("addShard preserves the original initialization metadata when a second shard is added", () => { it("the addition of a second shard of the cluster do not generate any initialization metadata or post-commit notification", () => {
const firstShardName = "firstShard"; const firstShardName = "firstShard";
const firstShardRS = this.spinNewReplicaSet(firstShardName); const firstShardRS = this.spinNewReplicaSet(firstShardName);
assert.commandWorked(this.st.s.adminCommand({addShard: firstShardRS.getURL(), name: firstShardName})); assert.commandWorked(this.st.s.adminCommand({addShard: firstShardRS.getURL(), name: firstShardName}));
const firstShardCreationTime = this.getTopologyTimeOf(firstShardName); const firstShardCreationTime = this.getTopologyTimeOf(firstShardName);
this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime);
const secondShardName = "secondShard"; const secondShardName = "secondShard";
const secondShardRS = this.spinNewReplicaSet(secondShardName); const secondShardRS = this.spinNewReplicaSet(secondShardName);
assert.commandWorked(this.st.s.adminCommand({addShard: secondShardRS.getURL(), name: secondShardName})); assert.commandWorked(this.st.s.adminCommand({addShard: secondShardRS.getURL(), name: secondShardName}));
this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]);
this.verifyPostCommitNotificationOnAddedReplicaSet(secondShardRS, null);
}); });
it("addShard generates initialization metadata and placement documents when a first not-empty shard is added", () => { it("addShard generates initialization metadata, placement documents and a post-commit notification when a first not-empty shard is added", () => {
const firstShardName = "firstShard"; const firstShardName = "firstShard";
const firstShardRS = this.spinNewReplicaSet(firstShardName); const firstShardRS = this.spinNewReplicaSet(firstShardName);
@ -123,5 +153,6 @@ describe("Activity of the addShard commit within config.placementHistory", funct
} }
this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]);
this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime);
}); });
}); });

View File

@ -73,11 +73,6 @@ describe("$changeStream", function () {
assert.commandWorked(testColl.insert(invalidShardDoc)); assert.commandWorked(testColl.insert(invalidShardDoc));
assert.commandWorked(testColl.insert(fakeShardDoc)); assert.commandWorked(testColl.insert(fakeShardDoc));
// TODO: SERVER-113286 Generate NamespacePlacementChanged on addition of first shard in the sharded cluster.
if (FeatureFlagUtil.isPresentAndEnabled(testDB, "ChangeStreamPreciseShardTargeting")) {
assert.commandWorked(testDB.adminCommand({resetPlacementHistory: 1}));
}
// Log the shard description documents that we just inserted into the collection. // Log the shard description documents that we just inserted into the collection.
jsTest.log.info("Shard docs: ", {docs: testColl.find().toArray()}); jsTest.log.info("Shard docs: ", {docs: testColl.find().toArray()});
}); });

View File

@ -102,6 +102,7 @@ void unblockConflictingDDLs(OperationContext* opCtx) {
void broadcastPlacementHistoryChangedNotification( void broadcastPlacementHistoryChangedNotification(
OperationContext* opCtx, OperationContext* opCtx,
const Timestamp& committedAt,
const OperationSessionInfo& osi, const OperationSessionInfo& osi,
std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) { const CancellationToken& token) {
@ -109,7 +110,7 @@ void broadcastPlacementHistoryChangedNotification(
ShardsvrNotifyShardingEventRequest request( ShardsvrNotifyShardingEventRequest request(
notify_sharding_event::kPlacementHistoryMetadataChanged, notify_sharding_event::kPlacementHistoryMetadataChanged,
PlacementHistoryMetadataChanged().toBSON()); PlacementHistoryMetadataChanged(committedAt).toBSON());
request.setDbName(DatabaseName::kAdmin); request.setDbName(DatabaseName::kAdmin);
generic_argument_util::setMajorityWriteConcern(request); generic_argument_util::setMajorityWriteConcern(request);
@ -255,7 +256,8 @@ ExecutorFuture<void> InitializePlacementHistoryCoordinator::_runImpl(
Phase::kFinalize, Phase::kFinalize,
[this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) { [this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) {
const auto osi = getNewSession(opCtx); const auto osi = getNewSession(opCtx);
broadcastPlacementHistoryChangedNotification(opCtx, osi, executor, token); broadcastPlacementHistoryChangedNotification(
opCtx, _doc.getInitializationTime().value(), osi, executor, token);
PlacementHistoryCleaner::get(opCtx)->resume(opCtx); PlacementHistoryCleaner::get(opCtx)->resume(opCtx);
ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.end", nss()); ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.end", nss());
})) }))

View File

@ -114,14 +114,22 @@ structs:
"Notification of the commit of a metadata change affecting the whole content of config.placementHistory. "Notification of the commit of a metadata change affecting the whole content of config.placementHistory.
Shards are expected to react by emitting a no-op log entry directed to change stream readers." Shards are expected to react by emitting a no-op log entry directed to change stream readers."
strict: false strict: false
fields:
committedAt:
description:
"The cluster time associated to the metadata change,
as persistent in the related config.placementHistory documents."
type: timestamp
commands: commands:
_shardsvrNotifyShardingEvent: _shardsvrNotifyShardingEvent:
command_name: _shardsvrNotifyShardingEvent command_name: _shardsvrNotifyShardingEvent
cpp_name: ShardsvrNotifyShardingEventRequest cpp_name: ShardsvrNotifyShardingEventRequest
description: description:
"Internal command to be invoked by the config server to notify a shard "Internal command supporting the integration between Sharding DDL operations and change stream readers.
of an event concerning the shard itself or the whole cluster." A remote coordinator node may use it to dispatch the notification of an upcoming/committed event to one or more of its participants.
The recipient is expected to react with the generation of a no-op entry describing the details of the notification,
which will be later interpreted as a change/control event by change stream readers targeting it."
namespace: ignored namespace: ignored
api_version: "" api_version: ""
strict: false strict: false

View File

@ -49,10 +49,6 @@
namespace mongo { namespace mongo {
/**
* This command notifies an event on the shard server. The action taken is determined by the
* event ShardsvrAddShard: Add an oplog entry for the new shard.
*/
class ShardsvrNotifyShardingEventCommand : public TypedCommand<ShardsvrNotifyShardingEventCommand> { class ShardsvrNotifyShardingEventCommand : public TypedCommand<ShardsvrNotifyShardingEventCommand> {
public: public:
using Request = ShardsvrNotifyShardingEventRequest; using Request = ShardsvrNotifyShardingEventRequest;
@ -122,7 +118,9 @@ public:
if (request().getEventType() == if (request().getEventType() ==
notify_sharding_event::kPlacementHistoryMetadataChanged) { notify_sharding_event::kPlacementHistoryMetadataChanged) {
notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx); const auto event = PlacementHistoryMetadataChanged::parse(
request().getDetails(), IDLParserContext("_shardsvrNotifyShardingEvent"));
notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx, event);
return; return;
} }

View File

@ -254,14 +254,15 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
"NamespacePlacementChangedWritesOplog"); "NamespacePlacementChangedWritesOplog");
} }
void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx) { void notifyChangeStreamsOnPlacementHistoryMetadataChanged(
Timestamp now(opCtx->fastClockSource().now()); OperationContext* opCtx, const PlacementHistoryMetadataChanged& notification) {
// Global changes to the metadata of placementHistory are encoded as a NamespacePlacementChanged // Global changes to the metadata of placementHistory are encoded as a NamespacePlacementChanged
// notification with an unspecified namespace. // notification with an unspecified namespace.
NamespacePlacementChanged globalChangeNotification(NamespaceString::kEmpty, now); NamespacePlacementChanged repackagedNotification(NamespaceString::kEmpty,
notification.getCommittedAt());
insertNotificationOplogEntries( insertNotificationOplogEntries(
opCtx, opCtx,
{buildNamespacePlacementChangedOplogEntry(opCtx, globalChangeNotification)}, {buildNamespacePlacementChangedOplogEntry(opCtx, repackagedNotification)},
"PlacementHistoryMetadataChangedWritesOplog"); "PlacementHistoryMetadataChangedWritesOplog");
} }

View File

@ -120,7 +120,8 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
* Writes a no-op oplog entry concerning the commit of an operation * Writes a no-op oplog entry concerning the commit of an operation
* modifying the operational boundaries of config.placementHistory. * modifying the operational boundaries of config.placementHistory.
*/ */
void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx); void notifyChangeStreamsOnPlacementHistoryMetadataChanged(
OperationContext* opCtx, const PlacementHistoryMetadataChanged& notification);
/** /**
* Writes a no-op oplog entry on the end of multi shard transaction. * Writes a no-op oplog entry on the end of multi shard transaction.

View File

@ -317,6 +317,24 @@ ExecutorFuture<void> AddShardCoordinator::_runImpl(
} }
} }
// V2 Change stream readers consuming events from a promoted replica set that
// predate this commit need to receive a notification about the presence of new
// metadata in config.placementHistory to execute proper retargeting.
if (generatePlacementHistoryInitMetadata) {
auto& targeter = _getTargeter(opCtx);
ShardsvrNotifyShardingEventRequest request(
notify_sharding_event::kPlacementHistoryMetadataChanged,
PlacementHistoryMetadataChanged(newTopologyTime.asTimestamp()).toBSON());
request.setDbName(DatabaseName::kAdmin);
const auto session = getNewSession(opCtx);
generic_argument_util::setMajorityWriteConcern(request);
generic_argument_util::setOperationSessionInfo(request, session);
uassertStatusOK(
topology_change_helpers::runCommandForAddShard(
opCtx, targeter, DatabaseName::kAdmin, request.toBSON(), **executor)
.commandStatus);
}
if (feature_flags::gShardAuthoritativeDbMetadataDDL.isEnabled( if (feature_flags::gShardAuthoritativeDbMetadataDDL.isEnabled(
VersionContext::getDecoration(opCtx), VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) { serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {