From abdc87419afbadf02e99ad1a811d9f1889d9f135 Mon Sep 17 00:00:00 2001
From: ppolato <82828243+ppolato@users.noreply.github.com>
Date: Tue, 21 Oct 2025 10:01:51 +0200
Subject: [PATCH] SERVER-111797 Adjust the behavior of
 InitializePlacementHistoryCoordinator (#42739)

GitOrigin-RevId: 5faf6299d04f0d993dfc753c5b75f515cbb1b386
---
 .../smoke_tests/catalog_and_routing.yml       |  2 +-
 ...hot_read.js => reset_placement_history.js} | 92 ++++++++++++++++++-
 ...itialize_placement_history_coordinator.cpp | 80 +++++++++++++++-
 ...placement_history_coordinator_document.idl |  1 +
 .../ddl/notify_sharding_event.idl             |  6 ++
 .../ddl/notify_sharding_event_utils.h         |  4 +-
 ...shardsvr_notify_sharding_event_command.cpp |  6 ++
 src/mongo/db/repl/BUILD.bazel                 |  2 +-
 .../repl/change_stream_oplog_notification.cpp | 11 +++
 .../repl/change_stream_oplog_notification.h   |  6 ++
 10 files changed, 202 insertions(+), 8 deletions(-)
 rename jstests/noPassthrough/global_catalog/{reset_placement_history_snapshot_read.js => reset_placement_history.js} (70%)

diff --git a/buildscripts/smoke_tests/catalog_and_routing.yml b/buildscripts/smoke_tests/catalog_and_routing.yml
index 9285f69d7ea..ee27ba164e8 100644
--- a/buildscripts/smoke_tests/catalog_and_routing.yml
+++ b/buildscripts/smoke_tests/catalog_and_routing.yml
@@ -49,6 +49,6 @@ suites:
   no_passthrough:
     - jstests/noPassthrough/ddl/set_allow_migrations.js
    - jstests/noPassthrough/global_catalog/repair_sharded_collection_history.js
-    - jstests/noPassthrough/global_catalog/reset_placement_history_snapshot_read.js
+    - jstests/noPassthrough/global_catalog/reset_placement_history.js
     - jstests/noPassthrough/versioning_protocol/_flush_cache_update_commands.js
     - jstests/noPassthrough/versioning_protocol/flush_router_config.js
diff --git a/jstests/noPassthrough/global_catalog/reset_placement_history_snapshot_read.js b/jstests/noPassthrough/global_catalog/reset_placement_history.js
similarity index 70%
rename from jstests/noPassthrough/global_catalog/reset_placement_history_snapshot_read.js
rename to jstests/noPassthrough/global_catalog/reset_placement_history.js
index a85601f1394..77ea93f1f1b 100644
--- a/jstests/noPassthrough/global_catalog/reset_placement_history_snapshot_read.js
+++ b/jstests/noPassthrough/global_catalog/reset_placement_history.js
@@ -1,5 +1,5 @@
 /**
- * Test validating the expected behavior of resetPlacementHistory (in particular, its logic performing snapshot reads of the global catalog).
+ * Test validating the expected behavior of resetPlacementHistory.
  * @tags: [
  *   featureFlagChangeStreamPreciseShardTargeting,
  * ]
@@ -42,6 +42,96 @@ function launchAndPauseResetPlacementHistory() {
     };
 }
 
+{
+    jsTest.log.info("resetPlacementHistory produces the expected oplog entries across the shards of the cluster");
+
+    const initializationMetadataBeforeReset = st.config.placementHistory
+        .find({nss: initializationMetadataNssId})
+        .sort({timestamp: 1})
+        .toArray();
+
+    assert.eq(initializationMetadataBeforeReset.length, 2);
+    assert(
+        timestampCmp(initializationMetadataBeforeReset[0].timestamp, initializationMetadataBeforeReset[1].timestamp) !==
+            0,
+    );
+    const initializationTimeBeforeReset = initializationMetadataBeforeReset[1].timestamp;
+
+    assert.commandWorked(st.s.adminCommand({resetPlacementHistory: 1}));
+
+    const initializationMetadataAfterReset = st.config.placementHistory
+        .find({nss: initializationMetadataNssId})
+        .sort({timestamp: 1})
+        .toArray();
+
+    assert.eq(initializationMetadataAfterReset.length, 2);
+    assert(
+        timestampCmp(initializationMetadataAfterReset[0].timestamp, initializationMetadataAfterReset[1].timestamp) !==
+            0,
+    );
+    const initializationTimeAfterReset = initializationMetadataAfterReset[1].timestamp;
+
+    assert(timestampCmp(initializationTimeAfterReset, initializationTimeBeforeReset) > 0);
+
+    [st.rs0, st.rs1].forEach((rs) => {
+        const primary = rs.getPrimary();
+        const placementHistoryChangedNotifications = primary
+            .getCollection("local.oplog.rs")
+            .find({"ns": "", "o2.namespacePlacementChanged": 1})
+            .toArray();
+        assert.eq(placementHistoryChangedNotifications.length, 1);
+        const entry = placementHistoryChangedNotifications[0];
+        assert.eq(entry.op, "n");
+        assert(timestampCmp(entry.ts, initializationTimeAfterReset) > 0);
+    });
+}
+
+{
+    jsTest.log.info("A second resetPlacementHistory invocation produces new oplog entries across the shards of the cluster");
+
+    const initializationMetadataBeforeReset = st.config.placementHistory
+        .find({nss: initializationMetadataNssId})
+        .sort({timestamp: 1})
+        .toArray();
+
+    assert.eq(initializationMetadataBeforeReset.length, 2);
+    assert(
+        timestampCmp(initializationMetadataBeforeReset[0].timestamp, initializationMetadataBeforeReset[1].timestamp) !==
+            0,
+    );
+    const initializationTimeBeforeReset = initializationMetadataBeforeReset[1].timestamp;
+
+    assert.commandWorked(st.s.adminCommand({resetPlacementHistory: 1}));
+
+    const initializationMetadataAfterReset = st.config.placementHistory
+        .find({nss: initializationMetadataNssId})
+        .sort({timestamp: 1})
+        .toArray();
+
+    assert.eq(initializationMetadataAfterReset.length, 2);
+    assert(
+        timestampCmp(initializationMetadataAfterReset[0].timestamp, initializationMetadataAfterReset[1].timestamp) !==
+            0,
+    );
+    const initializationTimeAfterReset = initializationMetadataAfterReset[1].timestamp;
+
+    assert(timestampCmp(initializationTimeAfterReset, initializationTimeBeforeReset) > 0);
+
+    [st.rs0, st.rs1].forEach((rs) => {
+        const primary = rs.getPrimary();
+        const placementHistoryChangedNotifications = primary
+            .getCollection("local.oplog.rs")
+            .find({"ns": "", "o2.namespacePlacementChanged": 1})
+            .sort({ts: -1})
+            .toArray();
+
+        const entry = placementHistoryChangedNotifications[0];
+        assert.eq(entry.op, "n");
+        assert.eq(entry.o, {msg: {namespacePlacementChanged: ""}});
+        assert(timestampCmp(entry.ts, initializationTimeAfterReset) > 0);
+    });
+}
+
 {
     jsTest.log.info(
         "resetPlacementHistory produces a materialized view that is consistent with the state of the global catalog at the chosen point-in-time",
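Note on the ordering assertions above: timestampCmp() compares BSON Timestamps, which order lexicographically by (seconds, increment). The sketch below is a standalone model of that ordering, not MongoDB code; Ts and the local timestampCmp are illustrative stand-ins for the shell's Timestamp type and timestampCmp() helper.

```cpp
#include <cassert>
#include <tuple>

// Illustrative stand-in for a BSON Timestamp: ordered by (seconds, increment).
struct Ts {
    unsigned secs;
    unsigned inc;
};

// Illustrative stand-in for the shell's timestampCmp(): negative/zero/positive.
int timestampCmp(const Ts& a, const Ts& b) {
    auto lhs = std::tie(a.secs, a.inc);
    auto rhs = std::tie(b.secs, b.inc);
    return lhs < rhs ? -1 : (rhs < lhs ? 1 : 0);
}

int main() {
    Ts before{100, 1};
    Ts after{100, 2};  // same wall-clock second, later increment
    assert(timestampCmp(after, before) > 0);    // mirrors the post-reset check above
    assert(timestampCmp(before, before) == 0);  // equal timestamps compare as 0
    return 0;
}
```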
diff --git a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp
index ed80b124a58..f12b67861aa 100644
--- a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp
+++ b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.cpp
@@ -29,11 +29,13 @@
 
 #include "mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.h"
 
+#include "mongo/db/global_catalog/ddl/notify_sharding_event_gen.h"
 #include "mongo/db/global_catalog/ddl/placement_history_cleaner.h"
 #include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
 #include "mongo/db/global_catalog/ddl/sharding_util.h"
 #include "mongo/db/global_catalog/ddl/shardsvr_join_ddl_coordinators_request_gen.h"
 #include "mongo/db/local_catalog/drop_collection.h"
+#include "mongo/db/sharding_environment/sharding_logging.h"
 #include "mongo/db/vector_clock/vector_clock.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -97,10 +99,54 @@ void blockAndDrainConflictingDDLs(OperationContext* opCtx) {
 void unblockConflictingDDLs(OperationContext* opCtx) {
     broadcastHistoryInitializationState(opCtx, false /*isInProgress*/);
 }
+
+void broadcastPlacementHistoryChangedNotification(
+    OperationContext* opCtx,
+    const OperationSessionInfo& osi,
+    std::shared_ptr<executor::ScopedTaskExecutor> executor,
+    const CancellationToken& token) {
+    auto allShards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+
+    ShardsvrNotifyShardingEventRequest request(
+        notify_sharding_event::kPlacementHistoryMetadataChanged,
+        PlacementHistoryMetadataChanged().toBSON());
+
+    request.setDbName(DatabaseName::kAdmin);
+    generic_argument_util::setMajorityWriteConcern(request);
+    generic_argument_util::setOperationSessionInfo(request, osi);
+
+    auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrNotifyShardingEventRequest>>(
+        **executor, token, std::move(request));
+
+    auto responses = sharding_ddl_util::sendAuthenticatedCommandToShards(
+        opCtx, opts, std::move(allShards), false /* throwOnError */);
+    for (const auto& response : responses) {
+        auto status = AsyncRequestsSender::Response::getEffectiveStatus(response);
+        if (status.isOK()) {
+            continue;
+        }
+
+        if (status == ErrorCodes::UnsupportedShardingEventNotification) {
+            // Swallow the error, which is expected when the recipient runs a legacy binary that
+            // does not support the kPlacementHistoryMetadataChanged notification type.
+            LOGV2_WARNING(10916800,
+                          "Skipping kPlacementHistoryMetadataChanged notification",
+                          "recipient"_attr = response.shardId);
+        } else {
+            uassertStatusOK(status);
+        }
+    }
+}
+
+bool anyShardInTheCluster(OperationContext* opCtx) {
+    auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+    return shardRegistry->getNumShards(opCtx) != 0;
+}
+
 }  // namespace
 
 bool InitializePlacementHistoryCoordinator::_mustAlwaysMakeProgress() {
-    return _doc.getPhase() > Phase::kUnset;
+    return _doc.getPhase() > Phase::kCheckPreconditions;
 }
 
 std::set<NamespaceString> InitializePlacementHistoryCoordinator::_getAdditionalLocksToAcquire(
@@ -114,9 +160,22 @@ ExecutorFuture<void> InitializePlacementHistoryCoordinator::_runImpl(
     std::shared_ptr<executor::ScopedTaskExecutor> executor,
     const CancellationToken& token) noexcept {
     return ExecutorFuture<void>(**executor)
+        .then(_buildPhaseHandler(Phase::kCheckPreconditions,
+                                 [](auto* opCtx) {
+                                     // Disregard this request if the cluster is 'empty':
+                                     // config.placementHistory has no meaning within such a state
+                                     // and the first shard addition is expected to later trigger
+                                     // the initialization of its content.
+                                     uassert(
+                                         ErrorCodes::RequestAlreadyFulfilled,
+                                         "Skipping initialization of config.placementHistory: "
+                                         "there is currently no shard registered in the cluster",
+                                         anyShardInTheCluster(opCtx));
+                                 }))
         .then([this, anchor = shared_from_this()] {
             auto opCtxHolder = makeOperationContext();
             auto* opCtx = opCtxHolder.get();
+            ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.start", nss());
             // Ensure that there is no concurrent access from the periodic cleaning job (which may
             // have been re-activated during the execution of this Coordinator during a node step
             // up).
@@ -192,9 +251,22 @@ ExecutorFuture<void> InitializePlacementHistoryCoordinator::_runImpl(
                     "config.placementHistory initialization failed");
             }
         }))
-        .then(_buildPhaseHandler(Phase::kFinalize, [](auto* opCtx) {
-            PlacementHistoryCleaner::get(opCtx)->resume(opCtx);
-        }));
+        .then(_buildPhaseHandler(
+            Phase::kFinalize,
+            [this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) {
+                const auto osi = getNewSession(opCtx);
+                broadcastPlacementHistoryChangedNotification(opCtx, osi, executor, token);
+                PlacementHistoryCleaner::get(opCtx)->resume(opCtx);
+                ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.end", nss());
+            }))
+        .onError([](const Status& status) {
+            if (status == ErrorCodes::RequestAlreadyFulfilled) {
+                LOGV2(10916801, "Skipping initialization of config.placementHistory");
+                return Status::OK();
+            }
+
+            return status;
+        });
 }
 
 }  // namespace mongo
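The fan-out in broadcastPlacementHistoryChangedNotification() above is deliberately best-effort toward legacy binaries: a recipient that predates the new event type rejects it with UnsupportedShardingEventNotification, and only that code is swallowed. Below is a standalone model of the pattern, not MongoDB code; Code and legacyValidateEventType are illustrative stand-ins for mongo::ErrorCodes and notify_sharding_event::validateEventType().

```cpp
#include <cassert>
#include <string>
#include <vector>

// Illustrative stand-in for the error codes seen in the response loop above.
enum class Code { OK, UnsupportedShardingEventNotification, HostUnreachable };

// Models validateEventType() on a legacy binary that does not know
// "placementHistoryMetadataChanged" yet.
Code legacyValidateEventType(const std::string& eventType) {
    return eventType == "namespacePlacementChanged" ? Code::OK
                                                    : Code::UnsupportedShardingEventNotification;
}

int main() {
    std::vector<Code> responses{legacyValidateEventType("placementHistoryMetadataChanged"),
                                Code::OK};
    for (Code status : responses) {
        if (status == Code::OK) {
            continue;
        }
        // Only the "legacy recipient" code is tolerated; anything else (e.g.
        // HostUnreachable) would be rethrown, as uassertStatusOK() does above.
        assert(status == Code::UnsupportedShardingEventNotification);
    }
    return 0;
}
```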
diff --git a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator_document.idl b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator_document.idl
index 30698f7f105..133f0055648 100644
--- a/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator_document.idl
+++ b/src/mongo/db/global_catalog/ddl/initialize_placement_history_coordinator_document.idl
@@ -43,6 +43,7 @@ enums:
         type: string
         values:
             kUnset: "unset"
+            kCheckPreconditions: "checkPreconditions"
            kBlockDDLs: "blockDDLs"
             kDefineInitializationTime: "defineInitializationTime"
             kUnblockDDLs: "unblockDDLs"
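The new kCheckPreconditions phase is ordered before every phase with observable side effects, and _mustAlwaysMakeProgress() now returns false up to and including it, so a coordinator that bails out on an empty cluster can abort cleanly instead of being forced to retry on step-up. A standalone sketch of that gating rule follows; the enum mirrors the IDL above, and mustAlwaysMakeProgress is an illustrative stand-in for the coordinator method.

```cpp
#include <cassert>

// Mirrors the phase order declared in the IDL above.
enum class Phase {
    kUnset,
    kCheckPreconditions,
    kBlockDDLs,
    kDefineInitializationTime,
    kUnblockDDLs,
    kFinalize,
};

// Stand-in for InitializePlacementHistoryCoordinator::_mustAlwaysMakeProgress().
bool mustAlwaysMakeProgress(Phase persistedPhase) {
    return persistedPhase > Phase::kCheckPreconditions;  // was: > Phase::kUnset
}

int main() {
    // Precondition failures (e.g. no shard registered) may still abort the coordinator...
    assert(!mustAlwaysMakeProgress(Phase::kUnset));
    assert(!mustAlwaysMakeProgress(Phase::kCheckPreconditions));
    // ...but once DDLs have been blocked, the coordinator must run to completion.
    assert(mustAlwaysMakeProgress(Phase::kBlockDDLs));
    assert(mustAlwaysMakeProgress(Phase::kFinalize));
    return 0;
}
```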
diff --git a/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl b/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl
index 002541efece..74c8483d860 100644
--- a/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl
+++ b/src/mongo/db/global_catalog/ddl/notify_sharding_event.idl
@@ -109,6 +109,12 @@ structs:
             description: "The cluster time at which the placement-changing DDL got recorded into the global catalog."
             type: timestamp
 
+    PlacementHistoryMetadataChanged:
+        description:
+            "Notification of the commit of a metadata change affecting the whole content of config.placementHistory.
+            Shards are expected to react by emitting a no-op oplog entry directed to change stream readers."
+        strict: false
+
 commands:
     _shardsvrNotifyShardingEvent:
         command_name: _shardsvrNotifyShardingEvent
diff --git a/src/mongo/db/global_catalog/ddl/notify_sharding_event_utils.h b/src/mongo/db/global_catalog/ddl/notify_sharding_event_utils.h
index fa9f1355875..35a6ab16816 100644
--- a/src/mongo/db/global_catalog/ddl/notify_sharding_event_utils.h
+++ b/src/mongo/db/global_catalog/ddl/notify_sharding_event_utils.h
@@ -42,10 +42,12 @@ static constexpr char kDatabasesAdded[] = "databasesAdded";
 static constexpr char kCollectionSharded[] = "collectionSharded";
 static constexpr char kCollectionResharded[] = "collectionResharded";
 static constexpr char kNamespacePlacementChanged[] = "namespacePlacementChanged";
+static constexpr char kPlacementHistoryMetadataChanged[] = "placementHistoryMetadataChanged";
 
 inline Status validateEventType(const std::string& eventType) {
     if (eventType == kCollectionResharded || eventType == kNamespacePlacementChanged ||
-        eventType == kDatabasesAdded || eventType == kCollectionSharded) {
+        eventType == kDatabasesAdded || eventType == kCollectionSharded ||
+        eventType == kPlacementHistoryMetadataChanged) {
         return Status::OK();
     }
diff --git a/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp b/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp
index 9d24b38ea72..f985cbf1495 100644
--- a/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp
+++ b/src/mongo/db/global_catalog/ddl/shardsvr_notify_sharding_event_command.cpp
@@ -120,6 +120,12 @@ public:
                 return;
             }
 
+            if (request().getEventType() ==
+                notify_sharding_event::kPlacementHistoryMetadataChanged) {
+                notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx);
+                return;
+            }
+
             MONGO_UNREACHABLE_TASSERT(10083526);
         }
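Recipient-side, _shardsvrNotifyShardingEvent pairs each event type accepted by validateEventType() with exactly one notifier; a type that passed validation but has no matching branch would trip MONGO_UNREACHABLE_TASSERT. The standalone model below illustrates that dispatch invariant; the map, handler bodies, and counter are illustrative stand-ins, not the server's implementation.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

int main() {
    int placementHistoryNotifications = 0;
    // Each validated event type maps to one notifier, like the if-chain above.
    const std::map<std::string, std::function<void()>> handlers{
        {"namespacePlacementChanged",
         [] { /* would call notifyChangeStreamsOnNamespacePlacementChanged */ }},
        {"placementHistoryMetadataChanged",
         [&] { ++placementHistoryNotifications; }},  // the branch added by this patch
    };
    // at() throws for unknown keys, playing the role of MONGO_UNREACHABLE_TASSERT.
    handlers.at("placementHistoryMetadataChanged")();
    assert(placementHistoryNotifications == 1);
    return 0;
}
```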
diff --git a/src/mongo/db/repl/BUILD.bazel b/src/mongo/db/repl/BUILD.bazel
index e166f79ad69..3cca302ea7e 100644
--- a/src/mongo/db/repl/BUILD.bazel
+++ b/src/mongo/db/repl/BUILD.bazel
@@ -1014,12 +1014,12 @@ mongo_cc_library(
     ],
     hdrs = [
         "change_stream_oplog_notification.h",
-        "//src/mongo/db/global_catalog/ddl:notify_sharding_event_gen",
     ],
     deps = [
         ":oplog",
         ":oplog_entry",
         "//src/mongo/db:dbhelpers",
+        "//src/mongo/db/global_catalog/ddl:notify_sharding_event_idl",
         "//src/mongo/db/local_catalog/lock_manager",
         "//src/mongo/db/local_catalog/lock_manager:exception_util",
         "//src/mongo/db/pipeline:change_stream_helpers",
diff --git a/src/mongo/db/repl/change_stream_oplog_notification.cpp b/src/mongo/db/repl/change_stream_oplog_notification.cpp
index 5903b7efca4..f74311ebeba 100644
--- a/src/mongo/db/repl/change_stream_oplog_notification.cpp
+++ b/src/mongo/db/repl/change_stream_oplog_notification.cpp
@@ -254,6 +254,17 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
                                    "NamespacePlacementChangedWritesOplog");
 }
 
+void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx) {
+    Timestamp now(opCtx->fastClockSource().now());
+    // Global changes to the metadata of placementHistory are encoded as a NamespacePlacementChanged
+    // notification with an unspecified namespace.
+    NamespacePlacementChanged globalChangeNotification(NamespaceString::kEmpty, now);
+    insertNotificationOplogEntries(
+        opCtx,
+        {buildNamespacePlacementChangedOplogEntry(opCtx, globalChangeNotification)},
+        "PlacementHistoryMetadataChangedWritesOplog");
+}
+
 void notifyChangeStreamOnEndOfTransaction(OperationContext* opCtx,
                                           const LogicalSessionId& lsid,
                                           const TxnNumber& txnNumber,
diff --git a/src/mongo/db/repl/change_stream_oplog_notification.h b/src/mongo/db/repl/change_stream_oplog_notification.h
index 76befa11904..6d56f7bf0f2 100644
--- a/src/mongo/db/repl/change_stream_oplog_notification.h
+++ b/src/mongo/db/repl/change_stream_oplog_notification.h
@@ -116,6 +116,12 @@ repl::MutableOplogEntry buildNamespacePlacementChangedOplogEntry(
 void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
                                                     const NamespacePlacementChanged& notification);
 
+/**
+ * Writes a no-op oplog entry concerning the commit of an operation
+ * modifying the operational boundaries of config.placementHistory.
+ */
+void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx);
+
 /**
  * Writes a no-op oplog entry on the end of multi shard transaction.
  **/
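Putting the pieces together: each shard reacts to the notification by writing the no-op entry that the updated jstest asserts on, with op "n", an empty ns, and an o payload of {msg: {namespacePlacementChanged: ""}}. The standalone sketch below models that expected shape only; the flattened dotted keys and std::string values are illustrative, not the server's actual BSON representation.

```cpp
#include <cassert>
#include <map>
#include <string>

int main() {
    // Field layout mirrored from the assertions in reset_placement_history.js above;
    // dotted paths flatten the nested "o" subdocument for illustration only.
    const std::map<std::string, std::string> noopEntry{
        {"op", "n"},                              // no-op oplog entry
        {"ns", ""},                               // the "unspecified namespace" encoding
        {"o.msg.namespacePlacementChanged", ""},  // message payload checked by the test
    };
    assert(noopEntry.at("op") == "n");
    assert(noopEntry.at("ns").empty());
    return 0;
}
```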