From 6284daf6e3c3012d1bd57bcdad2c3ea83cdd4f23 Mon Sep 17 00:00:00 2001 From: ppolato <82828243+ppolato@users.noreply.github.com> Date: Tue, 2 Dec 2025 11:32:47 +0100 Subject: [PATCH] SERVER-113286 Generate NamespacePlacementChanged on addition of first shard in the sharded cluster (#44492) GitOrigin-RevId: 27587db826a4d55611183cc4308be04af342b790 --- .../global_catalog/reset_placement_history.js | 27 ++++++++++++- ...generates_historical_placement_metadata.js | 39 +++++++++++++++++-- .../projection_fakes_internal_event.js | 5 --- ...itialize_placement_history_coordinator.cpp | 6 ++- .../ddl/notify_sharding_event.idl | 12 +++++- ...shardsvr_notify_sharding_event_command.cpp | 8 ++-- .../repl/change_stream_oplog_notification.cpp | 9 +++-- .../repl/change_stream_oplog_notification.h | 3 +- .../db/topology/add_shard_coordinator.cpp | 18 +++++++++ 9 files changed, 102 insertions(+), 25 deletions(-) diff --git a/jstests/noPassthrough/global_catalog/reset_placement_history.js b/jstests/noPassthrough/global_catalog/reset_placement_history.js index 77ea93f1f1b..334c97ef85f 100644 --- a/jstests/noPassthrough/global_catalog/reset_placement_history.js +++ b/jstests/noPassthrough/global_catalog/reset_placement_history.js @@ -13,6 +13,21 @@ const st = new ShardingTest({ config: {nodes: 1}, }); +// A 'namespacePlacementChanged' op log entry is also generated by the first replica set joining the sharded cluster during the fixture setup +// with a 'o2.committedAt' timestamp matching the topology time. +const initialPlacementHistoryResetTimestamp = (() => { + const retrievedEvents = [st.rs0, st.rs1].reduce((accumulator, rs) => { + const primary = rs.getPrimary(); + const matchingEvents = primary + .getCollection("local.oplog.rs") + .find({"ns": "", "o.msg.namespacePlacementChanged": ""}) + .toArray(); + return accumulator.concat(matchingEvents); + }, []); + assert.eq(retrievedEvents.length, 1, "Expected to find exactly one 'namespacePlacementChanged' across the cluster"); + return retrievedEvents[0].o2.committedAt; +})(); + const configPrimary = st.configRS.getPrimary(); // Identifier for the config.placementHistory special documents holding metadata about the operational boundaries set by the initialization. @@ -77,7 +92,11 @@ function launchAndPauseResetPlacementHistory() { const primary = rs.getPrimary(); const placementHistoryChangedNotifications = primary .getCollection("local.oplog.rs") - .find({"ns": "", "o2.namespacePlacementChanged": 1}) + .find({ + "ns": "", + "o2.namespacePlacementChanged": 1, + "o2.committedAt": {$gt: initialPlacementHistoryResetTimestamp}, + }) .toArray(); assert.eq(placementHistoryChangedNotifications.length, 1); const entry = placementHistoryChangedNotifications[0]; @@ -121,7 +140,11 @@ function launchAndPauseResetPlacementHistory() { const primary = rs.getPrimary(); const placementHistoryChangedNotifications = primary .getCollection("local.oplog.rs") - .find({"ns": "", "o2.namespacePlacementChanged": 1}) + .find({ + "ns": "", + "o2.namespacePlacementChanged": 1, + "o2.committedAt": {$gt: initialPlacementHistoryResetTimestamp}, + }) .sort({ts: -1}) .toArray(); diff --git a/jstests/noPassthrough/sharded_cluster_topology/add_shard_generates_historical_placement_metadata.js b/jstests/noPassthrough/sharded_cluster_topology/add_shard_generates_historical_placement_metadata.js index ec955b43cc3..643c059b69a 100644 --- a/jstests/noPassthrough/sharded_cluster_topology/add_shard_generates_historical_placement_metadata.js +++ b/jstests/noPassthrough/sharded_cluster_topology/add_shard_generates_historical_placement_metadata.js @@ -43,6 +43,31 @@ describe("Activity of the addShard commit within config.placementHistory", funct assert.sameMembers(fallbackResponseDescriptor.shards, fallbackResponse); assert(timestampCmp(fallbackResponseDescriptor.timestamp, Timestamp(0, 1)) === 0); }; + + this.verifyPostCommitNotificationOnAddedReplicaSet = function (replicaSet, expectedCommitTimeValue) { + const namespacePlacementChangedFilter = {op: "n", ns: "", o: {msg: {namespacePlacementChanged: ""}}}; + const matchingOpEntries = replicaSet + .getPrimary() + .getCollection("local.oplog.rs") + .find(namespacePlacementChangedFilter) + .toArray(); + + if (expectedCommitTimeValue === null) { + assert.eq(0, matchingOpEntries.length); + return; + } + + assert.eq(1, matchingOpEntries.length); + let placementChangedNotification = matchingOpEntries[0]; + const expectedNotificationDetails = { + namespacePlacementChanged: 1, + ns: {}, + committedAt: expectedCommitTimeValue, + }; + + assert.docEq(placementChangedNotification.o2, expectedNotificationDetails); + assert(timestampCmp(expectedCommitTimeValue, placementChangedNotification.ts) < 0); + }; }); beforeEach(() => { @@ -67,7 +92,7 @@ describe("Activity of the addShard commit within config.placementHistory", funct assert.eq(0, this.st.config.placementHistory.countDocuments({})); }); - it("addShard generates initialization metadata when a first empty shard is added", () => { + it("addShard generates initialization metadata and a post-commit notification when a first empty shard is added", () => { const firstShardName = "firstShard"; const firstShardRS = this.spinNewReplicaSet(firstShardName); @@ -75,30 +100,35 @@ describe("Activity of the addShard commit within config.placementHistory", funct const firstShardCreationTime = this.getTopologyTimeOf(firstShardName); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); + this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime); }); - it("transitionFromDedicatedConfigServer generates initialization metadata", () => { + it("transitionFromDedicatedConfigServer generates initialization metadata and a post-commit notification", () => { const configShardName = "config"; assert.commandWorked(this.st.s.adminCommand({transitionFromDedicatedConfigServer: 1})); const firstShardCreationTime = this.getTopologyTimeOf(configShardName); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [configShardName]); + this.verifyPostCommitNotificationOnAddedReplicaSet(this.st.configRS, firstShardCreationTime); }); - it("addShard preserves the original initialization metadata when a second shard is added", () => { + it("the addition of a second shard of the cluster do not generate any initialization metadata or post-commit notification", () => { const firstShardName = "firstShard"; const firstShardRS = this.spinNewReplicaSet(firstShardName); assert.commandWorked(this.st.s.adminCommand({addShard: firstShardRS.getURL(), name: firstShardName})); const firstShardCreationTime = this.getTopologyTimeOf(firstShardName); + this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime); + const secondShardName = "secondShard"; const secondShardRS = this.spinNewReplicaSet(secondShardName); assert.commandWorked(this.st.s.adminCommand({addShard: secondShardRS.getURL(), name: secondShardName})); this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); + this.verifyPostCommitNotificationOnAddedReplicaSet(secondShardRS, null); }); - it("addShard generates initialization metadata and placement documents when a first not-empty shard is added", () => { + it("addShard generates initialization metadata, placement documents and a post-commit notification when a first not-empty shard is added", () => { const firstShardName = "firstShard"; const firstShardRS = this.spinNewReplicaSet(firstShardName); @@ -123,5 +153,6 @@ describe("Activity of the addShard commit within config.placementHistory", funct } this.verifyPlacementHistoryInitMetadata(firstShardCreationTime, [firstShardName]); + this.verifyPostCommitNotificationOnAddedReplicaSet(firstShardRS, firstShardCreationTime); }); }); diff --git a/jstests/sharding/query/change_streams/projection_fakes_internal_event.js b/jstests/sharding/query/change_streams/projection_fakes_internal_event.js index 5b33fd04761..830701bf7b1 100644 --- a/jstests/sharding/query/change_streams/projection_fakes_internal_event.js +++ b/jstests/sharding/query/change_streams/projection_fakes_internal_event.js @@ -73,11 +73,6 @@ describe("$changeStream", function () { assert.commandWorked(testColl.insert(invalidShardDoc)); assert.commandWorked(testColl.insert(fakeShardDoc)); - // TODO: SERVER-113286 Generate NamespacePlacementChanged on addition of first shard in the sharded cluster. - if (FeatureFlagUtil.isPresentAndEnabled(testDB, "ChangeStreamPreciseShardTargeting")) { - assert.commandWorked(testDB.adminCommand({resetPlacementHistory: 1})); - } - // Log the shard description documents that we just inserted into the collection. jsTest.log.info("Shard docs: ", {docs: testColl.find().toArray()}); }); diff --git a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp index 1e12e19bd85..6eab8345ae6 100644 --- a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp +++ b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp @@ -102,6 +102,7 @@ void unblockConflictingDDLs(OperationContext* opCtx) { void broadcastPlacementHistoryChangedNotification( OperationContext* opCtx, + const Timestamp& committedAt, const OperationSessionInfo& osi, std::shared_ptr executor, const CancellationToken& token) { @@ -109,7 +110,7 @@ void broadcastPlacementHistoryChangedNotification( ShardsvrNotifyShardingEventRequest request( notify_sharding_event::kPlacementHistoryMetadataChanged, - PlacementHistoryMetadataChanged().toBSON()); + PlacementHistoryMetadataChanged(committedAt).toBSON()); request.setDbName(DatabaseName::kAdmin); generic_argument_util::setMajorityWriteConcern(request); @@ -255,7 +256,8 @@ ExecutorFuture InitializePlacementHistoryCoordinator::_runImpl( Phase::kFinalize, [this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) { const auto osi = getNewSession(opCtx); - broadcastPlacementHistoryChangedNotification(opCtx, osi, executor, token); + broadcastPlacementHistoryChangedNotification( + opCtx, _doc.getInitializationTime().value(), osi, executor, token); PlacementHistoryCleaner::get(opCtx)->resume(opCtx); ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.end", nss()); })) diff --git a/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl b/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl index 74c8483d860..5ae035e437a 100644 --- a/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl +++ b/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl @@ -114,14 +114,22 @@ structs: "Notification of the commit of a metadata change affecting the whole content of config.placementHistory. Shards are expected to react by emitting a no-op log entry directed to change stream readers." strict: false + fields: + committedAt: + description: + "The cluster time associated to the metadata change, + as persistent in the related config.placementHistory documents." + type: timestamp commands: _shardsvrNotifyShardingEvent: command_name: _shardsvrNotifyShardingEvent cpp_name: ShardsvrNotifyShardingEventRequest description: - "Internal command to be invoked by the config server to notify a shard - of an event concerning the shard itself or the whole cluster." + "Internal command supporting the integration between Sharding DDL operations and change stream readers. + A remote coordinator node may use it to dispatch the notification of an upcoming/committed event to one or more of its participants. + The recipient is expected to react with the generation of a no-op entry describing the details of the notification, + which will be later interpreted as a change/control event by change stream readers targeting it." namespace: ignored api_version: "" strict: false diff --git a/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp b/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp index f985cbf1495..1e45b8e0737 100644 --- a/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp +++ b/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp @@ -49,10 +49,6 @@ namespace mongo { -/** - * This command notifies an event on the shard server. The action taken is determined by the - * event ShardsvrAddShard: Add an oplog entry for the new shard. - */ class ShardsvrNotifyShardingEventCommand : public TypedCommand { public: using Request = ShardsvrNotifyShardingEventRequest; @@ -122,7 +118,9 @@ public: if (request().getEventType() == notify_sharding_event::kPlacementHistoryMetadataChanged) { - notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx); + const auto event = PlacementHistoryMetadataChanged::parse( + request().getDetails(), IDLParserContext("_shardsvrNotifyShardingEvent")); + notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx, event); return; } diff --git a/src/mongo/db/repl/change_stream_oplog_notification.cpp b/src/mongo/db/repl/change_stream_oplog_notification.cpp index 6f8f2625a63..7319e21c010 100644 --- a/src/mongo/db/repl/change_stream_oplog_notification.cpp +++ b/src/mongo/db/repl/change_stream_oplog_notification.cpp @@ -254,14 +254,15 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx, "NamespacePlacementChangedWritesOplog"); } -void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx) { - Timestamp now(opCtx->fastClockSource().now()); +void notifyChangeStreamsOnPlacementHistoryMetadataChanged( + OperationContext* opCtx, const PlacementHistoryMetadataChanged& notification) { // Global changes to the metadata of placementHistory are encoded as a NamespacePlacementChanged // notification with an unspecified namespace. - NamespacePlacementChanged globalChangeNotification(NamespaceString::kEmpty, now); + NamespacePlacementChanged repackagedNotification(NamespaceString::kEmpty, + notification.getCommittedAt()); insertNotificationOplogEntries( opCtx, - {buildNamespacePlacementChangedOplogEntry(opCtx, globalChangeNotification)}, + {buildNamespacePlacementChangedOplogEntry(opCtx, repackagedNotification)}, "PlacementHistoryMetadataChangedWritesOplog"); } diff --git a/src/mongo/db/repl/change_stream_oplog_notification.h b/src/mongo/db/repl/change_stream_oplog_notification.h index 6d56f7bf0f2..27836eac601 100644 --- a/src/mongo/db/repl/change_stream_oplog_notification.h +++ b/src/mongo/db/repl/change_stream_oplog_notification.h @@ -120,7 +120,8 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx, * Writes a no-op oplog entry concerning the commit of an operation * modifying the operational boundaries of config.placementHistory. */ -void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx); +void notifyChangeStreamsOnPlacementHistoryMetadataChanged( + OperationContext* opCtx, const PlacementHistoryMetadataChanged& notification); /** * Writes a no-op oplog entry on the end of multi shard transaction. diff --git a/src/mongo/db/topology/add_shard_coordinator.cpp b/src/mongo/db/topology/add_shard_coordinator.cpp index 61205083017..a1d85102d63 100644 --- a/src/mongo/db/topology/add_shard_coordinator.cpp +++ b/src/mongo/db/topology/add_shard_coordinator.cpp @@ -317,6 +317,24 @@ ExecutorFuture AddShardCoordinator::_runImpl( } } + // V2 Change stream readers consuming events from a promoted replica set that + // predate this commit need to receive a notification about the presence of new + // metadata in config.placementHistory to execute proper retargeting. + if (generatePlacementHistoryInitMetadata) { + auto& targeter = _getTargeter(opCtx); + ShardsvrNotifyShardingEventRequest request( + notify_sharding_event::kPlacementHistoryMetadataChanged, + PlacementHistoryMetadataChanged(newTopologyTime.asTimestamp()).toBSON()); + request.setDbName(DatabaseName::kAdmin); + const auto session = getNewSession(opCtx); + generic_argument_util::setMajorityWriteConcern(request); + generic_argument_util::setOperationSessionInfo(request, session); + uassertStatusOK( + topology_change_helpers::runCommandForAddShard( + opCtx, targeter, DatabaseName::kAdmin, request.toBSON(), **executor) + .commandStatus); + } + if (feature_flags::gShardAuthoritativeDbMetadataDDL.isEnabled( VersionContext::getDecoration(opCtx), serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {