diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 4882f94bf05..c2fa992da70 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1390,6 +1390,9 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
 /jstests/noPassthrough/replication/**/*initial_sync* @10gen/server-initial-sync @svc-auto-approve-bot
 /jstests/noPassthrough/replication/**/*replication_recovery* @10gen/server-recovery @svc-auto-approve-bot
 
+# The following patterns are parsed from ./jstests/noPassthrough/routing/OWNERS.yml
+/jstests/noPassthrough/routing/**/* @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot
+
 # The following patterns are parsed from ./jstests/noPassthrough/rs_endpoint/OWNERS.yml
 /jstests/noPassthrough/rs_endpoint/**/* @10gen/server-cluster-scalability @svc-auto-approve-bot
 
diff --git a/jstests/noPassthrough/routing/BUILD.bazel b/jstests/noPassthrough/routing/BUILD.bazel
new file mode 100644
index 00000000000..97513de081b
--- /dev/null
+++ b/jstests/noPassthrough/routing/BUILD.bazel
@@ -0,0 +1,12 @@
+load("//bazel:mongo_js_rules.bzl", "all_subpackage_javascript_files", "mongo_js_library")
+
+package(default_visibility = ["//visibility:public"])
+
+mongo_js_library(
+    name = "all_javascript_files",
+    srcs = glob([
+        "*.js",
+    ]),
+)
+
+all_subpackage_javascript_files()
diff --git a/jstests/noPassthrough/routing/OWNERS.yml b/jstests/noPassthrough/routing/OWNERS.yml
new file mode 100644
index 00000000000..a04db42275a
--- /dev/null
+++ b/jstests/noPassthrough/routing/OWNERS.yml
@@ -0,0 +1,5 @@
+version: 1.0.0
+filters:
+    - "*":
+          approvers:
+              - 10gen/server-catalog-and-routing-routing-and-topology
diff --git a/jstests/noPassthrough/routing/create_collection_from_stale_mongos.js b/jstests/noPassthrough/routing/create_collection_from_stale_mongos.js
new file mode 100644
index 00000000000..b204a40bab6
--- /dev/null
+++ b/jstests/noPassthrough/routing/create_collection_from_stale_mongos.js
@@ -0,0 +1,77 @@
+import {after, before, beforeEach, describe, it} from "jstests/libs/mochalite.js";
+import {ShardingTest} from "jstests/libs/shardingtest.js";
+
+const dbName = jsTestName();
+const collName = "testColl";
+const collName2 = "testColl2";
+
+describe("Test create collection from stale mongos", function () {
+    before(() => {
+        this.st = new ShardingTest({shards: 1, rs: {nodes: 1}, mongos: 2});
+
+        this.staleMongos = this.st.s0;
+        this.nonStaleMongos = this.st.s1;
+    });
+
+    after(() => {
+        this.st.stop();
+    });
+
+    beforeEach(() => {
+        // Pre-create the collection via the first router.
+        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName));
+
+        // Drop the database on another router without removing the routing info from the first
+        // router.
+        assert.commandWorked(this.nonStaleMongos.getDB(dbName).dropDatabase());
+    });
+
+    it("Create same collection after dropping database via different router", () => {
+        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName));
+
+        // Verify that `collName` exists.
+        assert.eq(
+            1,
+            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
+            "Collection should exist but it doesn't.",
+        );
+    });
+
+    it("Create different collection after dropping database via different router", () => {
+        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName2));
+
+        // Verify that `collName2` exists.
+        assert.eq(
+            1,
+            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName2}).length,
+            "Collection should exist but it doesn't.",
+        );
+
+        // Verify that `collName` doesn't exist.
+        assert.eq(
+            0,
+            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
+            "Collection should not exist but it does.",
+        );
+    });
+
+    it("Insert into collection after dropping database via different router", () => {
+        assert.commandWorked(this.staleMongos.getDB(dbName)[collName].insert({_id: 1}));
+
+        // Verify that `collName` exists.
+        assert.eq(
+            1,
+            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
+            "Collection should exist but it doesn't.",
+        );
+    });
+
+    it("Shard collection after dropping database via different router", () => {
+        assert.commandWorked(
+            this.staleMongos.adminCommand({shardCollection: dbName + "." + collName, key: {x: 1}}));
+
+        // Verify that `collName` exists.
+        assert.eq(
+            1,
+            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
+            "Collection should exist but it doesn't.",
+        );
+    });
+});
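The four scenarios above all exercise one router-side contract: a mongos that still holds routing info for a dropped database must recreate it transparently instead of surfacing the drop to the client. A self-contained sketch of that contract, using hypothetical plain-C++ stand-ins (`Cluster`, `Router`) rather than the real mongos types:

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical model: two routers share one cluster but keep their own caches.
struct Cluster {
    std::set<std::string> dbs;
};

struct Router {
    Cluster* cluster;
    std::set<std::string> cachedDbs;  // possibly stale view of cluster->dbs

    void createCollection(const std::string& dbName) {
        if (!cluster->dbs.count(dbName)) {
            // Database dropped (or never created): implicitly recreate it
            // instead of failing, which is what the test asserts on.
            cluster->dbs.insert(dbName);
        }
        cachedDbs.insert(dbName);  // refresh the local cache on the way out
    }
};

int main() {
    Cluster cluster;
    Router stale{&cluster};
    stale.createCollection("test");  // pre-create via the first router
    cluster.dbs.erase("test");       // drop via another router; 'stale' keeps its cache
    stale.createCollection("test");  // must succeed rather than error out
    assert(cluster.dbs.count("test") == 1);
}
```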
- unusedRoutingCtx.skipValidation(); + auto targeter = CollectionRoutingInfoTargeter(opCtx, nss); + auto routingInfo = targeter.getRoutingInfo(); + auto cmdToBeSent = cmdObj; - // Clear the `result` BSON builder since this lambda function may be retried - // if the router cache is stale. - output.resetToEmpty(); + if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { + const auto& request = requestParser.request(); + if (auto uuid = request.getCollectionUUID()) { + auto status = + Status(CollectionUUIDMismatchInfo( + nss.dbName(), *uuid, std::string{nss.coll()}, boost::none), + "'collectionUUID' is specified for a time-series view " + "namespace; views do not have UUIDs"); + uassertStatusOK(populateCollectionUUIDMismatch(opCtx, status)); + MONGO_UNREACHABLE_TASSERT(8549600); + } - auto targeter = CollectionRoutingInfoTargeter(opCtx, nss); - auto routingInfo = targeter.getRoutingInfo(); - auto cmdToBeSent = cmdObj; + cmdToBeSent = timeseries::makeTimeseriesCommand( + cmdToBeSent, + nss, + getName(), + CreateIndexesCommand::kIsTimeseriesNamespaceFieldName); + } - if (targeter.timeseriesNamespaceNeedsRewrite(nss)) { - const auto& request = requestParser.request(); - if (auto uuid = request.getCollectionUUID()) { - auto status = Status( - CollectionUUIDMismatchInfo( - nss.dbName(), *uuid, std::string{nss.coll()}, boost::none), - "'collectionUUID' is specified for a time-series view " - "namespace; views do not have UUIDs"); - uassertStatusOK(populateCollectionUUIDMismatch(opCtx, status)); - MONGO_UNREACHABLE_TASSERT(8549600); - } + return routing_context_utils::runAndValidate( + targeter.getRoutingCtx(), [&](RoutingContext& routingCtx) { + auto shardResponses = scatterGatherVersionedTargetByRoutingTable( + opCtx, + routingCtx, + targeter.getNS(), + CommandHelpers::filterCommandRequestForPassthrough( + applyReadWriteConcern(opCtx, this, cmdToBeSent)), + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent, + BSONObj() /*query*/, + BSONObj() /*collation*/, + boost::none /*letParameters*/, + boost::none /*runtimeConstants*/); - cmdToBeSent = timeseries::makeTimeseriesCommand( - cmdToBeSent, - nss, - getName(), - CreateIndexesCommand::kIsTimeseriesNamespaceFieldName); + std::string errmsg; + bool allShardsSucceeded = + appendRawResponses( + opCtx, &errmsg, &output, shardResponses, shardResponses.size() > 1) + .responseOK; + + // Append the single shard command result to the top-level output to + // ensure parity between replica-set and a single sharded cluster. 
     }
 
     /**
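For reference while reading the later hunks: the catch block removed above is one instance of the bounded create-and-retry loop that this PR deletes from several call sites and centralizes in router_role.cpp. A condensed, self-contained model of the deleted shape (hypothetical names):

```cpp
#include <stdexcept>

struct NamespaceNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};

constexpr int kMaxDatabaseCreationAttempts = 3;

// Shape of the deleted loop: create the db up front, route, and absorb a
// bounded number of NamespaceNotFound races caused by concurrent drops.
template <typename CreateDb, typename Route>
auto createAndRoute(CreateDb&& createDatabase, Route&& route) {
    for (int attempts = 1;; ++attempts) {
        try {
            createDatabase();  // implicitly create the db if it doesn't exist
            return route();
        } catch (const NamespaceNotFound&) {
            // After kMaxDatabaseCreationAttempts misses this is unlikely to be
            // a drop race anymore, so the error is rethrown as-is.
            if (attempts >= kMaxDatabaseCreationAttempts)
                throw;
        }
    }
}

int main() {
    bool exists = false;
    return createAndRoute([&] { exists = true; },
                          [&]() -> int { return exists ? 0 : 1; });
}
```

The same bail-out rationale survives in the centralized copy below: after three misses the error is more likely logical than transient, so it is surfaced.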
diff --git a/src/mongo/db/global_catalog/ddl/cluster_ddl.cpp b/src/mongo/db/global_catalog/ddl/cluster_ddl.cpp
index 09865d4db66..cb24e0a81c1 100644
--- a/src/mongo/db/global_catalog/ddl/cluster_ddl.cpp
+++ b/src/mongo/db/global_catalog/ddl/cluster_ddl.cpp
@@ -176,8 +176,6 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,
             "Hanging createCollection due to failpoint 'hangCreateUnshardedCollection' finished");
     }
 
-    const auto dbInfo = createDatabase(opCtx, nss.dbName());
-
     // The config.system.session collection can only exist as sharded and it's essential for the
     // correct cluster functionality. To prevent potential issues, the operation must always be
     // performed as a sharded creation with internal defaults. Ignore potentially different
@@ -268,6 +266,14 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,
     // TODO (SERVER-100309): remove againstFirstShard option once 9.0 becomes last LTS.
     if (againstFirstShard) {
+        tassert(113986,
+                "createCollection can only run against the first shard for the "
+                "`config.system.sessions` collection.",
+                nss == NamespaceString::kLogicalSessionsNamespace);
+
+        const auto dbInfo =
+            uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.dbName()));
         const auto cmdResponse =
             executeCommandAgainstFirstShard(opCtx,
                                             nss.dbName(),
@@ -282,6 +288,7 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,
     } else {
         sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), nss.dbName());
+        router.createDbImplicitlyOnRoute();
         router.route(
             opCtx,
             "createCollection"_sd,
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 560539dfb68..d396b4f16d5 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -432,12 +432,11 @@ void ShardServerProcessInterface::_createCollectionCommon(OperationContext* opCt
                                                           const DatabaseName& dbName,
                                                           const BSONObj& cmdObj,
                                                           boost::optional<ShardId> dataShard) {
-    cluster::createDatabase(opCtx, dbName, dataShard);
-
     // TODO (SERVER-77915): Remove the FCV check and keep only the 'else' branch
     if (!feature_flags::g80CollectionCreationPath.isEnabled(
            serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
         sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);
+        router.createDbImplicitlyOnRoute(dataShard);
         router.route(opCtx,
                      "ShardServerProcessInterface::_createCollectionCommon",
                      [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
@@ -483,6 +482,7 @@ void ShardServerProcessInterface::_createCollectionCommon(OperationContext* opCt
     shardsvrCollCommand.setShardsvrCreateCollectionRequest(request);
 
     sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);
+    router.createDbImplicitlyOnRoute(dataShard);
     router.route(opCtx,
                  "ShardServerProcessInterface::_createCollectionCommon",
                  [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
diff --git a/src/mongo/db/router_role/router_role.cpp b/src/mongo/db/router_role/router_role.cpp
index 41f642aedef..41564749d70 100644
--- a/src/mongo/db/router_role/router_role.cpp
+++ b/src/mongo/db/router_role/router_role.cpp
@@ -31,6 +31,8 @@
 
 #include "mongo/base/error_codes.h"
 #include "mongo/db/global_catalog/chunk_manager.h"
+#include "mongo/db/global_catalog/ddl/cluster_ddl.h"
+#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
 #include "mongo/db/router_role/cluster_commands_helpers.h"
 #include "mongo/db/sharding_environment/grid.h"
 #include "mongo/db/sharding_environment/mongod_and_mongos_server_parameters_gen.h"
@@ -52,6 +54,10 @@ namespace mongo {
 namespace sharding {
 namespace router {
 
+namespace {
+constexpr size_t kMaxDatabaseCreationAttempts = 3;
+}
+
 RouterBase::RouterBase(CatalogCache* catalogCache) : _catalogCache(catalogCache) {}
 
 void RouterBase::_initTxnRouterIfNeeded(OperationContext* opCtx) {
@@ -93,6 +99,32 @@ CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) con
     return uassertStatusOK(_catalogCache->getDatabase(opCtx, _dbName));
 }
 
+CachedDatabaseInfo DBPrimaryRouter::_createDbIfRequestedAndGetRoutingInfo(
+    OperationContext* opCtx) const {
+
+    if (_createDbImplicitly) {
+        size_t attempts = 0;
+        while (attempts < kMaxDatabaseCreationAttempts) {
+            try {
+                if (attempts > 0) {
+                    cluster::createDatabase(opCtx, _dbName, _suggestedPrimaryId);
+                }
+                return _getRoutingInfo(opCtx);
+            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+                ++attempts;
+                LOGV2_INFO(11398601,
+                           "Failed initialization of routing info because the database has been "
+                           "concurrently dropped",
+                           logAttrs(_dbName),
+                           "attemptNumber"_attr = attempts,
+                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
+            }
+        }
+    }
+
+    return _getRoutingInfo(opCtx);
+}
+
 void DBPrimaryRouter::_onException(OperationContext* opCtx, RoutingRetryInfo* retryInfo, Status s) {
     if (s == ErrorCodes::StaleDbVersion) {
         auto si = s.extraInfo<StaleDbRoutingVersion>();
@@ -274,6 +306,67 @@ CollectionRoutingInfo CollectionRouterCommon::_getRoutingInfo(OperationContext*
     return uassertStatusOK(_catalogCache->getCollectionRoutingInfo(opCtx, nss, allowLocks));
 }
 
+RoutingContext CollectionRouter::_getRoutingContext(OperationContext* opCtx) {
+    // When in a multi-document transaction, allow getting routing info from the CatalogCache even
+    // though locks may be held. The CatalogCache will throw CannotRefreshDueToLocksHeld if the
+    // entry is not already cached.
+    const auto allowLocks =
+        opCtx->inMultiDocumentTransaction() && shard_role_details::getLocker(opCtx)->isLocked();
+    return RoutingContext(opCtx, _targetedNamespaces, allowLocks);
+}
+
+RoutingContext CollectionRouter::_createDbIfRequestedAndGetRoutingContext(OperationContext* opCtx) {
+    const NamespaceString& nss = _targetedNamespaces.front();
+
+    if (_createDbImplicitly) {
+        size_t attempts = 0;
+        while (attempts < kMaxDatabaseCreationAttempts) {
+            try {
+                if (attempts > 0) {
+                    cluster::createDatabase(opCtx, nss.dbName(), _suggestedPrimaryId);
+                }
+                return _getRoutingContext(opCtx);
+
+            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+                ++attempts;
+                LOGV2_INFO(11398602,
+                           "Failed initialization of routing info because the database has been "
+                           "concurrently dropped",
+                           logAttrs(nss.dbName()),
+                           "attemptNumber"_attr = attempts,
+                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
+            }
+        }
+    }
+    return _getRoutingContext(opCtx);
+}
+
+CollectionRoutingInfo CollectionRouter::_createDbIfRequestedAndGetRoutingInfo(
+    OperationContext* opCtx) {
+    const NamespaceString& nss = _targetedNamespaces.front();
+
+    if (_createDbImplicitly) {
+        size_t attempts = 0;
+        while (attempts < kMaxDatabaseCreationAttempts) {
+            try {
+                if (attempts > 0) {
+                    cluster::createDatabase(opCtx, nss.dbName(), _suggestedPrimaryId);
+                }
+                return _getRoutingInfo(opCtx, nss);
+            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+                ++attempts;
+                LOGV2_INFO(11398603,
+                           "Failed initialization of routing info because the database has been "
+                           "concurrently dropped",
+                           logAttrs(nss.dbName()),
+                           "attemptNumber"_attr = attempts,
+                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
+            }
+        }
+    }
+    return _getRoutingInfo(opCtx, nss);
+}
+
 void CollectionRouterCommon::appendCRUDRoutingTokenToCommand(const ShardId& shardId,
                                                              const CollectionRoutingInfo& cri,
                                                              BSONObjBuilder* builder) {
diff --git a/src/mongo/db/router_role/router_role.h b/src/mongo/db/router_role/router_role.h
index e6fd3c04df1..c08046b16a0 100644
--- a/src/mongo/db/router_role/router_role.h
+++ b/src/mongo/db/router_role/router_role.h
@@ -82,7 +82,7 @@ public:
         _initTxnRouterIfNeeded(opCtx);
 
         while (true) {
-            auto cdb = _getRoutingInfo(opCtx);
+            auto cdb = _createDbIfRequestedAndGetRoutingInfo(opCtx);
             try {
                 return callbackFn(opCtx, cdb);
             } catch (const DBException& ex) {
@@ -91,6 +91,22 @@ public:
         }
     }
 
+    /**
+     * Enables implicit database creation when an operation is routed through this DBPrimaryRouter
+     * instance.
+     * Handles concurrent database drops: if the command throws a NamespaceNotFound error because
+     * the database was concurrently dropped, the database will be recreated and the operation will
+     * be automatically retried.
+     * If a suggestedPrimaryId is specified, the database will be created on the given shard only
+     * if the database doesn't exist yet. If the database is already created, the suggestedPrimaryId
+     * parameter will be ignored.
+     */
+    void createDbImplicitlyOnRoute(
+        const boost::optional<ShardId>& suggestedPrimaryId = boost::none) {
+        _createDbImplicitly = true;
+        _suggestedPrimaryId = suggestedPrimaryId;
+    }
+
     static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder);
 
     static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,
@@ -99,9 +115,13 @@ public:
 
 private:
     CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const;
+    CachedDatabaseInfo _createDbIfRequestedAndGetRoutingInfo(OperationContext* opCtx) const;
     void _onException(OperationContext* opCtx, RoutingRetryInfo* retryInfo, Status s);
 
     DatabaseName _dbName;
+
+    bool _createDbImplicitly{false};
+    boost::optional<ShardId> _suggestedPrimaryId;
 };
 
 /**
@@ -136,7 +156,7 @@ public:
     template <typename F>
     auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
         return _routeImpl(opCtx, comment, [&] {
-            auto cri = _getRoutingInfo(opCtx, _targetedNamespaces.front());
+            auto cri = _createDbIfRequestedAndGetRoutingInfo(opCtx);
             return callbackFn(opCtx, cri);
         });
     }
@@ -144,18 +164,25 @@ public:
     template <typename F>
     auto routeWithRoutingContext(OperationContext* opCtx, StringData comment, F&& callbackFn) {
         return _routeImpl(opCtx, comment, [&] {
-            // When in a multi-document transaction, allow getting routing info from the
-            // CatalogCache even though locks may be held. The CatalogCache will throw
-            // CannotRefreshDueToLocksHeld if the entry is not already cached.
-            const auto allowLocks = opCtx->inMultiDocumentTransaction() &&
-                shard_role_details::getLocker(opCtx)->isLocked();
-
-            RoutingContext routingCtx(opCtx, _targetedNamespaces, allowLocks);
+            RoutingContext routingCtx = _createDbIfRequestedAndGetRoutingContext(opCtx);
             return routing_context_utils::runAndValidate(
                 routingCtx, [&](RoutingContext& ctx) { return callbackFn(opCtx, ctx); });
         });
     }
 
+    /**
+     * Enables implicit database creation when an operation is routed through this CollectionRouter
+     * instance.
+     * Handles concurrent database drops: if the command throws a NamespaceNotFound error because
+     * the database was concurrently dropped, the database will be recreated and the operation will
+     * be automatically retried.
+     */
+    void createDbImplicitlyOnRoute(
+        const boost::optional<ShardId>& suggestedPrimaryId = boost::none) {
+        _createDbImplicitly = true;
+        _suggestedPrimaryId = suggestedPrimaryId;
+    }
+
 private:
     template <typename F>
     auto _routeImpl(OperationContext* opCtx, StringData comment, F&& work) {
@@ -170,6 +197,14 @@ private:
             }
         }
     }
+
+    RoutingContext _getRoutingContext(OperationContext* opCtx);
+
+    RoutingContext _createDbIfRequestedAndGetRoutingContext(OperationContext* opCtx);
+    CollectionRoutingInfo _createDbIfRequestedAndGetRoutingInfo(OperationContext* opCtx);
+
+    bool _createDbImplicitly{false};
+    boost::optional<ShardId> _suggestedPrimaryId;
 };
 
 class MONGO_MOD_PUBLIC MultiCollectionRouter final : public CollectionRouterCommon {
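The subtle part of the loop added in router_role.cpp is that `createDatabase` only runs from the second iteration on, once a NamespaceNotFound has proven the database really is missing; when the database exists, the first iteration costs nothing extra. A runnable model of just that control flow (hypothetical names, plain C++):

```cpp
#include <cassert>
#include <stdexcept>

struct NamespaceNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};

constexpr int kMaxDatabaseCreationAttempts = 3;

// Mirrors the added _createDbIfRequestedAndGetRoutingInfo loop: createDb runs
// only on retries, i.e. after a NamespaceNotFound proved the db is missing.
template <typename CreateDb, typename GetInfo>
auto createDbIfRequestedAndGetInfo(bool createDbImplicitly,
                                   CreateDb&& createDb,
                                   GetInfo&& getInfo) {
    if (createDbImplicitly) {
        int attempts = 0;
        while (attempts < kMaxDatabaseCreationAttempts) {
            try {
                if (attempts > 0) {
                    createDb();  // second iteration onwards only
                }
                return getInfo();
            } catch (const NamespaceNotFound&) {
                ++attempts;  // concurrent drop or never-created db; retry
            }
        }
    }
    return getInfo();  // disabled, or out of retries: let the error surface
}

int main() {
    bool dbExists = false;
    int creates = 0;
    auto info = createDbIfRequestedAndGetInfo(
        true,
        [&] { dbExists = true; ++creates; },
        [&]() -> int {
            if (!dbExists)
                throw NamespaceNotFound("database not found");
            return 42;
        });
    assert(info == 42 && creates == 1);  // exactly one create, on the retry
}
```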
diff --git a/src/mongo/db/router_role/router_role_test.cpp b/src/mongo/db/router_role/router_role_test.cpp
index 0981eb5c7dd..ea2a2068eff 100644
--- a/src/mongo/db/router_role/router_role_test.cpp
+++ b/src/mongo/db/router_role/router_role_test.cpp
@@ -31,6 +31,7 @@
 
 #include "mongo/base/status.h"
 #include "mongo/db/commands.h"
+#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/query/client_cursor/cursor_response.h"
 #include "mongo/db/router_role/routing_cache/catalog_cache_test_fixture.h"
@@ -69,7 +70,19 @@ public:
         RouterCatalogCacheTestFixture::tearDown();
     }
 
-    void mockConfigServerQueries(NamespaceString _nss, OID epoch, Timestamp timestamp) {
+    void setupReadConcernAtClusterTime(Timestamp clusterTime) {
+        repl::ReadConcernArgs readConcernArgs;
+        ASSERT_OK(readConcernArgs.initialize(
+            BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName
+                        << BSON(repl::ReadConcernArgs::kAtClusterTimeFieldName
+                                << clusterTime << repl::ReadConcernArgs::kLevelFieldName
+                                << "snapshot"))));
+        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
+    }
+
+    void mockConfigServerQueriesForCollRefresh(const NamespaceString& nss,
+                                               OID epoch,
+                                               Timestamp timestamp) {
         // Mock the expected config server queries.
         ChunkVersion version({epoch, timestamp}, {2, 0});
         ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
@@ -86,7 +99,30 @@ public:
         version.incMinor();
 
         expectCollectionAndChunksAggregation(
-            _nss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2});
+            nss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2});
+    }
+
+    void mockConfigServerQueriesForDbRefresh(const DatabaseName& dbName,
+                                             const DatabaseVersion& dbVersion) {
+        expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+            DatabaseType db(dbName, {"0"}, dbVersion);
+            return std::vector<BSONObj>{db.toBSON()};
+        }());
+    }
+
+    void expectCreateDatabase(const DatabaseName& dbName,
+                              const DatabaseVersion& dbVersionToReturn) {
+        static constexpr auto kCmdName = "_configsvrCreateDatabase"_sd;
+        onCommand([&](const executor::RemoteCommandRequest& request) {
+            ASSERT_EQ(kConfigHostAndPort, request.target);
+            ASSERT_TRUE(request.cmdObj.hasField(kCmdName))
+                << "Expected command '" << kCmdName << "', but got: " << request.toString();
+            ASSERT_EQ(dbName.toString_forTest(), request.cmdObj.getStringField(kCmdName));
+
+            ConfigsvrCreateDatabaseResponse response;
+            response.setDatabaseVersion(dbVersionToReturn);
+            return response.toBSON();
+        });
     }
 
     void setupCatalogCacheWithHistory() {
@@ -192,15 +228,6 @@ public:
         repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
     }
 
-    void setupReadConcernAtClusterTime(Timestamp clusterTime) {
-        repl::ReadConcernArgs readConcernArgs;
-        ASSERT_OK(readConcernArgs.initialize(
-            BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName
-                        << BSON(repl::ReadConcernArgs::kAtClusterTimeFieldName
-                                << clusterTime << repl::ReadConcernArgs::kLevelFieldName
-                                << "snapshot"))));
-        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
-    }
 
 private:
     boost::optional<RouterOperationContextSession> _routerOpCtxSession;
 
@@ -291,6 +318,43 @@ TEST_F(RouterRoleTestTxn, DBPrimaryRouterSetsPlacementConflictTimeIfSubRouter) {
     });
 }
 
+TEST_F(RouterRoleTest, DBPrimaryRouterHappyPath) {
+    int tries = 0;
+
+    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
+    router.route(operationContext(),
+                 "test",
+                 [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) { tries++; });
+
+    ASSERT_EQ(tries, 1);
+}
+
+TEST_F(RouterRoleTest, DBPrimaryRouterRetriesAfterStaleDb) {
+    int tries = 0;
+
+    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
+    auto future = launchAsync([&] {
+        router.route(
+            operationContext(),
+            "test",
+            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+                tries++;
+                if (tries == 1) {
+                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
+                              "staleDbVersion");
+                }
+            });
+    });
+
+    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
+                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+
+    future.default_timed_get();
+    ASSERT_EQ(tries, 2);
+}
+
 TEST_F(RouterRoleTest, DBPrimaryRouterExceedsMaxRetryAttempts) {
     int maxTestRetries = 10;
 
@@ -307,12 +371,15 @@ TEST_F(RouterRoleTest, DBPrimaryRouterExceedsMaxRetryAttempts) {
                 tries++;
                 uasserted(
                     StaleDbRoutingVersion(_nss.dbName(),
-                                          DatabaseVersion(UUID::gen(), Timestamp(0, 0)),
-                                          DatabaseVersion(UUID::gen(), Timestamp(0, 0))),
+                                          DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                          DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                     "staleDbVersion");
             });
    });
 
+    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
+                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+
     ASSERT_THROWS_CODE_AND_WHAT(
         future.default_timed_get(),
         DBException,
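These fixture helpers encode an exact wire conversation: each `expect*`/`mock*` call queues one canned responder, and the code under test consumes them strictly in order. A minimal stand-alone model of that mechanism (hypothetical `MockNetwork`, not the real test executor):

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>

// Each expectation is a responder consumed in FIFO order, so a test documents
// the exact sequence of requests the code under test is allowed to make.
struct MockNetwork {
    using Responder = std::function<std::string(const std::string& request)>;
    std::deque<Responder> expectations;

    void expect(Responder r) {
        expectations.push_back(std::move(r));
    }

    std::string onCommand(const std::string& request) {
        assert(!expectations.empty() && "unexpected network request");
        auto responder = std::move(expectations.front());
        expectations.pop_front();
        return responder(request);
    }
};

int main() {
    MockNetwork net;
    net.expect([](const std::string& req) {
        assert(req.find("_configsvrCreateDatabase") != std::string::npos);
        return "{ok: 1, databaseVersion: {...}}";
    });
    net.onCommand("{_configsvrCreateDatabase: 'test'}");  // consumed in order
    assert(net.expectations.empty());  // no expectation left unconsumed
}
```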
@@ -321,6 +388,85 @@ TEST_F(RouterRoleTest, DBPrimaryRouterExceedsMaxRetryAttempts) {
     ASSERT_EQ(tries, maxTestRetries + 1);
 }
 
+TEST_F(RouterRoleTest, DBPrimaryRouterImplicitlyCreatesDbWhenSpecified) {
+    int tries = 0;
+
+    // 1. Asynchronously route a dummy operation with 'createDbImplicitly' set to true.
+    // The first attempt will fail with a StaleDbVersion to force a db refresh.
+    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
+    router.createDbImplicitlyOnRoute();
+    auto future = launchAsync([&] {
+        router.route(
+            operationContext(),
+            "test",
+            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+                tries++;
+                if (tries == 1) {
+                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
+                              "staleDbVersion");
+                }
+            });
+    });
+
+    // 2. Mock a db drop by returning an empty database to force an implicit db creation (need to
+    // return it twice because for missing databases, the CatalogClient tries twice).
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 3. Mock a db drop again since the createDatabase isn't attempted until the second
+    // router-role-loop iteration.
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 4. Expect a call to _configsvrCreateDatabase.
+    expectCreateDatabase(_nss.dbName(), DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+
+    // 5. Mock the refresh that runs after the db creation.
+    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
+                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+
+    future.default_timed_get();
+    ASSERT_EQ(tries, 2);
+}
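The doubled empty responses in steps 2 and 3 look odd at first sight; they fall out of the lookup protocol the comment mentions. A small model of why one simulated "missing database" costs two queued replies (hypothetical types):

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>

// Sketch of the behavior the test comment describes: the catalog client retries
// a missing-database lookup once before reporting the miss to the router loop,
// so every simulated miss must be backed by two empty find responses.
struct ConfigFinder {
    std::deque<std::optional<std::string>> responses;  // queued mock replies

    std::optional<std::string> findDatabaseEntry() {
        auto next = [&] {
            assert(!responses.empty());
            auto r = responses.front();
            responses.pop_front();
            return r;
        };
        auto first = next();
        if (first)
            return first;
        return next();  // second attempt for a missing database
    }
};

int main() {
    ConfigFinder finder;
    finder.responses = {std::nullopt, std::nullopt};  // one simulated "miss"
    assert(!finder.findDatabaseEntry());              // consumed both replies
    assert(finder.responses.empty());
}
```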
+TEST_F(RouterRoleTest, DBPrimaryRouterDoesntImplicitlyCreateDbByDefault) {
+    int tries = 0;
+
+    // 1. Asynchronously route a dummy operation without enabling implicit db creation.
+    // The first attempt will fail with a StaleDbVersion to force a db refresh.
+    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
+    auto future = launchAsync([&] {
+        router.route(
+            operationContext(),
+            "test",
+            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+                tries++;
+                if (tries == 1) {
+                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
+                              "staleDbVersion");
+                }
+            });
+    });
+
+    // 2. Mock a db drop by returning an empty database (need to return it twice because for
+    // missing databases, the CatalogClient tries twice).
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 3. Expect a NamespaceNotFound error since the database doesn't exist and the implicit db
+    // creation is disabled.
+    ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),
+                                DBException,
+                                ErrorCodes::NamespaceNotFound,
+                                str::stream() << "database " << _nss.dbName().toStringForErrorMsg()
+                                              << " not found");
+    ASSERT_EQ(tries, 1);
+}
+
 TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryForSubRouter) {
     actAsSubRouter();
 
@@ -347,7 +493,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryForSubRouter) {
     ASSERT_EQ(tries, 1);
 }
 
-TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnStaleConfigForNonSubRouter) {
+TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnStaleConfigInTxnForNonSubRouter) {
     const OID epoch{OID::gen()};
     const Timestamp timestamp{1, 0};
 
@@ -413,7 +559,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleConfigWithoutTxn) {
             }
         });
     });
-    mockConfigServerQueries(_nss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -467,7 +613,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleEpochWithoutTxn) {
             }
         });
     });
-    mockConfigServerQueries(_nss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -486,7 +632,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleEpochWithoutTxn) {
             }
         });
     });
-    mockConfigServerQueries(_nss, OID::gen(), Timestamp{2, 0});
+    mockConfigServerQueriesForCollRefresh(_nss, OID::gen(), Timestamp{2, 0});
 
     future2.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -617,7 +763,7 @@ TEST_F(RouterRoleTest, CollectionRouterWithRoutingContextRetryOnStaleConfigWitho
             }
         });
     });
-    mockConfigServerQueries(_nss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -681,7 +827,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterWithRoutingContextAtTransactionCluster
         testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
 }
 
-TEST_F(RouterRoleTestTxn, CollectionRouterWithRoutingContextAtReadConcernClusterTime) {
+TEST_F(RouterRoleTest, CollectionRouterWithRoutingContextAtReadConcernClusterTime) {
     setupCatalogCacheWithHistory();
 
     // Test routing behavior at different timestamps to verify that catalog cache history works
@@ -803,8 +949,8 @@ TEST_F(RouterRoleTest, MultiCollectionRouterRetryOnStaleConfig) {
             }
         });
     });
-    mockConfigServerQueries(nss2, epoch, timestamp);
-    mockConfigServerQueries(_nss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(nss2, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -839,7 +985,7 @@ TEST_F(RouterRoleTest,
                       "StaleConfig error");
         });
     });
-    mockConfigServerQueries(nss2, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(nss2, epoch, timestamp);
 
     ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::StaleConfig);
     ASSERT_EQ(tries, 1);
@@ -950,7 +1096,7 @@ TEST_F(RouterRoleTestTxn, RoutingContextRoutingTablesAreImmutable) {
 
     // Schedule a refresh with a new placement version.
     auto future = scheduleRoutingInfoForcedRefresh(_nss);
-    mockConfigServerQueries(_nss, versionA.epoch(), versionA.getTimestamp());
+    mockConfigServerQueriesForCollRefresh(_nss, versionA.epoch(), versionA.getTimestamp());
 
     const auto criB = *future.default_timed_get();
     const auto versionB = criB.getCollectionVersion().placementVersion();
@@ -1013,7 +1159,7 @@ TEST_F(RouterRoleTest, CollectionRouterExceedsMaxRetryAttempts) {
         Timestamp timestamp{startTime, 0};
         startTime++;
 
-        mockConfigServerQueries(_nss, epoch, timestamp);
+        mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
     }
 
     ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),
@@ -1024,7 +1170,7 @@ TEST_F(RouterRoleTest, CollectionRouterExceedsMaxRetryAttempts) {
     ASSERT_EQ(tries, maxTestRetries + 1);
 }
 
-TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnShardNotFound) {
+TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnShardNotFoundInTxn) {
     sharding::router::CollectionRouter router(getServiceContext(), _nss);
 
     ASSERT(TransactionRouter::get(operationContext()));
@@ -1070,7 +1216,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnShardNotFound) {
         DatabaseType db(_nss.dbName(), {"0"}, DatabaseVersion(UUID::gen(), Timestamp(1, 0)));
         return std::vector<BSONObj>{db.toBSON()};
     }());
-    mockConfigServerQueries(_nss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
@@ -1112,7 +1258,7 @@ TEST_F(RouterRoleTestTxn, CatalogCacheGetRoutingInfoAtTransactionClusterTime) {
         testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
 }
 
-TEST_F(RouterRoleTestTxn, CatalogCacheGetRoutingInfoAtReadConcernClusterTime) {
+TEST_F(RouterRoleTest, CatalogCacheGetRoutingInfoAtReadConcernClusterTime) {
     setupCatalogCacheWithHistory();
 
     // Test routing behavior at different timestamps to verify that catalog cache history works
@@ -1186,7 +1332,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterGetRoutingInfoAtTransactionClusterTime
         testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
 }
 
-TEST_F(RouterRoleTestTxn, CollectionRouterGetRoutingInfoAtReadConcernClusterTime) {
+TEST_F(RouterRoleTest, CollectionRouterGetRoutingInfoAtReadConcernClusterTime) {
     setupCatalogCacheWithHistory();
 
     // Test routing behavior at different timestamps to verify that catalog cache history works
@@ -1249,10 +1395,92 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleConfigTimeseriesBucket) {
             }
         });
     });
-    mockConfigServerQueries(bucketNss, epoch, timestamp);
+    mockConfigServerQueriesForCollRefresh(bucketNss, epoch, timestamp);
 
     future.default_timed_get();
     ASSERT_EQ(tries, 2);
 }
+
+TEST_F(RouterRoleTest, CollectionRouterDoesntImplicitlyCreateDbByDefault) {
+    sharding::router::CollectionRouter router(getServiceContext(), _nss);
+
+    // 1. Asynchronously route a dummy operation.
+    // The first attempt will fail with a StaleDbVersion to force a db refresh.
+    int tries = 0;
+    auto future = launchAsync([&] {
+        router.route(
+            operationContext(),
+            "test",
+            [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
+                tries++;
+                if (tries == 1) {
+                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
+                              "staleDbVersion");
+                }
+            });
+    });
+
+    // 2. Mock a db drop by returning an empty database (need to return it twice because for
+    // missing databases, the CatalogClient tries twice).
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 3. Expect a NamespaceNotFound error since the database doesn't exist and the implicit db
+    // creation is disabled.
+    ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),
+                                DBException,
+                                ErrorCodes::NamespaceNotFound,
+                                str::stream() << "database " << _nss.dbName().toStringForErrorMsg()
+                                              << " not found");
+    ASSERT_EQ(tries, 1);
+}
+
+TEST_F(RouterRoleTest, CollectionRouterImplicitlyCreatesDbWhenSpecified) {
+    sharding::router::CollectionRouter router(getServiceContext(), _nss);
+    router.createDbImplicitlyOnRoute();
+
+    // 1. Asynchronously route a dummy operation with 'createDbImplicitly' set to true.
+    // The first attempt will fail with a StaleDbVersion to force a db refresh.
+    int tries = 0;
+    auto future = launchAsync([&] {
+        router.route(
+            operationContext(),
+            "test",
+            [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
+                tries++;
+                if (tries == 1) {
+                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
+                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
+                              "staleDbVersion");
+                }
+            });
+    });
+
+    // 2. Mock a db drop by returning an empty database to force an implicit db creation (need to
+    // return it twice because for missing databases, the CatalogClient tries twice).
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 3. Mock a db drop again since the createDatabase isn't attempted until the second
+    // router-role-loop iteration.
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+    expectFindSendBSONObjVector(kConfigHostAndPort, {});
+
+    // 4. Expect a call to _configsvrCreateDatabase.
+    expectCreateDatabase(_nss.dbName(), DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+
+    // 5. Mock the refresh that runs after the db creation.
+    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
+                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
+    mockConfigServerQueriesForCollRefresh(_nss, OID::gen(), Timestamp{1, 0});
+
+    future.default_timed_get();
+
+    ASSERT_EQ(tries, 2);
+}
+
 }  // namespace
 }  // namespace mongo
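The remaining hunks are call-site migrations onto the new opt-in. Their common shape, condensed into a compilable sketch (hypothetical `FakeRouter`/`legacyRoute`/`modernRoute` names): besides removing duplication, the new shape defers database creation until a routing attempt has actually failed, where the old loop paid an unconditional createDatabase round trip up front.

```cpp
#include <stdexcept>

struct NamespaceNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Hypothetical stand-in for the router types patched in this PR.
struct FakeRouter {
    bool createDbImplicitly = false;
    void createDbImplicitlyOnRoute() {
        createDbImplicitly = true;  // the create/retry loop now lives in the router
    }
    template <typename Body>
    auto routeWithRoutingContext(Body&& body) {
        return body();
    }
};

// Before: each command wrapped routing in its own bounded create-and-retry
// loop, paying a createDatabase round trip on every attempt, hit or miss.
template <typename CreateDb, typename Route>
auto legacyRoute(CreateDb&& createDatabase, Route&& route) {
    for (int attempts = 1;; ++attempts) {
        try {
            createDatabase();
            return route();
        } catch (const NamespaceNotFound&) {
            if (attempts >= 3)
                throw;
        }
    }
}

// After: a two-line opt-in; creation happens only when routing actually fails.
template <typename Body>
auto modernRoute(FakeRouter& router, Body&& body) {
    router.createDbImplicitlyOnRoute();
    return router.routeWithRoutingContext(body);
}

int main() {
    FakeRouter router;
    return modernRoute(router, [] { return 0; });
}
```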
diff --git a/src/mongo/s/commands/query_cmd/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/query_cmd/cluster_find_and_modify_cmd.cpp
index 48cca5b4d7e..85190afcd40 100644
--- a/src/mongo/s/commands/query_cmd/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/query_cmd/cluster_find_and_modify_cmd.cpp
@@ -118,8 +118,6 @@ namespace mongo {
 namespace {
 
-constexpr size_t kMaxDatabaseCreationAttempts = 3u;
-
 using QuerySamplingOptions = OperationContext::QuerySamplingOptions;
 
 const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly);
@@ -1018,35 +1016,15 @@ bool FindAndModifyCmd::run(OperationContext* opCtx,
         }
     };
 
-    while (true) {
-        size_t attempts = 1u;
-        try {
-            // Technically, findAndModify should only be creating database if upsert is true, but
-            // this would require that the parsing be pulled into this function.
-            cluster::createDatabase(opCtx, originalNss.dbName());
-
-            sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
-            router.routeWithRoutingContext(opCtx, getName(), findAndModifyBody);
-            return true;
-
-        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
-            LOGV2_INFO(8584300,
-                       "Failed initialization of routing info because the database has been "
-                       "concurrently dropped",
-                       logAttrs(originalNss.dbName()),
-                       "attemptNumber"_attr = attempts,
-                       "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
-
-            if (attempts++ >= kMaxDatabaseCreationAttempts) {
-                // The maximum number of attempts has been reached, so the procedure fails as it
-                // could be a logical error. At this point, it is unlikely that the error is
-                // caused by concurrent drop database operations.
-                throw;
-            }
-        }
-    }
-
-    MONGO_UNREACHABLE;
+    sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
+
+    // Technically, findAndModify should only be creating the database if upsert is true, but
+    // this would require that the parsing be pulled into this function.
+    // TODO (SERVER-114203) - Implicitly create a database only when upsert is true.
+    router.createDbImplicitlyOnRoute();
+    router.routeWithRoutingContext(opCtx, getName(), findAndModifyBody);
+
+    return true;
 }
 
 bool FindAndModifyCmd::getCrudProcessedFromCmd(const BSONObj& cmdObj) {
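The TODO (SERVER-114203) above defers a behavioral refinement rather than implementing it. For clarity, the intended gate would look roughly like this (hypothetical sketch, not in this PR):

```cpp
#include <cassert>

// Hypothetical gate for SERVER-114203: only an upsert:true findAndModify should
// be allowed to create a missing database; this PR still opts in unconditionally.
bool shouldImplicitlyCreateDb(bool isUpsert) {
    return isUpsert;
}

int main() {
    assert(shouldImplicitlyCreateDb(true));
    assert(!shouldImplicitlyCreateDb(false));  // the post-SERVER-114203 behavior
}
```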
diff --git a/src/mongo/s/commands/query_cmd/cluster_write_cmd.cpp b/src/mongo/s/commands/query_cmd/cluster_write_cmd.cpp
index b0bcdeb18dc..40d65668c41 100644
--- a/src/mongo/s/commands/query_cmd/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/query_cmd/cluster_write_cmd.cpp
@@ -506,6 +506,12 @@ bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
     }
 
     sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
+
+    // Implicitly create the db if it doesn't exist. There is no way right now to return an
+    // explain on a sharded cluster if the database doesn't exist.
+    // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
+    // doesn't exist.
+    router.createDbImplicitlyOnRoute();
     return router.routeWithRoutingContext(
         opCtx,
         "explain write"_sd,
@@ -609,99 +615,76 @@ void ClusterWriteCmd::executeWriteOpExplain(OperationContext* opCtx,
     auto bodyBuilder = result->getBodyBuilder();
     const auto originalRequestBSON = req ? req->toBSON() : requestObj;
 
-    const size_t kMaxDatabaseCreationAttempts = 3;
-    size_t attempts = 1;
-    while (true) {
-        try {
-            // Implicitly create the db if it doesn't exist. There is no way right now to return an
-            // explain on a sharded cluster if the database doesn't exist.
-            // TODO (SERVER-108882) Stop creating the db once explain can be executed when th db
-            // doesn't exist.
-            cluster::createDatabase(opCtx, originalNss.dbName());
-
-            // If we aren't running an explain for updateOne or deleteOne without shard key,
-            // continue and run the original explain path.
-            if (runExplainWithoutShardKey(
-                    opCtx, batchedRequest, originalNss, verbosity, &bodyBuilder)) {
-                return;
-            }
-
-            sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
-            router.routeWithRoutingContext(
-                opCtx,
-                "explain write"_sd,
-                [&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
-                    auto translatedReqBSON = originalRequestBSON;
-                    auto translatedNss = originalNss;
-                    const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);
-
-                    auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
-                        opCtx,
-                        originalNss,
-                        targeter,
-                        originalRoutingCtx,
-                        [&](const NamespaceString& bucketsNss) {
-                            translatedNss = bucketsNss;
-                            switch (batchedRequest.getBatchType()) {
-                                case BatchedCommandRequest::BatchType_Insert:
-                                    translatedReqBSON = rewriteCommandForRawDataOperation<
-                                        write_ops::InsertCommandRequest>(originalRequestBSON,
-                                                                         translatedNss.coll());
-                                    break;
-                                case BatchedCommandRequest::BatchType_Update:
-                                    translatedReqBSON = rewriteCommandForRawDataOperation<
-                                        write_ops::UpdateCommandRequest>(originalRequestBSON,
-                                                                         translatedNss.coll());
-                                    break;
-                                case BatchedCommandRequest::BatchType_Delete:
-                                    translatedReqBSON = rewriteCommandForRawDataOperation<
-                                        write_ops::DeleteCommandRequest>(originalRequestBSON,
-                                                                         translatedNss.coll());
-                                    break;
-                            }
-                        });
-                    unusedRoutingCtx.skipValidation();
-
-                    const auto explainCmd =
-                        ClusterExplain::wrapAsExplain(translatedReqBSON, verbosity);
-
-                    // We will time how long it takes to run the commands on the shards.
-                    Timer timer;
-
-                    // Target the command to the shards based on the singleton batch item.
-                    BatchItemRef targetingBatchItem(requestPtr, 0);
-                    std::vector<AsyncRequestsSender::Response> shardResponses;
-                    commandOpWrite(opCtx,
-                                   translatedNss,
-                                   explainCmd,
-                                   std::move(targetingBatchItem),
-                                   targeter,
-                                   &shardResponses);
-                    uassertStatusOK(ClusterExplain::buildExplainResult(
-                        makeBlankExpressionContext(opCtx, translatedNss),
-                        shardResponses,
-                        ClusterExplain::kWriteOnShards,
-                        timer.millis(),
-                        originalRequestBSON,
-                        &bodyBuilder));
-                });
-            break;
-        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
-            LOGV2_INFO(10370602,
-                       "Failed initialization of routing info because the database has been "
-                       "concurrently dropped",
-                       logAttrs(originalNss.dbName()),
-                       "attemptNumber"_attr = attempts,
-                       "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
-
-            if (++attempts >= kMaxDatabaseCreationAttempts) {
-                // The maximum number of attempts has been reached, so the procedure fails as it
-                // could be a logical error. At this point, it is unlikely that the error is caused
-                // by concurrent drop database operations.
-                throw;
-            }
-        }
+    // If we aren't running an explain for updateOne or deleteOne without shard key,
+    // continue and run the original explain path.
+    if (runExplainWithoutShardKey(opCtx, batchedRequest, originalNss, verbosity, &bodyBuilder)) {
+        return;
     }
+
+    sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
+
+    // Implicitly create the db if it doesn't exist. There is no way right now to return an
+    // explain on a sharded cluster if the database doesn't exist.
+    // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
+    // doesn't exist.
+    router.createDbImplicitlyOnRoute();
+    router.routeWithRoutingContext(
+        opCtx,
+        "explain write"_sd,
+        [&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
+            auto translatedReqBSON = originalRequestBSON;
+            auto translatedNss = originalNss;
+            const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);
+
+            auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
+                opCtx,
+                originalNss,
+                targeter,
+                originalRoutingCtx,
+                [&](const NamespaceString& bucketsNss) {
+                    translatedNss = bucketsNss;
+                    switch (batchedRequest.getBatchType()) {
+                        case BatchedCommandRequest::BatchType_Insert:
+                            translatedReqBSON =
+                                rewriteCommandForRawDataOperation<write_ops::InsertCommandRequest>(
+                                    originalRequestBSON, translatedNss.coll());
+                            break;
+                        case BatchedCommandRequest::BatchType_Update:
+                            translatedReqBSON =
+                                rewriteCommandForRawDataOperation<write_ops::UpdateCommandRequest>(
+                                    originalRequestBSON, translatedNss.coll());
+                            break;
+                        case BatchedCommandRequest::BatchType_Delete:
+                            translatedReqBSON =
+                                rewriteCommandForRawDataOperation<write_ops::DeleteCommandRequest>(
+                                    originalRequestBSON, translatedNss.coll());
+                            break;
+                    }
+                });
+            unusedRoutingCtx.skipValidation();
+
+            const auto explainCmd = ClusterExplain::wrapAsExplain(translatedReqBSON, verbosity);
+
+            // We will time how long it takes to run the commands on the shards.
+            Timer timer;
+
+            // Target the command to the shards based on the singleton batch item.
+            BatchItemRef targetingBatchItem(requestPtr, 0);
+            std::vector<AsyncRequestsSender::Response> shardResponses;
+            commandOpWrite(opCtx,
+                           translatedNss,
+                           explainCmd,
+                           std::move(targetingBatchItem),
+                           targeter,
+                           &shardResponses);
+            uassertStatusOK(
+                ClusterExplain::buildExplainResult(makeBlankExpressionContext(opCtx, translatedNss),
+                                                   shardResponses,
+                                                   ClusterExplain::kWriteOnShards,
+                                                   timer.millis(),
+                                                   originalRequestBSON,
+                                                   &bodyBuilder));
+        });
 }
 
 bool ClusterWriteCmd::InvocationBase::runImpl(OperationContext* opCtx,
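In the aggregate hunk that follows, the old assign-and-break loop is collapsed into an immediately invoked lambda, so `status` can be initialized exactly once from either a return value or a caught exception. The shape in isolation (plain C++, hypothetical `Status` struct):

```cpp
#include <functional>
#include <stdexcept>
#include <string>

struct Status {
    bool ok;
    std::string reason;
};

// Convert "return or throw" into a single value, with no mutable status
// variable and no break-out-of-loop bookkeeping.
Status runAndCapture(const std::function<Status()>& route) {
    return std::invoke([&]() -> Status {
        try {
            return route();
        } catch (const std::exception& ex) {
            return Status{false, ex.what()};
        }
    });
}

int main() {
    Status s = runAndCapture([]() -> Status { throw std::runtime_error("boom"); });
    return s.ok ? 1 : 0;  // s.reason == "boom"
}
```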
diff --git a/src/mongo/s/query/planner/cluster_aggregate.cpp b/src/mongo/s/query/planner/cluster_aggregate.cpp
index 2bf5c84ad47..1aa7acef12a 100644
--- a/src/mongo/s/query/planner/cluster_aggregate.cpp
+++ b/src/mongo/s/query/planner/cluster_aggregate.cpp
@@ -1037,6 +1037,21 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     sharding::router::CollectionRouter router(opCtx->getServiceContext(),
                                               namespaces.executionNss);
 
+    bool isExplain = verbosity.has_value();
+    if (isExplain) {
+        // Implicitly create the database for explain commands since, right now, there is no way
+        // to respond properly when the database doesn't exist.
+        // Previously, the database was implicitly created by the CollectionRoutingInfoTargeter
+        // class (a legacy class that stores the routing information); now that we use the
+        // RoutingContext instead, we still need to create the database until SERVER-108882 gets
+        // addressed.
+        // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
+        // doesn't exist.
+        router.createDbImplicitlyOnRoute();
+    }
+
+    // We'll use routerBodyStarted to distinguish whether an error was thrown before or after the
+    // body function was executed.
     bool routerBodyStarted = false;
     auto bodyFn = [&](OperationContext* opCtx, RoutingContext& routingCtx) {
         routerBodyStarted = true;
@@ -1053,33 +1068,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
         return Status::OK();
     };
 
-    const size_t kMaxDatabaseCreationAttempts = 3;
-    size_t attempts = 1;
-    Status status{Status::OK()};
-    bool isExplain = verbosity.has_value();
-    while (true) {
-        if (isExplain) {
-            // Implicitly create the database for explain commands since, right now, there is no way
-            // to respond properly when the database doesn't exist. Before the database was
-            // implicitly created by CollectionRoutingInfoTargeter, now that we are not using this
-            // class anymore still need to create a database.
-            // TODO (SERVER-108882) Stop creating the db once explain can be executed when th db
-            // doesn't exist.
-            cluster::createDatabase(opCtx, namespaces.executionNss.dbName());
-        }
+    // Route the command and capture the returned status.
+    Status status = std::invoke([&]() -> Status {
         try {
-            status = router.routeWithRoutingContext(opCtx, comment, bodyFn);
-        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
-            if (isExplain && ++attempts < kMaxDatabaseCreationAttempts) {
-                continue;
-            }
-            status = ex.toStatus();
+            return router.routeWithRoutingContext(opCtx, comment, bodyFn);
         } catch (const DBException& ex) {
-            status = ex.toStatus();
+            return ex.toStatus();
         }
-        break;
-    }
+    });
 
+    // Error handling for exceptions raised prior to executing the aggregation body.
     if (!status.isOK() && !routerBodyStarted) {
         uassert(CollectionUUIDMismatchInfo(request.getDbName(),
                                            *request.getCollectionUUID(),