mirror of https://github.com/mongodb/mongo
SERVER-114129 Opt-in feature to implicitly create a database in the RouterRole API (#44173)
GitOrigin-RevId: 114f9c9f6ca8fa0ea3237ee59e91c858eb911e88
Parent: 2a588d904a
Commit: f4e88b9c67
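This change adds an opt-in createDbImplicitlyOnRoute() switch to the RouterRole API (DBPrimaryRouter and CollectionRouter), replacing the per-command cluster::createDatabase retry loops that several commands carried. A minimal sketch of the new call pattern, using only the API shown in the diff below (the command name and callback body are hypothetical):

    // Sketch, not a verbatim excerpt: opt in to implicit database creation
    // before routing; the router then creates the db and retries the callback
    // on NamespaceNotFound errors caused by a concurrent database drop.
    sharding::router::CollectionRouter router{opCtx->getServiceContext(), nss};
    router.createDbImplicitlyOnRoute();
    return router.routeWithRoutingContext(
        opCtx,
        "myCommand"_sd,  // hypothetical comment string
        [&](OperationContext* opCtx, RoutingContext& routingCtx) {
            // The database backing 'nss' is guaranteed to exist here.
            return runMyCommandBody(opCtx, routingCtx);  // hypothetical helper
        });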
@@ -1390,6 +1390,9 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/jstests/noPassthrough/replication/**/*initial_sync* @10gen/server-initial-sync @svc-auto-approve-bot
/jstests/noPassthrough/replication/**/*replication_recovery* @10gen/server-recovery @svc-auto-approve-bot

# The following patterns are parsed from ./jstests/noPassthrough/routing/OWNERS.yml
/jstests/noPassthrough/routing/**/* @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot

# The following patterns are parsed from ./jstests/noPassthrough/rs_endpoint/OWNERS.yml
/jstests/noPassthrough/rs_endpoint/**/* @10gen/server-cluster-scalability @svc-auto-approve-bot
@@ -0,0 +1,12 @@
load("//bazel:mongo_js_rules.bzl", "all_subpackage_javascript_files", "mongo_js_library")

package(default_visibility = ["//visibility:public"])

mongo_js_library(
    name = "all_javascript_files",
    srcs = glob([
        "*.js",
    ]),
)

all_subpackage_javascript_files()
@@ -0,0 +1,5 @@
version: 1.0.0
filters:
  - "*":
      approvers:
        - 10gen/server-catalog-and-routing-routing-and-topology
@@ -0,0 +1,77 @@
import {after, before, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";

const dbName = jsTestName();
const collName = "testColl";
const collName2 = "testColl2";

describe("Test create collection from stale mongos", function () {
    before(() => {
        this.st = new ShardingTest({shards: 1, rs: {nodes: 1}, mongos: 2});

        this.staleMongos = this.st.s0;
        this.nonStaleMongos = this.st.s1;
    });
    after(() => {
        this.st.stop();
    });

    beforeEach(() => {
        // Pre-create the collection via the first router.
        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName));

        // Drop the database on another router without removing the routing info from the first router.
        assert.commandWorked(this.nonStaleMongos.getDB(dbName).dropDatabase());
    });

    it("Create same collection after dropping database via different router", () => {
        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName));

        // Verify that `collName` exists.
        assert.eq(
            1,
            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
            "Collection should exist but it doesn't.",
        );
    });

    it("Create different collection after dropping database via different router", () => {
        assert.commandWorked(this.staleMongos.getDB(dbName).createCollection(collName2));

        // Verify that `collName2` exists.
        assert.eq(
            1,
            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName2}).length,
            "Collection should exist but it doesn't.",
        );

        // Verify that `collName` doesn't exist.
        assert.eq(
            0,
            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
            "Collection should not exist but it does.",
        );
    });

    it("Insert into collection after dropping database via different router", () => {
        assert.commandWorked(this.staleMongos.getDB(dbName)[collName].insert({_id: 1}));

        // Verify that `collName` exists.
        assert.eq(
            1,
            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
            "Collection should exist but it doesn't.",
        );
    });

    it("Shard collection after dropping database via different router", () => {
        assert.commandWorked(this.staleMongos.adminCommand({shardCollection: dbName + "." + collName, key: {x: 1}}));

        // Verify that `collName` exists.
        assert.eq(
            1,
            this.nonStaleMongos.getDB(dbName).getCollectionInfos({name: collName}).length,
            "Collection should exist but it doesn't.",
        );
    });
});
@@ -130,113 +130,87 @@ public:
        const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj));
        LOGV2_DEBUG(22750, 1, "CMD: createIndexes", logAttrs(nss), "command"_attr = redact(cmdObj));

        const size_t kMaxDatabaseCreationAttempts = 3;
        size_t attempts = 1;
        sharding::router::CollectionRouter router{opCtx->getServiceContext(), nss};
        router.createDbImplicitlyOnRoute();
        return router.routeWithRoutingContext(
            opCtx,
            Request::kCommandName,
            [&](OperationContext* opCtx, RoutingContext& unusedRoutingCtx) {
                // The CollectionRouter is not capable of implicitly translating the namespace
                // to a timeseries buckets collection, which is required in this command.
                // Hence, we'll use the CollectionRouter to handle StaleConfig errors but
                // will ignore its RoutingContext. Instead, we'll use a
                // CollectionRoutingInfoTargeter object to properly get the RoutingContext
                // when the collection is timeseries.
                // TODO (SPM-3830) Use the RoutingContext provided by the CollectionRouter
                // once all timeseries collections become viewless.
                unusedRoutingCtx.skipValidation();

        while (true) {
            try {
                // Implicitly create the db if it doesn't exist
                cluster::createDatabase(opCtx, nss.dbName());
                // Clear the `result` BSON builder since this lambda function may be retried
                // if the router cache is stale.
                output.resetToEmpty();

                sharding::router::CollectionRouter router{opCtx->getServiceContext(), nss};
                return router.routeWithRoutingContext(
                    opCtx,
                    Request::kCommandName,
                    [&](OperationContext* opCtx, RoutingContext& unusedRoutingCtx) {
                        // The CollectionRouter is not capable of implicitly translating the namespace
                        // to a timeseries buckets collection, which is required in this command.
                        // Hence, we'll use the CollectionRouter to handle StaleConfig errors but
                        // will ignore its RoutingContext. Instead, we'll use a
                        // CollectionRoutingInfoTargeter object to properly get the RoutingContext
                        // when the collection is timeseries.
                        // TODO (SPM-3830) Use the RoutingContext provided by the CollectionRouter
                        // once all timeseries collections become viewless.
                        unusedRoutingCtx.skipValidation();
                auto targeter = CollectionRoutingInfoTargeter(opCtx, nss);
                auto routingInfo = targeter.getRoutingInfo();
                auto cmdToBeSent = cmdObj;

                        // Clear the `result` BSON builder since this lambda function may be retried
                        // if the router cache is stale.
                        output.resetToEmpty();
                if (targeter.timeseriesNamespaceNeedsRewrite(nss)) {
                    const auto& request = requestParser.request();
                    if (auto uuid = request.getCollectionUUID()) {
                        auto status =
                            Status(CollectionUUIDMismatchInfo(
                                       nss.dbName(), *uuid, std::string{nss.coll()}, boost::none),
                                   "'collectionUUID' is specified for a time-series view "
                                   "namespace; views do not have UUIDs");
                        uassertStatusOK(populateCollectionUUIDMismatch(opCtx, status));
                        MONGO_UNREACHABLE_TASSERT(8549600);
                    }

                        auto targeter = CollectionRoutingInfoTargeter(opCtx, nss);
                        auto routingInfo = targeter.getRoutingInfo();
                        auto cmdToBeSent = cmdObj;
                    cmdToBeSent = timeseries::makeTimeseriesCommand(
                        cmdToBeSent,
                        nss,
                        getName(),
                        CreateIndexesCommand::kIsTimeseriesNamespaceFieldName);
                }

                        if (targeter.timeseriesNamespaceNeedsRewrite(nss)) {
                            const auto& request = requestParser.request();
                            if (auto uuid = request.getCollectionUUID()) {
                                auto status = Status(
                                    CollectionUUIDMismatchInfo(
                                        nss.dbName(), *uuid, std::string{nss.coll()}, boost::none),
                                    "'collectionUUID' is specified for a time-series view "
                                    "namespace; views do not have UUIDs");
                                uassertStatusOK(populateCollectionUUIDMismatch(opCtx, status));
                                MONGO_UNREACHABLE_TASSERT(8549600);
                            }
                return routing_context_utils::runAndValidate(
                    targeter.getRoutingCtx(), [&](RoutingContext& routingCtx) {
                        auto shardResponses = scatterGatherVersionedTargetByRoutingTable(
                            opCtx,
                            routingCtx,
                            targeter.getNS(),
                            CommandHelpers::filterCommandRequestForPassthrough(
                                applyReadWriteConcern(opCtx, this, cmdToBeSent)),
                            ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                            Shard::RetryPolicy::kIdempotent,
                            BSONObj() /*query*/,
                            BSONObj() /*collation*/,
                            boost::none /*letParameters*/,
                            boost::none /*runtimeConstants*/);

                            cmdToBeSent = timeseries::makeTimeseriesCommand(
                                cmdToBeSent,
                                nss,
                                getName(),
                                CreateIndexesCommand::kIsTimeseriesNamespaceFieldName);
                        std::string errmsg;
                        bool allShardsSucceeded =
                            appendRawResponses(
                                opCtx, &errmsg, &output, shardResponses, shardResponses.size() > 1)
                                .responseOK;

                        // Append the single shard command result to the top-level output to
                        // ensure parity between replica-set and a single sharded cluster.
                        if (shardResponses.size() == 1 && allShardsSucceeded) {
                            CommandHelpers::filterCommandReplyForPassthrough(
                                shardResponses[0].swResponse.getValue().data, &output);
                        }

                        return routing_context_utils::runAndValidate(
                            targeter.getRoutingCtx(), [&](RoutingContext& routingCtx) {
                                auto shardResponses = scatterGatherVersionedTargetByRoutingTable(
                                    opCtx,
                                    routingCtx,
                                    targeter.getNS(),
                                    CommandHelpers::filterCommandRequestForPassthrough(
                                        applyReadWriteConcern(opCtx, this, cmdToBeSent)),
                                    ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                                    Shard::RetryPolicy::kIdempotent,
                                    BSONObj() /*query*/,
                                    BSONObj() /*collation*/,
                                    boost::none /*letParameters*/,
                                    boost::none /*runtimeConstants*/);
                        CommandHelpers::appendSimpleCommandStatus(
                            output, allShardsSucceeded, errmsg);

                                std::string errmsg;
                                bool allShardsSucceeded =
                                    appendRawResponses(opCtx,
                                                       &errmsg,
                                                       &output,
                                                       shardResponses,
                                                       shardResponses.size() > 1)
                                        .responseOK;
                        if (allShardsSucceeded) {
                            LOGV2(5706400, "Indexes created", logAttrs(nss));
                        }

                                // Append the single shard command result to the top-level output to
                                // ensure parity between replica-set and a single sharded cluster.
                                if (shardResponses.size() == 1 && allShardsSucceeded) {
                                    CommandHelpers::filterCommandReplyForPassthrough(
                                        shardResponses[0].swResponse.getValue().data, &output);
                                }

                                CommandHelpers::appendSimpleCommandStatus(
                                    output, allShardsSucceeded, errmsg);

                                if (allShardsSucceeded) {
                                    LOGV2(5706400, "Indexes created", logAttrs(nss));
                                }

                        return allShardsSucceeded;
                    });
                                return allShardsSucceeded;
                            });
            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
                LOGV2_INFO(10370501,
                           "Failed initialization of routing info because the database has been "
                           "concurrently dropped",
                           logAttrs(nss.dbName()),
                           "attemptNumber"_attr = attempts,
                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);

                if (attempts++ >= kMaxDatabaseCreationAttempts) {
                    // The maximum number of attempts has been reached, so the procedure fails as it
                    // could be a logical error. At this point, it is unlikely that the error is
                    // caused by concurrent drop database operations.
                    throw;
                }
            }
        }
    });
}

/**
@@ -176,8 +176,6 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,
            "Hanging createCollection due to failpoint 'hangCreateUnshardedCollection' finished");
    }

    const auto dbInfo = createDatabase(opCtx, nss.dbName());

    // The config.system.session collection can only exist as sharded and it's essential for the
    // correct cluster functionality. To prevent potential issues, the operation must always be
    // performed as a sharded creation with internal defaults. Ignore potentially different

@@ -268,6 +266,14 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,

    // TODO (SERVER-100309): remove againstFirstShard option once 9.0 becomes last LTS.
    if (againstFirstShard) {
        tassert(
            113986,
            "createCollection can only run against the first shard for `config.system.sessions` "
            "collection.",
            nss == NamespaceString::kLogicalSessionsNamespace);

        const auto dbInfo =
            uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.dbName()));
        const auto cmdResponse =
            executeCommandAgainstFirstShard(opCtx,
                                            nss.dbName(),

@@ -282,6 +288,7 @@ CreateCollectionResponse createCollection(OperationContext* opCtx,

    } else {
        sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), nss.dbName());
        router.createDbImplicitlyOnRoute();
        router.route(
            opCtx,
            "createCollection"_sd,
@@ -432,12 +432,11 @@ void ShardServerProcessInterface::_createCollectionCommon(OperationContext* opCt
                                                           const DatabaseName& dbName,
                                                           const BSONObj& cmdObj,
                                                           boost::optional<ShardId> dataShard) {
    cluster::createDatabase(opCtx, dbName, dataShard);

    // TODO (SERVER-77915): Remove the FCV check and keep only the 'else' branch
    if (!feature_flags::g80CollectionCreationPath.isEnabled(
            serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
        sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);
        router.createDbImplicitlyOnRoute(dataShard);
        router.route(opCtx,
                     "ShardServerProcessInterface::_createCollectionCommon",
                     [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {

@@ -483,6 +482,7 @@ void ShardServerProcessInterface::_createCollectionCommon(OperationContext* opCt

    shardsvrCollCommand.setShardsvrCreateCollectionRequest(request);
    sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);
    router.createDbImplicitlyOnRoute(dataShard);
    router.route(opCtx,
                 "ShardServerProcessInterface::_createCollectionCommon",
                 [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
@@ -31,6 +31,8 @@

#include "mongo/base/error_codes.h"
#include "mongo/db/global_catalog/chunk_manager.h"
#include "mongo/db/global_catalog/ddl/cluster_ddl.h"
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
#include "mongo/db/router_role/cluster_commands_helpers.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/db/sharding_environment/mongod_and_mongos_server_parameters_gen.h"

@@ -52,6 +54,10 @@ namespace mongo {
namespace sharding {
namespace router {

namespace {
constexpr size_t kMaxDatabaseCreationAttempts = 3;
}

RouterBase::RouterBase(CatalogCache* catalogCache) : _catalogCache(catalogCache) {}

void RouterBase::_initTxnRouterIfNeeded(OperationContext* opCtx) {

@@ -93,6 +99,32 @@ CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) con
    return uassertStatusOK(_catalogCache->getDatabase(opCtx, _dbName));
}

CachedDatabaseInfo DBPrimaryRouter::_createDbIfRequestedAndGetRoutingInfo(
    OperationContext* opCtx) const {

    if (_createDbImplicitly) {
        size_t attempts = 0;
        while (attempts < kMaxDatabaseCreationAttempts) {
            try {
                if (attempts > 0) {
                    cluster::createDatabase(opCtx, _dbName, _suggestedPrimaryId);
                }
                return _getRoutingInfo(opCtx);
            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
                ++attempts;
                LOGV2_INFO(11398601,
                           "Failed initialization of routing info because the database has been "
                           "concurrently dropped",
                           logAttrs(_dbName),
                           "attemptNumber"_attr = attempts,
                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
            }
        }
    }

    return _getRoutingInfo(opCtx);
}

void DBPrimaryRouter::_onException(OperationContext* opCtx, RoutingRetryInfo* retryInfo, Status s) {
    if (s == ErrorCodes::StaleDbVersion) {
        auto si = s.extraInfo<StaleDbRoutingVersion>();

@@ -274,6 +306,67 @@ CollectionRoutingInfo CollectionRouterCommon::_getRoutingInfo(OperationContext*
    return uassertStatusOK(_catalogCache->getCollectionRoutingInfo(opCtx, nss, allowLocks));
}

RoutingContext CollectionRouter::_getRoutingContext(OperationContext* opCtx) {
    // When in a multi-document transaction, allow getting routing info from the CatalogCache even
    // though locks may be held. The CatalogCache will throw CannotRefreshDueToLocksHeld if the
    // entry is not already cached.
    const auto allowLocks =
        opCtx->inMultiDocumentTransaction() && shard_role_details::getLocker(opCtx)->isLocked();
    return RoutingContext(opCtx, _targetedNamespaces, allowLocks);
}

RoutingContext CollectionRouter::_createDbIfRequestedAndGetRoutingContext(OperationContext* opCtx) {
    const NamespaceString& nss = _targetedNamespaces.front();

    if (_createDbImplicitly) {
        size_t attempts = 0;
        while (attempts < kMaxDatabaseCreationAttempts) {
            try {
                if (attempts > 0) {
                    cluster::createDatabase(opCtx, nss.dbName(), _suggestedPrimaryId);
                }
                return _getRoutingContext(opCtx);

            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
                ++attempts;
                LOGV2_INFO(11398602,
                           "Failed initialization of routing info because the database has been "
                           "concurrently dropped",
                           logAttrs(nss.dbName()),
                           "attemptNumber"_attr = attempts,
                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
            }
        }
    }
    return _getRoutingContext(opCtx);
}

CollectionRoutingInfo CollectionRouter::_createDbIfRequestedAndGetRoutingInfo(
    OperationContext* opCtx) {
    const NamespaceString& nss = _targetedNamespaces.front();

    if (_createDbImplicitly) {
        size_t attempts = 0;
        while (attempts < kMaxDatabaseCreationAttempts) {
            try {
                if (attempts > 0) {
                    cluster::createDatabase(opCtx, nss.dbName(), _suggestedPrimaryId);
                }
                return _getRoutingInfo(opCtx, nss);
            } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
                ++attempts;
                LOGV2_INFO(11398603,
                           "Failed initialization of routing info because the database has been "
                           "concurrently dropped",
                           logAttrs(nss.dbName()),
                           "attemptNumber"_attr = attempts,
                           "maxAttempts"_attr = kMaxDatabaseCreationAttempts);
            }
        }
    }
    return _getRoutingInfo(opCtx, nss);
}

void CollectionRouterCommon::appendCRUDRoutingTokenToCommand(const ShardId& shardId,
                                                             const CollectionRoutingInfo& cri,
                                                             BSONObjBuilder* builder) {
@@ -82,7 +82,7 @@ public:
        _initTxnRouterIfNeeded(opCtx);

        while (true) {
            auto cdb = _getRoutingInfo(opCtx);
            auto cdb = _createDbIfRequestedAndGetRoutingInfo(opCtx);
            try {
                return callbackFn(opCtx, cdb);
            } catch (const DBException& ex) {

@@ -91,6 +91,22 @@ public:
        }
    }

    /**
     * Enables implicit database creation when an operation is routed through this DBPrimaryRouter
     * instance.
     * Handles concurrent database drops: if the command throws a NamespaceNotFound error because
     * the database was concurrently dropped, the database will be recreated and the operation will
     * be automatically retried.
     * If a suggestedPrimaryId is specified, the database will be created on the given shard only
     * if the database doesn't exist yet. If the database already exists, the suggestedPrimaryId
     * parameter is ignored.
     */
    void createDbImplicitlyOnRoute(
        const boost::optional<ShardId>& suggestedPrimaryId = boost::none) {
        _createDbImplicitly = true;
        _suggestedPrimaryId = suggestedPrimaryId;
    }
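A minimal caller-side sketch of this setter, assuming an OperationContext* opCtx is in scope; the shard id and callback body are hypothetical:

    // Sketch only: mirrors the DBPrimaryRouter usage introduced by this commit.
    sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);

    // Opt in: if the database is missing, create it (on 'someShardId' when the db
    // doesn't exist yet; the suggestion is ignored for an existing db) and retry.
    router.createDbImplicitlyOnRoute(someShardId /* hypothetical ShardId */);

    router.route(opCtx, "myCommand"_sd, [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
        // 'cdb' here refers to a database that is guaranteed to exist.
    });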
    static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder);

    static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,

@@ -99,9 +115,13 @@ public:

private:
    CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const;
    CachedDatabaseInfo _createDbIfRequestedAndGetRoutingInfo(OperationContext* opCtx) const;
    void _onException(OperationContext* opCtx, RoutingRetryInfo* retryInfo, Status s);

    DatabaseName _dbName;

    bool _createDbImplicitly{false};
    boost::optional<ShardId> _suggestedPrimaryId;
};

/**

@@ -136,7 +156,7 @@ public:
    template <typename F>
    auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
        return _routeImpl(opCtx, comment, [&] {
            auto cri = _getRoutingInfo(opCtx, _targetedNamespaces.front());
            auto cri = _createDbIfRequestedAndGetRoutingInfo(opCtx);
            return callbackFn(opCtx, cri);
        });
    }

@@ -144,18 +164,25 @@ public:
    template <typename F>
    auto routeWithRoutingContext(OperationContext* opCtx, StringData comment, F&& callbackFn) {
        return _routeImpl(opCtx, comment, [&] {
            // When in a multi-document transaction, allow getting routing info from the
            // CatalogCache even though locks may be held. The CatalogCache will throw
            // CannotRefreshDueToLocksHeld if the entry is not already cached.
            const auto allowLocks = opCtx->inMultiDocumentTransaction() &&
                shard_role_details::getLocker(opCtx)->isLocked();

            RoutingContext routingCtx(opCtx, _targetedNamespaces, allowLocks);
            RoutingContext routingCtx = _createDbIfRequestedAndGetRoutingContext(opCtx);
            return routing_context_utils::runAndValidate(
                routingCtx, [&](RoutingContext& ctx) { return callbackFn(opCtx, ctx); });
        });
    }

    /**
     * Enables implicit database creation when an operation is routed through this CollectionRouter
     * instance.
     * Handles concurrent database drops: if the command throws a NamespaceNotFound error because
     * the database was concurrently dropped, the database will be recreated and the operation will
     * be automatically retried.
     */
    void createDbImplicitlyOnRoute(
        const boost::optional<ShardId>& suggestedPrimaryId = boost::none) {
        _createDbImplicitly = true;
        _suggestedPrimaryId = suggestedPrimaryId;
    }
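For contrast, a condensed sketch of the caller-side boilerplate this setter replaces, taken from the shape of the retry loops deleted elsewhere in this commit (the command name and body are illustrative):

    // Old pattern (condensed): every command carried its own creation/retry loop.
    const size_t kMaxDatabaseCreationAttempts = 3;
    size_t attempts = 1;
    while (true) {
        try {
            // Implicitly create the db if it doesn't exist.
            cluster::createDatabase(opCtx, nss.dbName());
            sharding::router::CollectionRouter router{opCtx->getServiceContext(), nss};
            return router.routeWithRoutingContext(opCtx, commandName, body);
        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
            // The database was concurrently dropped; recreate and retry, up to a cap.
            if (attempts++ >= kMaxDatabaseCreationAttempts) {
                throw;
            }
        }
    }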

private:
    template <typename F>
    auto _routeImpl(OperationContext* opCtx, StringData comment, F&& work) {

@@ -170,6 +197,14 @@ private:
            }
        }
    }

    RoutingContext _getRoutingContext(OperationContext* opCtx);

    RoutingContext _createDbIfRequestedAndGetRoutingContext(OperationContext* opCtx);
    CollectionRoutingInfo _createDbIfRequestedAndGetRoutingInfo(OperationContext* opCtx);

    bool _createDbImplicitly{false};
    boost::optional<ShardId> _suggestedPrimaryId;
};

class MONGO_MOD_PUBLIC MultiCollectionRouter final : public CollectionRouterCommon {
@@ -31,6 +31,7 @@

#include "mongo/base/status.h"
#include "mongo/db/commands.h"
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/client_cursor/cursor_response.h"
#include "mongo/db/router_role/routing_cache/catalog_cache_test_fixture.h"

@@ -69,7 +70,19 @@ public:
        RouterCatalogCacheTestFixture::tearDown();
    }

    void mockConfigServerQueries(NamespaceString _nss, OID epoch, Timestamp timestamp) {
    void setupReadConcernAtClusterTime(Timestamp clusterTime) {
        repl::ReadConcernArgs readConcernArgs;
        ASSERT_OK(readConcernArgs.initialize(
            BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName
                        << BSON(repl::ReadConcernArgs::kAtClusterTimeFieldName
                                << clusterTime << repl::ReadConcernArgs::kLevelFieldName
                                << "snapshot"))));
        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
    }

    void mockConfigServerQueriesForCollRefresh(const NamespaceString& nss,
                                               OID epoch,
                                               Timestamp timestamp) {
        // Mock the expected config server queries.
        ChunkVersion version({epoch, timestamp}, {2, 0});
        ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

@@ -86,7 +99,30 @@ public:
        version.incMinor();

        expectCollectionAndChunksAggregation(
            _nss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2});
            nss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2});
    }

    void mockConfigServerQueriesForDbRefresh(const DatabaseName& dbName,
                                             const DatabaseVersion& dbVersion) {
        expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
            DatabaseType db(dbName, {"0"}, dbVersion);
            return std::vector<BSONObj>{db.toBSON()};
        }());
    }

    void expectCreateDatabase(const DatabaseName& dbName,
                              const DatabaseVersion& dbVersionToReturn) {
        static constexpr auto kCmdName = "_configsvrCreateDatabase"_sd;
        onCommand([&](const executor::RemoteCommandRequest& request) {
            ASSERT_EQ(kConfigHostAndPort, request.target);
            ASSERT_TRUE(request.cmdObj.hasField(kCmdName))
                << "Expected command '" << kCmdName << "', but got: " << request.toString();
            ASSERT_EQ(dbName.toString_forTest(), request.cmdObj.getStringField(kCmdName));

            ConfigsvrCreateDatabaseResponse response;
            response.setDatabaseVersion(dbVersionToReturn);
            return response.toBSON();
        });
    }

    void setupCatalogCacheWithHistory() {

@@ -192,15 +228,6 @@ public:
        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
    }

    void setupReadConcernAtClusterTime(Timestamp clusterTime) {
        repl::ReadConcernArgs readConcernArgs;
        ASSERT_OK(readConcernArgs.initialize(
            BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName
                        << BSON(repl::ReadConcernArgs::kAtClusterTimeFieldName
                                << clusterTime << repl::ReadConcernArgs::kLevelFieldName
                                << "snapshot"))));
        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
    }

private:
    boost::optional<RouterOperationContextSession> _routerOpCtxSession;
@@ -291,6 +318,43 @@ TEST_F(RouterRoleTestTxn, DBPrimaryRouterSetsPlacementConflictTimeIfSubRouter) {
    });
}

TEST_F(RouterRoleTest, DBPrimaryRouterHappyPath) {
    int tries = 0;

    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
    router.route(operationContext(),
                 "test",
                 [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) { tries++; });

    ASSERT_EQ(tries, 1);
}

TEST_F(RouterRoleTest, DBPrimaryRouterRetriesAfterStaleDb) {
    int tries = 0;

    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
    auto future = launchAsync([&] {
        router.route(
            operationContext(),
            "test",
            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
                tries++;
                if (tries == 1) {
                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                              "staleDbVersion");
                }
            });
    });

    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));

    future.default_timed_get();
    ASSERT_EQ(tries, 2);
}

TEST_F(RouterRoleTest, DBPrimaryRouterExceedsMaxRetryAttempts) {
    int maxTestRetries = 10;

@@ -307,12 +371,15 @@ TEST_F(RouterRoleTest, DBPrimaryRouterExceedsMaxRetryAttempts) {
            tries++;
            uasserted(
                StaleDbRoutingVersion(_nss.dbName(),
                                      DatabaseVersion(UUID::gen(), Timestamp(0, 0)),
                                      DatabaseVersion(UUID::gen(), Timestamp(0, 0))),
                                      DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                      DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                "staleDbVersion");
        });
    });

    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));

    ASSERT_THROWS_CODE_AND_WHAT(
        future.default_timed_get(),
        DBException,

@@ -321,6 +388,85 @@
    ASSERT_EQ(tries, maxTestRetries + 1);
}

TEST_F(RouterRoleTest, DBPrimaryRouterImplicitlyCreatesDbWhenSpecified) {
    int tries = 0;

    // 1. Route asynchronously a dummy operation with 'createDbImplicitly' set to true.
    // The first attempt will fail with a StaleDbVersion to force a db refresh.
    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
    router.createDbImplicitlyOnRoute();
    auto future = launchAsync([&] {
        router.route(
            operationContext(),
            "test",
            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
                tries++;
                if (tries == 1) {
                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                              "staleDbVersion");
                }
            });
    });

    // 2. Mock a db drop by returning an empty database to force an implicit db creation (it must
    // be returned twice because, for missing databases, the CatalogClient tries twice).
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 3. Mock a db drop again since the createDatabase isn't attempted until the second
    // router-role-loop iteration.
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 4. Expect a call to _configsvrCreateDatabase.
    expectCreateDatabase(_nss.dbName(), DatabaseVersion(UUID::gen(), Timestamp(3, 0)));

    // 5. Mock the refresh that runs after the db creation.
    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));

    future.default_timed_get();
    ASSERT_EQ(tries, 2);
}

TEST_F(RouterRoleTest, DBPrimaryRouterDoesntImplicitlyCreateDbByDefault) {
    int tries = 0;

    // 1. Route asynchronously a dummy operation without enabling implicit database creation.
    // The first attempt will fail with a StaleDbVersion to force a db refresh.
    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
    auto future = launchAsync([&] {
        router.route(
            operationContext(),
            "test",
            [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
                tries++;
                if (tries == 1) {
                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                              "staleDbVersion");
                }
            });
    });

    // 2. Mock a db drop by returning an empty database (it must be returned twice because, for
    // missing databases, the CatalogClient tries twice).
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 3. Expect a NamespaceNotFound error since the database doesn't exist and the implicit db
    // creation is disabled.
    ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),
                                DBException,
                                ErrorCodes::NamespaceNotFound,
                                str::stream() << "database " << _nss.dbName().toStringForErrorMsg()
                                              << " not found");
    ASSERT_EQ(tries, 1);
}
TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryForSubRouter) {
    actAsSubRouter();

@@ -347,7 +493,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryForSubRouter) {
    ASSERT_EQ(tries, 1);
}

TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnStaleConfigForNonSubRouter) {
TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnStaleConfigInTxnForNonSubRouter) {
    const OID epoch{OID::gen()};
    const Timestamp timestamp{1, 0};

@@ -413,7 +559,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleConfigWithoutTxn) {
            }
        });
    });
    mockConfigServerQueries(_nss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    future.default_timed_get();

    ASSERT_EQ(tries, 2);

@@ -467,7 +613,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleEpochWithoutTxn) {
            }
        });
    });
    mockConfigServerQueries(_nss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    future.default_timed_get();
    ASSERT_EQ(tries, 2);

@@ -486,7 +632,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleEpochWithoutTxn) {
            }
        });
    });
    mockConfigServerQueries(_nss, OID::gen(), Timestamp{2, 0});
    mockConfigServerQueriesForCollRefresh(_nss, OID::gen(), Timestamp{2, 0});
    future2.default_timed_get();
    ASSERT_EQ(tries, 2);

@@ -617,7 +763,7 @@ TEST_F(RouterRoleTest, CollectionRouterWithRoutingContextRetryOnStaleConfigWitho
            }
        });
    });
    mockConfigServerQueries(_nss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    future.default_timed_get();

    ASSERT_EQ(tries, 2);

@@ -681,7 +827,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterWithRoutingContextAtTransactionCluster
        testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
}

TEST_F(RouterRoleTestTxn, CollectionRouterWithRoutingContextAtReadConcernClusterTime) {
TEST_F(RouterRoleTest, CollectionRouterWithRoutingContextAtReadConcernClusterTime) {
    setupCatalogCacheWithHistory();

    // Test routing behavior at different timestamps to verify that catalog cache history works

@@ -803,8 +949,8 @@ TEST_F(RouterRoleTest, MultiCollectionRouterRetryOnStaleConfig) {
            }
        });
    });
    mockConfigServerQueries(nss2, epoch, timestamp);
    mockConfigServerQueries(_nss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(nss2, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    future.default_timed_get();

    ASSERT_EQ(tries, 2);

@@ -839,7 +985,7 @@ TEST_F(RouterRoleTest,
                  "StaleConfig error");
        });
    });
    mockConfigServerQueries(nss2, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(nss2, epoch, timestamp);
    ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::StaleConfig);

    ASSERT_EQ(tries, 1);

@@ -950,7 +1096,7 @@ TEST_F(RouterRoleTestTxn, RoutingContextRoutingTablesAreImmutable) {
    // Schedule a refresh with a new placement version.
    auto future = scheduleRoutingInfoForcedRefresh(_nss);

    mockConfigServerQueries(_nss, versionA.epoch(), versionA.getTimestamp());
    mockConfigServerQueriesForCollRefresh(_nss, versionA.epoch(), versionA.getTimestamp());

    const auto criB = *future.default_timed_get();
    const auto versionB = criB.getCollectionVersion().placementVersion();

@@ -1013,7 +1159,7 @@ TEST_F(RouterRoleTest, CollectionRouterExceedsMaxRetryAttempts) {
        Timestamp timestamp{startTime, 0};
        startTime++;

        mockConfigServerQueries(_nss, epoch, timestamp);
        mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    }

    ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),

@@ -1024,7 +1170,7 @@
    ASSERT_EQ(tries, maxTestRetries + 1);
}

TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnShardNotFound) {
TEST_F(RouterRoleTestTxn, CollectionRouterDoesNotRetryOnShardNotFoundInTxn) {
    sharding::router::CollectionRouter router(getServiceContext(), _nss);
    ASSERT(TransactionRouter::get(operationContext()));

@@ -1070,7 +1216,7 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnShardNotFound) {
        DatabaseType db(_nss.dbName(), {"0"}, DatabaseVersion(UUID::gen(), Timestamp(1, 0)));
        return std::vector<BSONObj>{db.toBSON()};
    }());
    mockConfigServerQueries(_nss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(_nss, epoch, timestamp);
    future.default_timed_get();

    ASSERT_EQ(tries, 2);

@@ -1112,7 +1258,7 @@ TEST_F(RouterRoleTestTxn, CatalogCacheGetRoutingInfoAtTransactionClusterTime) {
        testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
}

TEST_F(RouterRoleTestTxn, CatalogCacheGetRoutingInfoAtReadConcernClusterTime) {
TEST_F(RouterRoleTest, CatalogCacheGetRoutingInfoAtReadConcernClusterTime) {
    setupCatalogCacheWithHistory();

    // Test routing behavior at different timestamps to verify that catalog cache history works

@@ -1186,7 +1332,7 @@ TEST_F(RouterRoleTestTxn, CollectionRouterGetRoutingInfoAtTransactionClusterTime
        testAtTimestamp(Timestamp(1, 0), 2, "0"), DBException, ErrorCodes::StaleChunkHistory);
}

TEST_F(RouterRoleTestTxn, CollectionRouterGetRoutingInfoAtReadConcernClusterTime) {
TEST_F(RouterRoleTest, CollectionRouterGetRoutingInfoAtReadConcernClusterTime) {
    setupCatalogCacheWithHistory();

    // Test routing behavior at different timestamps to verify that catalog cache history works

@@ -1249,10 +1395,92 @@ TEST_F(RouterRoleTest, CollectionRouterRetryOnStaleConfigTimeseriesBucket) {
            }
        });
    });
    mockConfigServerQueries(bucketNss, epoch, timestamp);
    mockConfigServerQueriesForCollRefresh(bucketNss, epoch, timestamp);
    future.default_timed_get();

    ASSERT_EQ(tries, 2);
}
TEST_F(RouterRoleTest, CollectionRouterDoesntImplicitlyCreateDbByDefault) {
    sharding::router::CollectionRouter router(getServiceContext(), _nss);

    // 1. Route asynchronously a dummy operation.
    // The first attempt will fail with a StaleDbVersion to force a db refresh.
    int tries = 0;
    auto future = launchAsync([&] {
        router.route(
            operationContext(),
            "test",
            [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
                tries++;
                if (tries == 1) {
                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                              "staleDbVersion");
                }
            });
    });

    // 2. Mock a db drop as well by returning an empty database (it must be returned twice
    // because, for missing databases, the CatalogClient tries twice).
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 3. Expect a NamespaceNotFound error since the database doesn't exist and the implicit db
    // creation is disabled.
    ASSERT_THROWS_CODE_AND_WHAT(future.default_timed_get(),
                                DBException,
                                ErrorCodes::NamespaceNotFound,
                                str::stream() << "database " << _nss.dbName().toStringForErrorMsg()
                                              << " not found");
    ASSERT_EQ(tries, 1);
}

TEST_F(RouterRoleTest, CollectionRouterImplicitlyCreatesDbWhenSpecified) {
    sharding::router::CollectionRouter router(getServiceContext(), _nss);
    router.createDbImplicitlyOnRoute();

    // 1. Route asynchronously a dummy operation with 'createDbImplicitly' set to true.
    // The first attempt will fail with a StaleDbVersion to force a db refresh.
    int tries = 0;
    auto future = launchAsync([&] {
        router.route(
            operationContext(),
            "test",
            [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
                tries++;
                if (tries == 1) {
                    uasserted(StaleDbRoutingVersion(_nss.dbName(),
                                                    DatabaseVersion(UUID::gen(), Timestamp(1, 0)),
                                                    DatabaseVersion(UUID::gen(), Timestamp(2, 0))),
                              "staleDbVersion");
                }
            });
    });

    // 2. Mock a db drop as well by returning an empty database to force an implicit db creation
    // (it must be returned twice because, for missing databases, the CatalogClient tries twice).
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 3. Mock a db drop again since the createDatabase isn't attempted until the second
    // router-role-loop iteration.
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    // 4. Expect a call to _configsvrCreateDatabase.
    expectCreateDatabase(_nss.dbName(), DatabaseVersion(UUID::gen(), Timestamp(3, 0)));

    // 5. Mock the refresh that runs after the db creation.
    mockConfigServerQueriesForDbRefresh(_nss.dbName(),
                                        DatabaseVersion(UUID::gen(), Timestamp(3, 0)));
    mockConfigServerQueriesForCollRefresh(_nss, OID::gen(), Timestamp{1, 0});

    future.default_timed_get();

    ASSERT_EQ(tries, 2);
}

}  // namespace
}  // namespace mongo
@@ -118,8 +118,6 @@
namespace mongo {
namespace {

constexpr size_t kMaxDatabaseCreationAttempts = 3u;

using QuerySamplingOptions = OperationContext::QuerySamplingOptions;

const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly);

@@ -1018,35 +1016,15 @@ bool FindAndModifyCmd::run(OperationContext* opCtx,
        }
    };

    while (true) {
        size_t attempts = 1u;
        try {
            // Technically, findAndModify should only create the database if upsert is true, but
            // this would require that the parsing be pulled into this function.
            cluster::createDatabase(opCtx, originalNss.dbName());
    sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};

            sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
            router.routeWithRoutingContext(opCtx, getName(), findAndModifyBody);
            return true;
    // Technically, findAndModify should only create the database if upsert is true, but
    // this would require that the parsing be pulled into this function.
    // TODO (SERVER-114203) - Implicitly create a database only when upsert is true.
    router.createDbImplicitlyOnRoute();
    router.routeWithRoutingContext(opCtx, getName(), findAndModifyBody);

        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
            LOGV2_INFO(8584300,
                       "Failed initialization of routing info because the database has been "
                       "concurrently dropped",
                       logAttrs(originalNss.dbName()),
                       "attemptNumber"_attr = attempts,
                       "maxAttempts"_attr = kMaxDatabaseCreationAttempts);

            if (attempts++ >= kMaxDatabaseCreationAttempts) {
                // The maximum number of attempts has been reached, so the procedure fails as it
                // could be a logical error. At this point, it is unlikely that the error is
                // caused by concurrent drop database operations.
                throw;
            }
        }
    }

    MONGO_UNREACHABLE;
    return true;
}

bool FindAndModifyCmd::getCrudProcessedFromCmd(const BSONObj& cmdObj) {
@@ -506,6 +506,12 @@ bool ClusterWriteCmd::runExplainWithoutShardKey(OperationContext* opCtx,
    }

    sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};

    // Implicitly create the db if it doesn't exist. There is no way right now to return an
    // explain on a sharded cluster if the database doesn't exist.
    // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
    // doesn't exist.
    router.createDbImplicitlyOnRoute();
    return router.routeWithRoutingContext(
        opCtx,
        "explain write"_sd,

@@ -609,99 +615,76 @@ void ClusterWriteCmd::executeWriteOpExplain(OperationContext* opCtx,
    auto bodyBuilder = result->getBodyBuilder();
    const auto originalRequestBSON = req ? req->toBSON() : requestObj;

    const size_t kMaxDatabaseCreationAttempts = 3;
    size_t attempts = 1;
    while (true) {
        try {
            // Implicitly create the db if it doesn't exist. There is no way right now to return an
            // explain on a sharded cluster if the database doesn't exist.
            // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
            // doesn't exist.
            cluster::createDatabase(opCtx, originalNss.dbName());

            // If we aren't running an explain for updateOne or deleteOne without shard key,
            // continue and run the original explain path.
            if (runExplainWithoutShardKey(
                    opCtx, batchedRequest, originalNss, verbosity, &bodyBuilder)) {
                return;
            }

            sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};
            router.routeWithRoutingContext(
                opCtx,
                "explain write"_sd,
                [&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
                    auto translatedReqBSON = originalRequestBSON;
                    auto translatedNss = originalNss;
                    const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);

                    auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
                        opCtx,
                        originalNss,
                        targeter,
                        originalRoutingCtx,
                        [&](const NamespaceString& bucketsNss) {
                            translatedNss = bucketsNss;
                            switch (batchedRequest.getBatchType()) {
                                case BatchedCommandRequest::BatchType_Insert:
                                    translatedReqBSON = rewriteCommandForRawDataOperation<
                                        write_ops::InsertCommandRequest>(originalRequestBSON,
                                                                         translatedNss.coll());
                                    break;
                                case BatchedCommandRequest::BatchType_Update:
                                    translatedReqBSON = rewriteCommandForRawDataOperation<
                                        write_ops::UpdateCommandRequest>(originalRequestBSON,
                                                                         translatedNss.coll());
                                    break;
                                case BatchedCommandRequest::BatchType_Delete:
                                    translatedReqBSON = rewriteCommandForRawDataOperation<
                                        write_ops::DeleteCommandRequest>(originalRequestBSON,
                                                                         translatedNss.coll());
                                    break;
                            }
                        });
                    unusedRoutingCtx.skipValidation();

                    const auto explainCmd =
                        ClusterExplain::wrapAsExplain(translatedReqBSON, verbosity);

                    // We will time how long it takes to run the commands on the shards.
                    Timer timer;

                    // Target the command to the shards based on the singleton batch item.
                    BatchItemRef targetingBatchItem(requestPtr, 0);
                    std::vector<AsyncRequestsSender::Response> shardResponses;
                    commandOpWrite(opCtx,
                                   translatedNss,
                                   explainCmd,
                                   std::move(targetingBatchItem),
                                   targeter,
                                   &shardResponses);
                    uassertStatusOK(ClusterExplain::buildExplainResult(
                        makeBlankExpressionContext(opCtx, translatedNss),
                        shardResponses,
                        ClusterExplain::kWriteOnShards,
                        timer.millis(),
                        originalRequestBSON,
                        &bodyBuilder));
                });
            break;
        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
            LOGV2_INFO(10370602,
                       "Failed initialization of routing info because the database has been "
                       "concurrently dropped",
                       logAttrs(originalNss.dbName()),
                       "attemptNumber"_attr = attempts,
                       "maxAttempts"_attr = kMaxDatabaseCreationAttempts);

            if (++attempts >= kMaxDatabaseCreationAttempts) {
                // The maximum number of attempts has been reached, so the procedure fails as it
                // could be a logical error. At this point, it is unlikely that the error is caused
                // by concurrent drop database operations.
                throw;
            }
        }
    }
    // If we aren't running an explain for updateOne or deleteOne without shard key,
    // continue and run the original explain path.
    if (runExplainWithoutShardKey(opCtx, batchedRequest, originalNss, verbosity, &bodyBuilder)) {
        return;
    }

    sharding::router::CollectionRouter router{opCtx->getServiceContext(), originalNss};

    // Implicitly create the db if it doesn't exist. There is no way right now to return an
    // explain on a sharded cluster if the database doesn't exist.
    // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
    // doesn't exist.
    router.createDbImplicitlyOnRoute();
    router.routeWithRoutingContext(
        opCtx,
        "explain write"_sd,
        [&](OperationContext* opCtx, RoutingContext& originalRoutingCtx) {
            auto translatedReqBSON = originalRequestBSON;
            auto translatedNss = originalNss;
            const auto targeter = CollectionRoutingInfoTargeter(opCtx, originalNss);

            auto& unusedRoutingCtx = translateNssForRawDataAccordingToRoutingInfo(
                opCtx,
                originalNss,
                targeter,
                originalRoutingCtx,
                [&](const NamespaceString& bucketsNss) {
                    translatedNss = bucketsNss;
                    switch (batchedRequest.getBatchType()) {
                        case BatchedCommandRequest::BatchType_Insert:
                            translatedReqBSON =
                                rewriteCommandForRawDataOperation<write_ops::InsertCommandRequest>(
                                    originalRequestBSON, translatedNss.coll());
                            break;
                        case BatchedCommandRequest::BatchType_Update:
                            translatedReqBSON =
                                rewriteCommandForRawDataOperation<write_ops::UpdateCommandRequest>(
                                    originalRequestBSON, translatedNss.coll());
                            break;
                        case BatchedCommandRequest::BatchType_Delete:
                            translatedReqBSON =
                                rewriteCommandForRawDataOperation<write_ops::DeleteCommandRequest>(
                                    originalRequestBSON, translatedNss.coll());
                            break;
                    }
                });
            unusedRoutingCtx.skipValidation();

            const auto explainCmd = ClusterExplain::wrapAsExplain(translatedReqBSON, verbosity);

            // We will time how long it takes to run the commands on the shards.
            Timer timer;

            // Target the command to the shards based on the singleton batch item.
            BatchItemRef targetingBatchItem(requestPtr, 0);
            std::vector<AsyncRequestsSender::Response> shardResponses;
            commandOpWrite(opCtx,
                           translatedNss,
                           explainCmd,
                           std::move(targetingBatchItem),
                           targeter,
                           &shardResponses);
            uassertStatusOK(
                ClusterExplain::buildExplainResult(makeBlankExpressionContext(opCtx, translatedNss),
                                                   shardResponses,
                                                   ClusterExplain::kWriteOnShards,
                                                   timer.millis(),
                                                   originalRequestBSON,
                                                   &bodyBuilder));
        });
}

bool ClusterWriteCmd::InvocationBase::runImpl(OperationContext* opCtx,
@@ -1037,6 +1037,21 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,

    sharding::router::CollectionRouter router(opCtx->getServiceContext(), namespaces.executionNss);

    bool isExplain = verbosity.has_value();
    if (isExplain) {
        // Implicitly create the database for explain commands since, right now, there is no way
        // to respond properly when the database doesn't exist.
        // Before, the database was implicitly created by the CollectionRoutingInfoTargeter class
        // (for context, it's a legacy class that stores the routing information); now that we are
        // using the RoutingContext instead, we still need to create the database until
        // SERVER-108882 gets addressed.
        // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
        // doesn't exist.
        router.createDbImplicitlyOnRoute();
    }

    // We'll use routerBodyStarted to distinguish whether an error was thrown before or after the
    // body function was executed.
    bool routerBodyStarted = false;
    auto bodyFn = [&](OperationContext* opCtx, RoutingContext& routingCtx) {
        routerBodyStarted = true;

@@ -1053,33 +1068,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
        return Status::OK();
    };

    const size_t kMaxDatabaseCreationAttempts = 3;
    size_t attempts = 1;
    Status status{Status::OK()};
    bool isExplain = verbosity.has_value();
    while (true) {
        if (isExplain) {
            // Implicitly create the database for explain commands since, right now, there is no
            // way to respond properly when the database doesn't exist. Before, the database was
            // implicitly created by CollectionRoutingInfoTargeter; now that we are not using this
            // class anymore we still need to create a database.
            // TODO (SERVER-108882) Stop creating the db once explain can be executed when the db
            // doesn't exist.
            cluster::createDatabase(opCtx, namespaces.executionNss.dbName());
        }
    // Route the command and capture the returned status.
    Status status = std::invoke([&]() -> Status {
        try {
            status = router.routeWithRoutingContext(opCtx, comment, bodyFn);
        } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
            if (isExplain && ++attempts < kMaxDatabaseCreationAttempts) {
                continue;
            }
            status = ex.toStatus();
            return router.routeWithRoutingContext(opCtx, comment, bodyFn);
        } catch (const DBException& ex) {
            status = ex.toStatus();
            return ex.toStatus();
        }
        break;
    }
    });

    // Error handling for exceptions raised prior to executing the runAggregation operation.
    if (!status.isOK() && !routerBodyStarted) {
        uassert(CollectionUUIDMismatchInfo(request.getDbName(),
                                           *request.getCollectionUUID(),