SERVER-111797 Adjust the behavior of InitializePlacementHistoryCoordinator (#42739)

GitOrigin-RevId: 5faf6299d04f0d993dfc753c5b75f515cbb1b386
ppolato 2025-10-21 10:01:51 +02:00 committed by MongoDB Bot
parent 1d7813ffed
commit abdc87419a
10 changed files with 202 additions and 8 deletions

View File

@@ -49,6 +49,6 @@ suites:
  no_passthrough:
  - jstests/noPassthrough/ddl/set_allow_migrations.js
  - jstests/noPassthrough/global_catalog/repair_sharded_collection_history.js
- - jstests/noPassthrough/global_catalog/reset_placement_history_snapshot_read.js
+ - jstests/noPassthrough/global_catalog/reset_placement_history.js
  - jstests/noPassthrough/versioning_protocol/_flush_cache_update_commands.js
  - jstests/noPassthrough/versioning_protocol/flush_router_config.js

View File

@ -1,5 +1,5 @@
/** /**
* Test validating the expected behavior of resetPlacementHistory (in particular, its logic performing snapshot reads of the global catalog). * Test validating the expected behavior of resetPlacementHistory.
* @tags: [ * @tags: [
* featureFlagChangeStreamPreciseShardTargeting, * featureFlagChangeStreamPreciseShardTargeting,
* ] * ]
@@ -42,6 +42,96 @@ function launchAndPauseResetPlacementHistory() {
    };
}
{
    jsTest.log.info("resetPlacementHistory produces the expected oplog entries across the shards of the cluster");

    const initializationMetadataBeforeReset = st.config.placementHistory
        .find({nss: initializationMetadataNssId})
        .sort({timestamp: 1})
        .toArray();
    assert.eq(initializationMetadataBeforeReset.length, 2);
    assert(
        timestampCmp(initializationMetadataBeforeReset[0].timestamp,
                     initializationMetadataBeforeReset[1].timestamp) !== 0,
    );
    const initializationTimeBeforeReset = initializationMetadataBeforeReset[1].timestamp;

    assert.commandWorked(st.s.adminCommand({resetPlacementHistory: 1}));

    const initializationMetadataAfterReset = st.config.placementHistory
        .find({nss: initializationMetadataNssId})
        .sort({timestamp: 1})
        .toArray();
    assert.eq(initializationMetadataAfterReset.length, 2);
    assert(
        timestampCmp(initializationMetadataAfterReset[0].timestamp,
                     initializationMetadataAfterReset[1].timestamp) !== 0,
    );
    const initializationTimeAfterReset = initializationMetadataAfterReset[1].timestamp;
    assert(timestampCmp(initializationTimeAfterReset, initializationTimeBeforeReset) > 0);

    [st.rs0, st.rs1].forEach((rs) => {
        const primary = rs.getPrimary();
        const placementHistoryChangedNotifications = primary
            .getCollection("local.oplog.rs")
            .find({"ns": "", "o2.namespacePlacementChanged": 1})
            .sort({ts: -1})
            .toArray();
        assert.eq(placementHistoryChangedNotifications.length, 1);
        const entry = placementHistoryChangedNotifications[0];
        assert.eq(entry.op, "n");
        assert.eq(entry.o, {msg: {namespacePlacementChanged: ""}});
        assert(timestampCmp(entry.ts, initializationTimeAfterReset) > 0);
    });
}
{
    jsTest.log.info(
        "resetPlacementHistory produces a materialized view that is consistent with the state of the global catalog at the chosen point-in-time",

View File

@@ -29,11 +29,13 @@
#include "mongo/db/global_catalog/ddl/initialize_placement_history_coordinator.h"
#include "mongo/db/global_catalog/ddl/notify_sharding_event_gen.h"
#include "mongo/db/global_catalog/ddl/placement_history_cleaner.h"
#include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
#include "mongo/db/global_catalog/ddl/sharding_util.h"
#include "mongo/db/global_catalog/ddl/shardsvr_join_ddl_coordinators_request_gen.h"
#include "mongo/db/local_catalog/drop_collection.h"
#include "mongo/db/sharding_environment/sharding_logging.h"
#include "mongo/db/vector_clock/vector_clock.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -97,10 +99,54 @@ void blockAndDrainConflictingDDLs(OperationContext* opCtx) {
void unblockConflictingDDLs(OperationContext* opCtx) {
    broadcastHistoryInitializationState(opCtx, false /*isInProgress*/);
}

void broadcastPlacementHistoryChangedNotification(
    OperationContext* opCtx,
    const OperationSessionInfo& osi,
    std::shared_ptr<executor::ScopedTaskExecutor> executor,
    const CancellationToken& token) {
    auto allShards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);

    ShardsvrNotifyShardingEventRequest request(
        notify_sharding_event::kPlacementHistoryMetadataChanged,
        PlacementHistoryMetadataChanged().toBSON());
    request.setDbName(DatabaseName::kAdmin);
    generic_argument_util::setMajorityWriteConcern(request);
    generic_argument_util::setOperationSessionInfo(request, osi);

    auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrNotifyShardingEventRequest>>(
        **executor, token, std::move(request));
    auto responses = sharding_ddl_util::sendAuthenticatedCommandToShards(
        opCtx, opts, std::move(allShards), false /* throwOnError */);

    for (const auto& response : responses) {
        auto status = AsyncRequestsSender::Response::getEffectiveStatus(response);
        if (status.isOK()) {
            continue;
        }

        if (status == ErrorCodes::UnsupportedShardingEventNotification) {
            // Swallow the error, which is expected when the recipient runs a legacy binary that
            // does not support the kPlacementHistoryMetadataChanged notification type.
            LOGV2_WARNING(10916800,
                          "Skipping kPlacementHistoryMetadataChanged notification",
                          "recipient"_attr = response.shardId);
        } else {
            uassertStatusOK(status);
        }
    }
}

bool anyShardInTheCluster(OperationContext* opCtx) {
    auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
    return shardRegistry->getNumShards(opCtx) != 0;
}

}  // namespace

bool InitializePlacementHistoryCoordinator::_mustAlwaysMakeProgress() {
-    return _doc.getPhase() > Phase::kUnset;
+    return _doc.getPhase() > Phase::kCheckPreconditions;
}

std::set<NamespaceString> InitializePlacementHistoryCoordinator::_getAdditionalLocksToAcquire(
@@ -114,9 +160,22 @@ ExecutorFuture<void> InitializePlacementHistoryCoordinator::_runImpl(
    std::shared_ptr<executor::ScopedTaskExecutor> executor,
    const CancellationToken& token) noexcept {
    return ExecutorFuture<void>(**executor)
        .then(_buildPhaseHandler(Phase::kCheckPreconditions,
                                 [](auto* opCtx) {
                                     // Disregard this request if the cluster is 'empty':
                                     // config.placementHistory has no meaning within such a state
                                     // and the first shard addition is expected to later trigger
                                     // the initialization of its content.
                                     uassert(
                                         ErrorCodes::RequestAlreadyFulfilled,
                                         "Skipping initialization of config.placementHistory: "
                                         "there is currently no shard registered in the cluster",
                                         anyShardInTheCluster(opCtx));
                                 }))
        .then([this, anchor = shared_from_this()] {
            auto opCtxHolder = makeOperationContext();
            auto* opCtx = opCtxHolder.get();

            ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.start", nss());

            // Ensure that there is no concurrent access from the periodic cleaning job (which may
            // have been re-activated during the execution of this Coordinator during a node step
            // up).
@@ -192,9 +251,22 @@ ExecutorFuture<void> InitializePlacementHistoryCoordinator::_runImpl(
                        "config.placementHistory initialization failed");
                }
            }))
-        .then(_buildPhaseHandler(Phase::kFinalize, [](auto* opCtx) {
-            PlacementHistoryCleaner::get(opCtx)->resume(opCtx);
-        }));
+        .then(_buildPhaseHandler(
+            Phase::kFinalize,
+            [this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) {
+                const auto osi = getNewSession(opCtx);
+                broadcastPlacementHistoryChangedNotification(opCtx, osi, executor, token);
+                PlacementHistoryCleaner::get(opCtx)->resume(opCtx);
+                ShardingLogging::get(opCtx)->logChange(opCtx, "resetPlacementHistory.end", nss());
+            }))
+        .onError([](const Status& status) {
+            if (status == ErrorCodes::RequestAlreadyFulfilled) {
+                LOGV2(10916801, "Skipping initialization of config.placementHistory");
+                return Status::OK();
+            };
+            return status;
+        });
}

}  // namespace mongo
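
A minimal shell-level sketch (not part of this patch) of the behavior added by the kCheckPreconditions phase: against a cluster with no registered shards the coordinator uasserts RequestAlreadyFulfilled, which the .onError() handler above converts back into success, so resetPlacementHistory becomes a no-op for the caller. The ShardingTest options below are illustrative assumptions, not an excerpt of the changed test file.

// Hypothetical jstest snippet; assumes a ShardingTest started with zero shards.
const stEmpty = new ShardingTest({shards: 0, config: 1});
// No shard is registered, so config.placementHistory is left untouched,
// but the command still reports success instead of failing.
assert.commandWorked(stEmpty.s.adminCommand({resetPlacementHistory: 1}));
stEmpty.stop();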

View File

@@ -43,6 +43,7 @@ enums:
        type: string
        values:
            kUnset: "unset"
            kCheckPreconditions: "checkPreconditions"
            kBlockDDLs: "blockDDLs"
            kDefineInitializationTime: "defineInitializationTime"
            kUnblockDDLs: "unblockDDLs"

View File

@@ -109,6 +109,12 @@ structs:
            description: "The cluster time at which the placement-changing DDL got recorded into the global catalog."
            type: timestamp

    PlacementHistoryMetadataChanged:
        description:
            "Notification of the commit of a metadata change affecting the whole content of config.placementHistory.
            Shards are expected to react by emitting a no-op log entry directed to change stream readers."
        strict: false

commands:
    _shardsvrNotifyShardingEvent:
        command_name: _shardsvrNotifyShardingEvent
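
As a sketch of the reaction this struct describes, the no-op entry a shard emits can be inspected as below; the query and field layout mirror the assertions in reset_placement_history.js earlier in this diff rather than an authoritative oplog specification.

// Hypothetical inspection snippet, reusing the fixture naming (st.rs0) from the test above.
const shardPrimary = st.rs0.getPrimary();
const noopEntry = shardPrimary
    .getCollection("local.oplog.rs")
    .find({"ns": "", "o2.namespacePlacementChanged": 1})
    .sort({ts: -1})
    .limit(1)
    .toArray()[0];
assert.eq(noopEntry.op, "n");  // no-op entry addressed to change stream readers
assert.eq(noopEntry.o, {msg: {namespacePlacementChanged: ""}});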

View File

@@ -42,10 +42,12 @@ static constexpr char kDatabasesAdded[] = "databasesAdded";
static constexpr char kCollectionSharded[] = "collectionSharded";
static constexpr char kCollectionResharded[] = "collectionResharded";
static constexpr char kNamespacePlacementChanged[] = "namespacePlacementChanged";
static constexpr char kPlacementHistoryMetadataChanged[] = "placementHistoryMetadataChanged";

inline Status validateEventType(const std::string& eventType) {
    if (eventType == kCollectionResharded || eventType == kNamespacePlacementChanged ||
-       eventType == kDatabasesAdded || eventType == kCollectionSharded) {
+       eventType == kDatabasesAdded || eventType == kCollectionSharded ||
+       eventType == kPlacementHistoryMetadataChanged) {
        return Status::OK();
    }

View File

@@ -120,6 +120,12 @@ public:
                return;
            }

            if (request().getEventType() ==
                notify_sharding_event::kPlacementHistoryMetadataChanged) {
                notifyChangeStreamsOnPlacementHistoryMetadataChanged(opCtx);
                return;
            }

            MONGO_UNREACHABLE_TASSERT(10083526);
        }

View File

@@ -1014,12 +1014,12 @@ mongo_cc_library(
    ],
    hdrs = [
        "change_stream_oplog_notification.h",
        "//src/mongo/db/global_catalog/ddl:notify_sharding_event_gen",
    ],
    deps = [
        ":oplog",
        ":oplog_entry",
        "//src/mongo/db:dbhelpers",
        "//src/mongo/db/global_catalog/ddl:notify_sharding_event_idl",
        "//src/mongo/db/local_catalog/lock_manager",
        "//src/mongo/db/local_catalog/lock_manager:exception_util",
        "//src/mongo/db/pipeline:change_stream_helpers",

View File

@@ -254,6 +254,17 @@ void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
                                   "NamespacePlacementChangedWritesOplog");
}

void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx) {
    Timestamp now(opCtx->fastClockSource().now());
    // Global changes to the metadata of placementHistory are encoded as a
    // NamespacePlacementChanged notification with an unspecified namespace.
    NamespacePlacementChanged globalChangeNotification(NamespaceString::kEmpty, now);
    insertNotificationOplogEntries(
        opCtx,
        {buildNamespacePlacementChangedOplogEntry(opCtx, globalChangeNotification)},
        "PlacementHistoryMetadataChangedWritesOplog");
}

void notifyChangeStreamOnEndOfTransaction(OperationContext* opCtx,
                                          const LogicalSessionId& lsid,
                                          const TxnNumber& txnNumber,

View File

@@ -116,6 +116,12 @@ repl::MutableOplogEntry buildNamespacePlacementChangedOplogEntry(
void notifyChangeStreamsOnNamespacePlacementChanged(OperationContext* opCtx,
                                                    const NamespacePlacementChanged& notification);

/**
 * Writes a no-op oplog entry concerning the commit of an operation
 * modifying the operational boundaries of config.placementHistory.
 */
void notifyChangeStreamsOnPlacementHistoryMetadataChanged(OperationContext* opCtx);

/**
 * Writes a no-op oplog entry on the end of multi shard transaction.
 **/