SERVER-114267 Use scatterGatherVersionedTargetByRoutingTable in RefineCollectionShardKeyCoordinator (#44788)

GitOrigin-RevId: 5b611ec8406a4f6ec53b61dfeef7a5b716b9dcef
This commit is contained in:
Aitor Esteve Alvarado 2025-12-12 11:28:07 +01:00 committed by MongoDB Bot
parent beb2aa10a4
commit 012d77a858
6 changed files with 70 additions and 73 deletions

View File

@ -45,6 +45,7 @@
#include "mongo/db/global_catalog/ddl/sharding_util.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/router_role/cluster_commands_helpers.h"
#include "mongo/db/router_role/router_role.h"
#include "mongo/db/shard_role/lock_manager/exception_util.h"
#include "mongo/db/shard_role/lock_manager/lock_manager_defs.h"
@ -240,9 +241,8 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
_performNoopWriteOnDataShardsAndConfigServer(opCtx, nss(), session, **executor);
}
// Stop migrations before checking indexes considering any concurrent index
// creation/drop with migrations could leave the cluster with inconsistent indexes,
// PM-2077 should address that.
// Stop migrations during most of the execution of the coordinator to guarantee a
// stable placement.
{
const auto session = getNewSession(opCtx);
sharding_ddl_util::stopMigrations(
@ -250,7 +250,6 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
}
const auto& ns = nss();
auto const shardsWithData = getShardsWithDataForCollection(opCtx, ns);
// fetch the collection metadata and install it on each shard
if (feature_flags::gShardAuthoritativeCollMetadata.isEnabled(
@ -263,27 +262,45 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
const auto session = getNewSession(opCtx);
sharding_ddl_util::sendFetchCollMetadataToShards(
opCtx, ns, shardsWithData, session, executor, token);
opCtx,
ns,
getShardsWithDataForCollection(opCtx, ns),
session,
executor,
token);
}
sharding::router::CollectionRouter router(opCtx, ns);
router.route("validating indexes for refineCollectionShardKey"_sd,
[&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
ShardsvrValidateShardKeyCandidate validateRequest(ns);
validateRequest.setKey(_doc.getNewShardKey());
validateRequest.setEnforceUniquenessCheck(
_request.getEnforceUniquenessCheck());
validateRequest.setDbName(DatabaseName::kAdmin);
ShardsvrValidateShardKeyCandidate validateRequest(ns);
validateRequest.setKey(_doc.getNewShardKey());
validateRequest.setEnforceUniquenessCheck(_request.getEnforceUniquenessCheck());
validateRequest.setDbName(DatabaseName::kAdmin);
const auto bsonCmd = validateRequest.toBSON();
sharding_util::sendCommandToShardsWithVersion(
opCtx,
ns.dbName(),
validateRequest.toBSON(),
shardsWithData,
**executor,
cri,
true /* throwOnError */);
});
sharding::router::CollectionRouter router(opCtx, ns);
router.routeWithRoutingContext(
"validating indexes for refineCollectionShardKey"_sd,
[&](OperationContext* opCtx, RoutingContext& routingCtx) {
// TODO SERVER-115323: Consider using a (new) variant of
// sendAuthenticatedCommandToShards()
const auto responses = scatterGatherVersionedTargetByRoutingTable(
opCtx,
routingCtx,
ns,
bsonCmd,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
Shard::RetryPolicy::kIdempotent,
/*query=*/{},
/*collation=*/{},
/*letParameters=*/boost::none,
/*runtimeConstants=*/boost::none,
/*eligibleForSampling=*/false,
**executor);
for (const auto& response : responses) {
uassertStatusOK(
AsyncRequestsSender::Response::getEffectiveStatus(response));
}
});
}))
.then(_buildPhaseHandler(
Phase::kBlockCrud,

View File

@ -168,21 +168,6 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
return processShardResponses(opCtx, dbName, command, requests, executor, throwOnError);
}
std::vector<AsyncRequestsSender::Response> sendCommandToShardsWithVersion(
OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj& command,
const std::vector<ShardId>& shardIds,
const std::shared_ptr<executor::TaskExecutor>& executor,
const CollectionRoutingInfo& cri,
const bool throwOnError) {
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& shardId : shardIds) {
requests.emplace_back(shardId, appendShardVersion(command, cri.getShardVersion(shardId)));
}
return processShardResponses(opCtx, dbName, command, requests, executor, throwOnError);
}
Status createIndexOnCollection(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& keys,

View File

@ -88,19 +88,6 @@ MONGO_MOD_NEEDS_REPLACEMENT std::vector<AsyncRequestsSender::Response> sendComma
const std::shared_ptr<executor::TaskExecutor>& executor,
bool throwOnError = true);
/**
* Generic utility to send a command to a list of shards attaching the shard version to the request.
* If `throwOnError=true`, throws in case one of the commands fails.
*/
MONGO_MOD_NEEDS_REPLACEMENT std::vector<AsyncRequestsSender::Response>
sendCommandToShardsWithVersion(OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj& command,
const std::vector<ShardId>& shardIds,
const std::shared_ptr<executor::TaskExecutor>& executor,
const CollectionRoutingInfo& cri,
bool throwOnError = true);
/**
* Helper function to create an index on a collection locally.
*/

View File

@ -94,7 +94,7 @@ public:
uassert(ErrorCodes::NamespaceNotSharded,
str::stream()
<< "Can't execute " << Request::kCommandName
<< "on unsharded collection " << redact(ns().toStringForErrorMsg()),
<< " on unsharded collection " << redact(ns().toStringForErrorMsg()),
coll.getShardingDescription().isSharded());
shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(

View File

@ -362,16 +362,12 @@ std::vector<AsyncRequestsSender::Response> gatherResponsesImpl(
Shard::RetryPolicy retryPolicy,
const std::vector<AsyncRequestsSender::Request>& requests,
bool throwOnStaleShardVersionErrors,
const std::shared_ptr<executor::TaskExecutor>& executor,
RoutingContext* routingCtx = nullptr) {
// Send the requests.
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
dbName,
requests,
readPref,
retryPolicy);
opCtx, executor, dbName, requests, readPref, retryPolicy);
if (routingCtx && routingCtx->hasNss(nss)) {
routingCtx->onRequestSentForNss(nss);
@ -438,15 +434,18 @@ std::vector<AsyncRequestsSender::Response> gatherResponses(
const ReadPreferenceSetting& readPref,
Shard::RetryPolicy retryPolicy,
const std::vector<AsyncRequestsSender::Request>& requests,
RoutingContext* routingCtx) {
return gatherResponsesImpl(opCtx,
dbName,
nss,
readPref,
retryPolicy,
requests,
true /* throwOnStaleShardVersionErrors */,
routingCtx);
RoutingContext* routingCtx,
std::shared_ptr<executor::TaskExecutor> executor) {
return gatherResponsesImpl(
opCtx,
dbName,
nss,
readPref,
retryPolicy,
requests,
true /* throwOnStaleShardVersionErrors */,
executor ? executor : Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
routingCtx);
}
BSONObj appendDbVersionIfPresent(BSONObj cmdObj, const CachedDatabaseInfo& dbInfo) {
@ -607,7 +606,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
const BSONObj& collation,
const boost::optional<BSONObj>& letParameters,
const boost::optional<LegacyRuntimeConstants>& runtimeConstants,
bool eligibleForSampling) {
bool eligibleForSampling,
std::shared_ptr<executor::TaskExecutor> executor) {
auto expCtx =
makeExpressionContextWithDefaultsForTargeter(opCtx,
nss,
@ -624,7 +624,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
retryPolicy,
query,
collation,
eligibleForSampling);
eligibleForSampling,
std::move(executor));
}
[[nodiscard]] std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTable(
@ -636,7 +637,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
Shard::RetryPolicy retryPolicy,
const BSONObj& query,
const BSONObj& collation,
bool eligibleForSampling) {
bool eligibleForSampling,
std::shared_ptr<executor::TaskExecutor> executor) {
const auto requests =
buildVersionedRequestsForTargetedShards(expCtx,
nss,
@ -652,7 +654,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
readPref,
retryPolicy,
requests,
&routingCtx);
&routingCtx,
std::move(executor));
}
std::vector<AsyncRequestsSender::Response>
@ -686,6 +689,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors(
retryPolicy,
requests,
false /* throwOnStaleShardVersionErrors */,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
&routingCtx);
}
@ -712,6 +716,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors(
retryPolicy,
requests,
false /* throwOnStaleShardVersionErrors */,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
&routingCtx);
}

View File

@ -160,7 +160,8 @@ std::vector<AsyncRequestsSender::Response> gatherResponses(
const ReadPreferenceSetting& readPref,
Shard::RetryPolicy retryPolicy,
const std::vector<AsyncRequestsSender::Request>& requests,
RoutingContext* routingCtx = nullptr);
RoutingContext* routingCtx = nullptr,
std::shared_ptr<executor::TaskExecutor> executor = nullptr);
/**
* Returns a copy of 'cmdObj' with dbVersion appended if it exists in 'dbInfo'
@ -332,7 +333,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetConfigS
const BSONObj& collation,
const boost::optional<BSONObj>& letParameters,
const boost::optional<LegacyRuntimeConstants>& runtimeConstants,
bool eligibleForSampling = false);
bool eligibleForSampling = false,
std::shared_ptr<executor::TaskExecutor> executor = nullptr);
/**
* This overload is for callers which already have a fully initialized 'ExpressionContext' (e.g.
@ -347,7 +349,8 @@ std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetConfigS
Shard::RetryPolicy retryPolicy,
const BSONObj& query,
const BSONObj& collation,
bool eligibleForSampling = false);
bool eligibleForSampling = false,
std::shared_ptr<executor::TaskExecutor> executor = nullptr);
/**
* Utility for dispatching versioned commands on a namespace, deciding which shards to