SERVER-114264 Port adhoc command shardVersion construction to cluster_commands_helpers.h (#44621)

GitOrigin-RevId: 5e66ea0cd87e740e27a9759b6e033239a6f584fa
This commit is contained in:
Aitor Esteve Alvarado 2025-12-11 17:09:34 +01:00 committed by MongoDB Bot
parent 7380779c9e
commit aaada80f38
5 changed files with 147 additions and 291 deletions

View File

@ -29,31 +29,22 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/commands.h"
#include "mongo/db/database_name.h"
#include "mongo/db/global_catalog/chunk_manager.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/router_role/cluster_commands_helpers.h"
#include "mongo/db/router_role/routing_cache/catalog_cache.h"
#include "mongo/db/service_context.h"
#include "mongo/db/sharding_environment/client/shard.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/db/topology/shard_registry.h"
#include "mongo/db/versioning_protocol/shard_version.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/s/analyze_shard_key_common_gen.h"
#include "mongo/s/configure_query_analyzer_cmd_gen.h"
#include "mongo/util/assert_util.h"
#include <memory>
#include <string>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
@ -63,14 +54,6 @@ namespace analyze_shard_key {
namespace {
/**
* Returns a new command object with database version appended to it based on the given routing
* info.
*/
BSONObj makeVersionedCmdObj(const CollectionRoutingInfo& cri, const BSONObj& unversionedCmdObj) {
return appendDbVersionIfPresent(unversionedCmdObj, cri.getDbVersion());
}
class ConfigureQueryAnalyzerCmd : public TypedCommand<ConfigureQueryAnalyzerCmd> {
public:
using Request = ConfigureQueryAnalyzer;

View File

@ -462,15 +462,11 @@ public:
? cm.getTimeseriesFields()
: boost::none;
auto cmdObj =
const auto cmdObj =
createAggregateCmdObj(opCtx, parsedInfoFromRequest, nss, timeseriesFields);
std::vector<AsyncRequestsSender::Request> requests;
requests.reserve(allShardsContainingChunksForNs.size());
for (const auto& shardId : allShardsContainingChunksForNs) {
requests.emplace_back(
shardId, appendShardVersion(cmdObj, cri.getShardVersion(shardId)));
}
const auto requests = buildVersionedRequests(
opCtx, nss, cri, allShardsContainingChunksForNs, cmdObj);
MultiStatementTransactionRequestsSender ars(
opCtx,
@ -607,20 +603,14 @@ public:
[&](OperationContext* opCtx, RoutingContext& routingCtx) {
const auto& cri = routingCtx.getCollectionRoutingInfo(nss);
auto allShardsContainingChunksForNs =
getShardsToTarget(opCtx, cri, nss, parsedInfoFromRequest);
auto cmdObj =
const auto cmdObj =
createAggregateCmdObj(opCtx, parsedInfoFromRequest, nss, boost::none);
const auto aggExplainCmdObj = ClusterExplain::wrapAsExplain(cmdObj, verbosity);
std::vector<AsyncRequestsSender::Request> requests;
requests.reserve(allShardsContainingChunksForNs.size());
for (const auto& shardId : allShardsContainingChunksForNs) {
requests.emplace_back(
shardId,
appendShardVersion(aggExplainCmdObj, cri.getShardVersion(shardId)));
}
const auto requests = buildVersionedRequests(
opCtx,
nss,
cri,
getShardsToTarget(opCtx, cri, nss, parsedInfoFromRequest),
ClusterExplain::wrapAsExplain(cmdObj, verbosity));
Timer timer;
MultiStatementTransactionRequestsSender ars(

View File

@ -32,7 +32,6 @@
#include "mongo/base/error_extra_info.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsontypes.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
@ -50,7 +49,6 @@
#include "mongo/db/generic_argument_util.h"
#include "mongo/db/global_catalog/chunk.h"
#include "mongo/db/global_catalog/chunk_manager.h"
#include "mongo/db/global_catalog/ddl/cluster_ddl.h"
#include "mongo/db/global_catalog/type_collection_common_types_gen.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/pipeline/expression_context.h"
@ -60,8 +58,6 @@
#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/router_role/cluster_commands_helpers.h"
#include "mongo/db/router_role/collection_routing_info_targeter.h"
#include "mongo/db/router_role/routing_cache/catalog_cache.h"
#include "mongo/db/server_options.h"
#include "mongo/db/shard_role/shard_catalog/document_validation.h"
#include "mongo/db/shard_role/shard_catalog/raw_data_operation.h"
@ -70,9 +66,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/timeseries_request_util.h"
#include "mongo/db/timeseries/timeseries_update_delete_util.h"
#include "mongo/db/timeseries/timeseries_write_util.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/db/version_context.h"
#include "mongo/db/write_concern_options.h"
@ -84,7 +78,6 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/commands/document_shard_key_update_util.h"
#include "mongo/s/commands/query_cmd/cluster_explain.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/s/query/shard_key_pattern_query_util.h"
#include "mongo/s/query_analysis_sampler_util.h"
#include "mongo/s/request_types/cluster_commands_without_shard_key_gen.h"
@ -407,27 +400,17 @@ BSONObj prepareCmdObjForPassthrough(
OperationContext* opCtx,
const BSONObj& cmdObj,
const NamespaceString& nss,
bool isExplain,
const boost::optional<DatabaseVersion>& dbVersion,
const boost::optional<ShardVersion>& shardVersion,
bool eligibleForSampling,
boost::optional<bool> allowShardKeyUpdatesWithoutFullShardKeyInQuery) {
BSONObj filteredCmdObj = CommandHelpers::filterCommandRequestForPassthrough(cmdObj);
if (!isExplain) {
BSONObj newCmdObj = CommandHelpers::filterCommandRequestForPassthrough(cmdObj);
if (eligibleForSampling) {
if (auto sampleId = analyze_shard_key::tryGenerateSampleId(
opCtx, nss, cmdObj.firstElementFieldNameStringData())) {
filteredCmdObj =
analyze_shard_key::appendSampleId(std::move(filteredCmdObj), std::move(*sampleId));
newCmdObj =
analyze_shard_key::appendSampleId(std::move(newCmdObj), std::move(*sampleId));
}
}
BSONObj newCmdObj(std::move(filteredCmdObj));
if (dbVersion) {
newCmdObj = appendDbVersionIfPresent(newCmdObj, *dbVersion);
}
if (shardVersion) {
newCmdObj = appendShardVersion(newCmdObj, *shardVersion);
}
if (opCtx->isRetryableWrite()) {
if (!newCmdObj.hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)) {
BSONObjBuilder bob(newCmdObj);
@ -783,23 +766,16 @@ Status FindAndModifyCmd::explain(OperationContext* opCtx,
return Status::OK();
}
auto shardVersion = cri.hasRoutingTable()
? boost::make_optional(cri.getShardVersion(*shardId))
: boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNTRACKED());
auto dbVersion =
cri.hasRoutingTable() ? boost::none : boost::make_optional(cri.getDbVersion());
_runCommand(opCtx,
*shardId,
shardVersion,
dbVersion,
cri,
nss,
applyReadWriteConcern(
opCtx, false, false, makeExplainCmd(opCtx, cmdObj, verbosity)),
true /* isExplain */,
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
isTimeseriesLogicalRequest,
&bob);
&bob,
false /*explain is not eligible for sampling*/);
const auto millisElapsed = timer.millis();
@ -961,16 +937,15 @@ bool FindAndModifyCmd::run(OperationContext* opCtx,
// protocol.
_runCommand(opCtx,
*shardId,
cri.getShardVersion(*shardId),
boost::none,
cri,
nss,
applyReadWriteConcern(opCtx, this, cmdObjForShard),
false /* isExplain */,
allowShardKeyUpdatesWithoutFullShardKeyInQuery,
isTimeseriesLogicalRequest,
&result);
} else {
_runCommandWithoutShardKey(opCtx,
cri,
nss,
applyReadWriteConcern(opCtx, this, cmdObjForShard),
isTimeseriesLogicalRequest,
@ -988,11 +963,9 @@ bool FindAndModifyCmd::run(OperationContext* opCtx,
_runCommand(opCtx,
shardId,
cri.getShardVersion(shardId),
boost::none,
cri,
nss,
applyReadWriteConcern(opCtx, this, cmdObjForShard),
false /* isExplain */,
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
isTimeseriesLogicalRequest,
&result);
@ -1000,17 +973,14 @@ bool FindAndModifyCmd::run(OperationContext* opCtx,
} else {
getQueryCounters(opCtx).findAndModifyUnshardedCount.increment(1);
_runCommand(
opCtx,
cri.getDbPrimaryShardId(),
boost::make_optional(!cri.getDbVersion().isFixed(), ShardVersion::UNTRACKED()),
cri.getDbVersion(),
nss,
applyReadWriteConcern(opCtx, this, cmdObjForShard),
false /* isExplain */,
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
isTimeseriesLogicalRequest,
&result);
_runCommand(opCtx,
cri.getDbPrimaryShardId(),
cri,
nss,
applyReadWriteConcern(opCtx, this, cmdObjForShard),
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
isTimeseriesLogicalRequest,
&result);
}
};
@ -1038,16 +1008,15 @@ bool FindAndModifyCmd::getCrudProcessedFromCmd(const BSONObj& cmdObj) {
// Catches errors in the given response, and reruns the command if necessary. Uses the given
// response to construct the findAndModify command result passed to the client.
void FindAndModifyCmd::_constructResult(OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const NamespaceString& nss,
const BSONObj& cmdObj,
const Status& responseStatus,
const BSONObj& response,
bool isTimeseriesViewRequest,
BSONObjBuilder* result) {
void FindAndModifyCmd::_handleResponseAndConstructResult(OperationContext* opCtx,
const ShardId& shardId,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
const Status& responseStatus,
const BSONObj& response,
bool isTimeseriesViewRequest,
BSONObjBuilder* result) {
auto txnRouter = TransactionRouter::get(opCtx);
bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter;
@ -1068,14 +1037,8 @@ void FindAndModifyCmd::_constructResult(OperationContext* opCtx,
opCtx->setQuerySamplingOptions(QuerySamplingOptions::kOptOut);
if (isRetryableWrite) {
_handleWouldChangeOwningShardErrorRetryableWriteLegacy(opCtx,
shardId,
shardVersion,
dbVersion,
nss,
cmdObj,
isTimeseriesViewRequest,
result);
_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
opCtx, shardId, cri, nss, cmdObj, isTimeseriesViewRequest, result);
} else {
handleWouldChangeOwningShardErrorTransactionLegacy(opCtx,
nss,
@ -1104,6 +1067,7 @@ void FindAndModifyCmd::_constructResult(OperationContext* opCtx,
// Two-phase protocol to run a findAndModify command without a shard key or _id.
void FindAndModifyCmd::_runCommandWithoutShardKey(OperationContext* opCtx,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isTimeseriesViewRequest,
@ -1115,9 +1079,7 @@ void FindAndModifyCmd::_runCommandWithoutShardKey(OperationContext* opCtx,
prepareCmdObjForPassthrough(opCtx,
cmdObj,
nss,
false /* isExplain */,
boost::none /* dbVersion */,
boost::none /* shardVersion */,
true /* eligibleForSampling */,
allowShardKeyUpdatesWithoutFullShardKeyInQuery);
// TODO SERVER-108928 - Handle this inside of prepareCmdObjForPassthrough.
@ -1166,16 +1128,15 @@ void FindAndModifyCmd::_runCommandWithoutShardKey(OperationContext* opCtx,
}
// Extract findAndModify command result from the result of the two phase write protocol.
_constructResult(opCtx,
shardId,
boost::none /* shardVersion */,
boost::none /* dbVersion */,
nss,
cmdObj,
swRes.getStatus(),
cmdResponse,
isTimeseriesViewRequest,
result);
_handleResponseAndConstructResult(opCtx,
shardId,
cri,
nss,
cmdObj,
swRes.getStatus(),
cmdResponse,
isTimeseriesViewRequest,
result);
}
// Two-phase protocol to run an explain for a findAndModify command without a shard key or _id.
@ -1188,9 +1149,7 @@ void FindAndModifyCmd::_runExplainWithoutShardKey(OperationContext* opCtx,
opCtx,
originalExplainObj,
nss,
true /* isExplain */,
boost::none /* dbVersion */,
boost::none /* shardVersion */,
false /* eligibleForSampling */,
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */);
auto explainObj = translateExplainObjForRawData(opCtx, cmdObjForPassthrough, nss);
@ -1236,62 +1195,62 @@ void FindAndModifyCmd::_runExplainWithoutShardKey(OperationContext* opCtx,
void FindAndModifyCmd::_runCommand(
OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isExplain,
boost::optional<bool> allowShardKeyUpdatesWithoutFullShardKeyInQuery,
bool isTimeseriesViewRequest,
BSONObjBuilder* result) {
BSONObjBuilder* result,
bool eligibleForSampling) {
auto txnRouter = TransactionRouter::get(opCtx);
bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter;
const auto response = [&] {
std::vector<AsyncRequestsSender::Request> requests;
auto cmdObjForPassthrough =
prepareCmdObjForPassthrough(opCtx,
cmdObj,
nss,
isExplain,
dbVersion,
shardVersion,
allowShardKeyUpdatesWithoutFullShardKeyInQuery);
requests.emplace_back(shardId, cmdObjForPassthrough);
// Pass 'false' to `eligibleForSampling` since scatterGatherVersionedTargetToShards is going to
// add those fields.
const auto passthroughCmdObj =
prepareCmdObjForPassthrough(opCtx,
cmdObj,
nss,
/*eligibleForSampling=*/false,
allowShardKeyUpdatesWithoutFullShardKeyInQuery);
// Create a RoutingContext from the CollectionRoutingInfo.
auto routingCtx = RoutingContext::createSynthetic({{nss, cri}});
const auto responses = scatterGatherVersionedTargetToShards(
opCtx,
*routingCtx,
nss.dbName(),
nss,
{shardId},
passthroughCmdObj,
kPrimaryOnlyReadPreference,
isRetryableWrite ? Shard::RetryPolicy::kIdempotent
: Shard::RetryPolicy::kStrictlyNotIdempotent,
eligibleForSampling);
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
nss.dbName(),
requests,
kPrimaryOnlyReadPreference,
isRetryableWrite ? Shard::RetryPolicy::kIdempotent
: Shard::RetryPolicy::kStrictlyNotIdempotent);
tassert(11426400,
fmt::format("scatterGatherVersionedTargetToShards returned more than one response "
"despite passing a single shardId: size: {}",
responses.size()),
responses.size() == 1);
auto response = ars.next();
invariant(ars.done());
return uassertStatusOK(std::move(response.swResponse));
}();
_constructResult(opCtx,
shardId,
shardVersion,
dbVersion,
nss,
cmdObj,
getStatusFromCommandResult(response.data),
response.data,
isTimeseriesViewRequest,
result);
const auto& response = uassertStatusOK(responses.front().swResponse);
_handleResponseAndConstructResult(opCtx,
shardId,
cri,
nss,
cmdObj,
getStatusFromCommandResult(response.data),
response.data,
isTimeseriesViewRequest,
result);
}
// TODO SERVER-67429: Remove this function.
void FindAndModifyCmd::_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isTimeseriesViewRequest,
@ -1321,17 +1280,15 @@ void FindAndModifyCmd::_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
isTimeseriesViewRequest)) {
getQueryCounters(opCtx).findAndModifyNonTargetedShardedCount.increment(1);
_runCommandWithoutShardKey(
opCtx, nss, stripWriteConcern(cmdObj), isTimeseriesViewRequest, result);
opCtx, cri, nss, stripWriteConcern(cmdObj), isTimeseriesViewRequest, result);
} else {
getQueryCounters(opCtx).findAndModifyTargetedShardedCount.increment(1);
_runCommand(opCtx,
shardId,
shardVersion,
dbVersion,
cri,
nss,
stripWriteConcern(cmdObj),
false /* isExplain */,
boost::none /* allowShardKeyUpdatesWithoutFullShardKeyInQuery */,
isTimeseriesViewRequest,
result);

View File

@ -44,6 +44,7 @@
#include "mongo/db/query/explain_options.h"
#include "mongo/db/read_concern_support_result.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/router_role/routing_cache/catalog_cache.h"
#include "mongo/db/service_context.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/db/versioning_protocol/database_version.h"
@ -154,19 +155,19 @@ private:
// Catches errors in the given response, and reruns the command if necessary. Uses the given
// response to construct the findAndModify command result passed to the client.
static void _constructResult(OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const NamespaceString& nss,
const BSONObj& cmdObj,
const Status& responseStatus,
const BSONObj& response,
bool isTimeseriesViewRequest,
BSONObjBuilder* result);
static void _handleResponseAndConstructResult(OperationContext* opCtx,
const ShardId& shardId,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
const Status& responseStatus,
const BSONObj& response,
bool isTimeseriesViewRequest,
BSONObjBuilder* result);
// Two-phase protocol to run a findAndModify command without a shard key or _id.
static void _runCommandWithoutShardKey(OperationContext* opCtx,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isTimeseriesViewRequest,
@ -182,21 +183,19 @@ private:
// Command invocation to be used if a shard key is specified or the collection is unsharded.
static void _runCommand(OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isExplain,
boost::optional<bool> allowShardKeyUpdatesWithoutFullShardKeyInQuery,
bool isTimeseriesViewRequest,
BSONObjBuilder* result);
BSONObjBuilder* result,
bool eligibleForSampling = true);
// TODO SERVER-67429: Remove this function.
static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy(
OperationContext* opCtx,
const ShardId& shardId,
const boost::optional<ShardVersion>& shardVersion,
const boost::optional<DatabaseVersion>& dbVersion,
const CollectionRoutingInfo& cri,
const NamespaceString& nss,
const BSONObj& cmdObj,
bool isTimeseriesViewRequest,

View File

@ -155,48 +155,48 @@ static const BSONObj kGeoNearDistanceMetaProjection = BSON("$meta" << "geoNearDi
const char kFindCmdName[] = "find";
std::unique_ptr<FindCommandRequest> makeFindCommandForShards(OperationContext* opCtx,
const std::set<ShardId>& shardIds,
const CanonicalQuery& query,
const boost::optional<UUID> sampleId,
bool requestQueryStatsFromRemotes,
const UUID& opKey) {
std::unique_ptr<FindCommandRequest> findCommand;
if (shardIds.size() > 1) {
findCommand = uassertStatusOK(ClusterFind::transformQueryForShards(query));
} else {
// Forwards the FindCommandRequest as is to a single shard so that limit and skip can
// be applied on mongod.
findCommand = std::make_unique<FindCommandRequest>(query.getFindCommandRequest());
}
BSONObj makeFindCommandForShards(OperationContext* opCtx,
const std::set<ShardId>& shardIds,
const CanonicalQuery& query,
bool requestQueryStatsFromRemotes,
const UUID& opKey) {
auto findCommand = [&]() -> FindCommandRequest {
if (shardIds.size() > 1) {
return *uassertStatusOK(ClusterFind::transformQueryForShards(query));
} else {
// Forwards the FindCommandRequest as is to a single shard so that limit and skip can
// be applied on mongod.
return query.getFindCommandRequest();
}
}();
// Reset the input request's generic arguments and only set the ones needed for the query.
// TODO: SERVER-90827 Only reset arguments not suitable for passing through to shards.
GenericArguments args;
std::swap(findCommand->getGenericArguments(), args);
findCommand->setUnwrappedReadPref(std::move(args.getUnwrappedReadPref()));
findCommand->setMaxTimeMS(args.getMaxTimeMS());
findCommand->setReadConcern(std::move(args.getReadConcern()));
std::swap(findCommand.getGenericArguments(), args);
findCommand.setUnwrappedReadPref(std::move(args.getUnwrappedReadPref()));
findCommand.setMaxTimeMS(args.getMaxTimeMS());
findCommand.setReadConcern(std::move(args.getReadConcern()));
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
if (readConcernArgs.wasAtClusterTimeSelected()) {
// If mongos selected atClusterTime or received it from client, transmit it to shard.
findCommand->setReadConcern(readConcernArgs);
findCommand.setReadConcern(readConcernArgs);
}
query.getExpCtx()->initializeReferencedSystemVariables();
// Replace the 'letParams' expressions with their values.
if (auto letParams = findCommand->getLet()) {
if (auto letParams = findCommand.getLet()) {
const auto& vars = query.getExpCtx()->variables;
const auto& vps = query.getExpCtx()->variablesParseState;
findCommand->setLet(vars.toBSON(vps, *letParams));
findCommand.setLet(vars.toBSON(vps, *letParams));
}
// ExpressionContext may contain previously looked up query settings. Propagate it to the
// shards.
if (!query_settings::isDefault(query.getExpCtx()->getQuerySettings())) {
findCommand->setQuerySettings(query.getExpCtx()->getQuerySettings());
findCommand.setQuerySettings(query.getExpCtx()->getQuerySettings());
}
// Request metrics if necessary.
@ -205,7 +205,7 @@ std::unique_ptr<FindCommandRequest> makeFindCommandForShards(OperationContext* o
// rate) dictates we should gather metrics, or the user sent the flag to us.
auto origValue = query.getFindCommandRequest().getIncludeQueryStatsMetrics();
if (origValue.value_or(false) || requestQueryStatsFromRemotes) {
findCommand->setIncludeQueryStatsMetrics(true);
findCommand.setIncludeQueryStatsMetrics(true);
}
}
@ -214,79 +214,16 @@ std::unique_ptr<FindCommandRequest> makeFindCommandForShards(OperationContext* o
// necessarily want to forward all transaction arguments directly from the input request since
// we may have already started a transaction for internal purposes (e.g. FLE does this).
if (auto& lsid = opCtx->getLogicalSessionId()) {
findCommand->setLsid(generic_argument_util::toLogicalSessionFromClient(*lsid));
findCommand.setLsid(generic_argument_util::toLogicalSessionFromClient(*lsid));
}
findCommand->setTxnNumber(opCtx->getTxnNumber());
findCommand->setClientOperationKey(opKey);
findCommand.setTxnNumber(opCtx->getTxnNumber());
findCommand.setClientOperationKey(opKey);
return findCommand;
}
/**
* Constructs the shard requests (ShardId, BSONObj) pairs for the find command by attaching the
* shardVersion, txnNumber and sampleId if necessary.
*/
std::vector<AsyncRequestsSender::Request> constructRequestsForShards(
OperationContext* opCtx,
const CollectionRoutingInfo& cri,
const std::set<ShardId>& shardIds,
const CanonicalQuery& query,
const boost::optional<UUID> sampleId,
bool requestQueryStatsFromRemotes,
const auto& opKey) {
// Choose the shard to sample the query on if needed.
const auto sampleShardId = sampleId
? boost::make_optional(analyze_shard_key::getRandomShardId(shardIds))
: boost::none;
// Helper methods for appending additional attributes to the shard command.
auto appendVersions = [&](const auto& shardId, auto& cmdBuilder) {
if (cri.hasRoutingTable()) {
appendShardVersion(cmdBuilder, cri.getShardVersion(shardId));
} else if (!query.nss().isOnInternalDb()) {
appendShardVersion(cmdBuilder, ShardVersion::UNTRACKED());
appendDbVersionIfPresent(cmdBuilder, cri.getDbVersion());
}
};
auto appendSampleId = [&](const auto& shardId, auto& cmdBuilder) {
if (shardId == sampleShardId) {
analyze_shard_key::appendSampleId(&cmdBuilder, *sampleId);
}
};
// Constructs the shard request by appending additional attributes to the serialized
// 'findCommandToForward'.
const auto findCommandToForward = makeFindCommandForShards(
opCtx, shardIds, query, sampleId, requestQueryStatsFromRemotes, opKey);
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
auto makeShardRequest = [&](const auto& shardId) {
const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
tassert(11052355,
"Expected either non-config shard or valid connection string for the config shard",
!shard->isConfig() || shard->getConnString());
BSONObjBuilder cmdBuilder;
findCommandToForward->serialize(&cmdBuilder);
appendVersions(shardId, cmdBuilder);
appendSampleId(shardId, cmdBuilder);
auto cmdObj = isRawDataOperation(opCtx) &&
findCommandToForward->getNamespaceOrUUID().isNamespaceString() &&
findCommandToForward->getNamespaceOrUUID().nss().isTimeseriesBucketsCollection()
? rewriteCommandForRawDataOperation<FindCommandRequest>(
cmdBuilder.obj(), findCommandToForward->getNamespaceOrUUID().nss().coll())
: cmdBuilder.obj();
return AsyncRequestsSender::Request(shardId, std::move(cmdObj), std::move(shard));
};
std::vector<AsyncRequestsSender::Request> requests;
requests.reserve(shardIds.size());
std::transform(
shardIds.begin(), shardIds.end(), std::back_inserter(requests), makeShardRequest);
return requests;
return isRawDataOperation(opCtx) && findCommand.getNamespaceOrUUID().isNamespaceString() &&
findCommand.getNamespaceOrUUID().nss().isTimeseriesBucketsCollection()
? rewriteCommandForRawDataOperation<FindCommandRequest>(
findCommand.toBSON(), findCommand.getNamespaceOrUUID().nss().coll())
: findCommand.toBSON();
}
void updateNumHostsTargetedMetrics(OperationContext* opCtx,
@ -306,7 +243,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
RoutingContext& routingCtx,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
const boost::optional<UUID> sampleId,
std::vector<BSONObj>* results,
bool* partialResultsReturned) {
const auto& findCommand = query.getFindCommandRequest();
@ -401,8 +337,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// shards, attaching the shardVersion and session info, if necessary. Attach our own
// OperationKey as well so establishCursors won't copy each request.
std::vector<OperationKey> opKeys{UUID::gen()};
auto requests = constructRequestsForShards(
opCtx, cri, shardIds, query, sampleId, requestQueryStatsFromRemotes, opKeys.front());
const auto findCommandToForward = makeFindCommandForShards(
opCtx, shardIds, query, requestQueryStatsFromRemotes, opKeys.front());
auto requests = buildVersionedRequests(
opCtx, query.nss(), cri, shardIds, findCommandToForward, /*eligibleForSampling=*/true);
// The call to establishCursors has its own timeout mechanism that is controlled by the
// opCtx, so we don't expect runWithDeadline to throw a timeout at this level. We use
@ -852,12 +790,6 @@ void ClusterFind::runQuery(OperationContext* opCtx,
use the classic encoding method by default. */
canonical_query_encoder::encodeClassic(*query));
// Try to generate a sample id for this query here instead of inside
// 'runQueryWithoutRetrying()' since it is incorrect to generate multiple sample ids
// for a single query.
const auto sampleId = analyze_shard_key::tryGenerateSampleId(
opCtx, query->nss(), analyze_shard_key::SampledCommandNameEnum::kFind);
// If this is a viewless timeseries namespace, run the equivalent aggregation (which
// writes its own cursor response into 'result') and short-circuit the normal find
// path.
@ -889,13 +821,8 @@ void ClusterFind::runQuery(OperationContext* opCtx,
// Do the work to generate the first batch of results. This blocks waiting to
// get responses from the shard(s).
auto cursorId = runQueryWithoutRetrying(opCtx,
routingCtx,
*query,
readPref,
sampleId,
&batch,
&partialResultsReturned);
auto cursorId = runQueryWithoutRetrying(
opCtx, routingCtx, *query, readPref, &batch, &partialResultsReturned);
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
if (!opCtx->inMultiDocumentTransaction()) {