SERVER-74107 Use transaction in cluster parameter refresher to get up-to-date FCV

This commit is contained in:
Gabriel Marks 2023-04-05 14:31:04 +00:00 committed by Evergreen Agent
parent 858c7ae157
commit 17804518da
20 changed files with 276 additions and 121 deletions

View File

@ -1,5 +1,5 @@
// Auth tests for the $listLocalSessions {allUsers:true} aggregation stage. // Auth tests for the $listLocalSessions {allUsers:true} aggregation stage.
// @tags: [requires_sharding] // @tags: [requires_fcv_70, requires_sharding]
(function() { (function() {
'use strict'; 'use strict';
@ -49,8 +49,16 @@ const mongod = MongoRunner.runMongod({auth: ""});
runListAllLocalSessionsTest(mongod); runListAllLocalSessionsTest(mongod);
MongoRunner.stopMongod(mongod); MongoRunner.stopMongod(mongod);
const st = const st = new ShardingTest({
new ShardingTest({shards: 1, mongos: 1, config: 1, other: {keyFile: 'jstests/libs/key1'}}); shards: 1,
mongos: 1,
config: 1,
other: {
keyFile: 'jstests/libs/key1',
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}
});
runListAllLocalSessionsTest(st.s0); runListAllLocalSessionsTest(st.s0);
st.stop(); st.stop();
})(); })();

View File

@ -128,8 +128,11 @@ function runTest(st, startupRefreshIntervalMS) {
[st.configRS, ...st._rs.map(rs => rs.test)].forEach(rs => { [st.configRS, ...st._rs.map(rs => rs.test)].forEach(rs => {
assert(rs.getPrimary().getDB('config').clusterParameters.drop()); assert(rs.getPrimary().getDB('config').clusterParameters.drop());
}); });
expectedParams = {};
// Perform a dummy write in order to get the config shard's cluster time cached on the mongos.
st.s.getDB('config').abc.insert({a: "hello"});
expectedParams = {};
assertParams(startupRefreshIntervalRelaxedMS); assertParams(startupRefreshIntervalRelaxedMS);
} }

View File

@ -1,6 +1,7 @@
// Tests multi-statement transactions metrics in the serverStatus output from mongos in various // Tests multi-statement transactions metrics in the serverStatus output from mongos in various
// basic cases. // basic cases.
// @tags: [ // @tags: [
// requires_fcv_70,
// uses_multi_shard_transaction, // uses_multi_shard_transaction,
// uses_transactions, // uses_transactions,
// ] // ]
@ -201,7 +202,15 @@ const dbName = "test";
const collName = "foo"; const collName = "foo";
const ns = dbName + '.' + collName; const ns = dbName + '.' + collName;
const st = new ShardingTest({shards: 2, mongos: 2, config: 1}); const st = new ShardingTest({
shards: 2,
mongos: 2,
config: 1,
other: {
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}
});
const session = st.s.startSession(); const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName); const sessionDB = session.getDatabase(dbName);

View File

@ -111,23 +111,18 @@ function runDowngradeUpgradeTestForCWSP(conn, isMongod, isStandalone, verifyStat
verifyStateCallback(sp, true); verifyStateCallback(sp, true);
} }
// Downgrade FCV and ensure we can't set, and get either fails (if FCV is known by the // Downgrade FCV and ensure we can't set or get.
// server) or gets the default value (if it is not).
// If our downgrade takes us below the minimum FCV for // If our downgrade takes us below the minimum FCV for
// featureFlagAuditConfigClusterParameter, we expect all cluster parameter commands to fail // featureFlagAuditConfigClusterParameter, we expect all cluster parameter commands to fail
// for standalone. // for standalone.
assert.commandWorked(admin.runCommand({setFeatureCompatibilityVersion: lastLTSFCV})); assert.commandWorked(admin.runCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
if (isMongod) {
assertGetFailed(admin.runCommand({getClusterParameter: sp})); assertGetFailed(admin.runCommand({getClusterParameter: sp}));
} else {
const afterDowngrade =
assert.commandWorked(admin.runCommand({getClusterParameter: sp}));
assert.eq(val(afterDowngrade), initval);
}
assertSetFailed(admin.runCommand({setClusterParameter: {[sp]: {intData: updateVal + 1}}})); assertSetFailed(admin.runCommand({setClusterParameter: {[sp]: {intData: updateVal + 1}}}));
if (!(isStandalone && !FeatureFlagUtil.isEnabled(admin, 'AuditConfigClusterParameter'))) { if (!(isStandalone && !FeatureFlagUtil.isEnabled(admin, 'AuditConfigClusterParameter'))) {
assertParamExistenceInGetParamStar( assertParamExistenceInGetParamStar(
assert.commandWorked(admin.runCommand({getClusterParameter: "*"})), sp, !isMongod); assert.commandWorked(admin.runCommand({getClusterParameter: "*"})), sp, false);
} }
if (verifyStateCallback !== undefined) { if (verifyStateCallback !== undefined) {
verifyStateCallback(sp, false); verifyStateCallback(sp, false);

View File

@ -1,6 +1,6 @@
/** /**
* Test that mongos is collecting telemetry metrics. * Test that mongos is collecting telemetry metrics.
* @tags: [featureFlagTelemetry] * @tags: [requires_fcv_70, featureFlagTelemetry]
*/ */
(function() { (function() {
"use strict"; "use strict";
@ -18,6 +18,7 @@ const setup = () => {
mongosOptions: { mongosOptions: {
setParameter: { setParameter: {
internalQueryConfigureTelemetrySamplingRate: 2147483647, internalQueryConfigureTelemetrySamplingRate: 2147483647,
'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"
} }
}, },
}); });

View File

@ -1,4 +1,5 @@
// @tags: [ // @tags: [
// requires_fcv_70,
// requires_replication, // requires_replication,
// requires_sharding, // requires_sharding,
// ] // ]
@ -39,6 +40,8 @@ function Sharding(lifetime) {
rs: true, rs: true,
rsOptions: {setParameter: {TransactionRecordMinimumLifetimeMinutes: lifetime}}, rsOptions: {setParameter: {TransactionRecordMinimumLifetimeMinutes: lifetime}},
rs0: {nodes: 1}, rs0: {nodes: 1},
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}, },
}); });

View File

@ -3,7 +3,7 @@
* config.image_collection entries for a transaction session if the logical session that it * config.image_collection entries for a transaction session if the logical session that it
* corresponds to has expired and been removed from the config.system.sessions collection. * corresponds to has expired and been removed from the config.system.sessions collection.
* *
* @tags: [requires_fcv_60, uses_transactions] * @tags: [requires_fcv_70, uses_transactions]
*/ */
(function() { (function() {

View File

@ -1,7 +1,7 @@
/* /*
* Tests running killSessions to kill internal sessions on both mongos and mongod. * Tests running killSessions to kill internal sessions on both mongos and mongod.
* *
* @tags: [requires_fcv_60, uses_transactions, temporary_catalog_shard_incompatible] * @tags: [requires_fcv_70, uses_transactions, temporary_catalog_shard_incompatible]
*/ */
(function() { (function() {
'use strict'; 'use strict';
@ -10,7 +10,10 @@ TestData.disableImplicitSessions = true;
const st = new ShardingTest({ const st = new ShardingTest({
shards: 1, shards: 1,
mongosOptions: {setParameter: {maxSessions: 1}}, mongosOptions: {
setParameter:
{maxSessions: 1, 'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}
},
shardOptions: {setParameter: {maxSessions: 1}} shardOptions: {setParameter: {maxSessions: 1}}
}); });
const shard0Primary = st.rs0.getPrimary(); const shard0Primary = st.rs0.getPrimary();

View File

@ -1,7 +1,7 @@
/* /*
* Tests basic support for internal sessions. * Tests basic support for internal sessions.
* *
* @tags: [requires_fcv_60, uses_transactions, temporary_catalog_shard_incompatible] * @tags: [requires_fcv_70, uses_transactions, temporary_catalog_shard_incompatible]
*/ */
(function() { (function() {
'use strict'; 'use strict';
@ -10,7 +10,10 @@ TestData.disableImplicitSessions = true;
const st = new ShardingTest({ const st = new ShardingTest({
shards: 1, shards: 1,
mongosOptions: {setParameter: {maxSessions: 1}}, mongosOptions: {
setParameter:
{maxSessions: 1, 'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}
},
shardOptions: {setParameter: {maxSessions: 1}} shardOptions: {setParameter: {maxSessions: 1}}
}); });
const shard0Primary = st.rs0.getPrimary(); const shard0Primary = st.rs0.getPrimary();

View File

@ -1,8 +1,16 @@
// @tags: [requires_fcv_70]
(function() { (function() {
'use strict'; 'use strict';
// create // create
var s = new ShardingTest({shards: 2}); var s = new ShardingTest({
shards: 2,
other: {
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}
});
var db = s.getDB("test"); var db = s.getDB("test");
var ss = db.serverStatus(); var ss = db.serverStatus();
@ -34,7 +42,10 @@ assert.eq(1, ss.shardingStatistics.catalogCache.countFullRefreshesStarted);
// does not pre cache when set parameter is disabled // does not pre cache when set parameter is disabled
s.restartMongos(0, { s.restartMongos(0, {
restart: true, restart: true,
setParameter: {loadRoutingTableOnStartup: false}, setParameter: {
loadRoutingTableOnStartup: false,
'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"
},
}); });
db = s.getDB("test"); db = s.getDB("test");

View File

@ -42,7 +42,11 @@ const stParams = {
name: jsTestName(), name: jsTestName(),
keyFile: key, keyFile: key,
shards: 3, shards: 3,
rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}} rs: {nodes: 1, setParameter: {internalQueryExecYieldIterations: 1}},
other: {
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}
}; };
// Create a new sharded cluster for testing. We set the internalQueryExecYieldIterations // Create a new sharded cluster for testing. We set the internalQueryExecYieldIterations

View File

@ -1,6 +1,7 @@
/** /**
* Requires no shards. * Requires no shards.
* @tags: [ * @tags: [
* requires_fcv_70,
* catalog_shard_incompatible, * catalog_shard_incompatible,
* requires_fcv_70, * requires_fcv_70,
* ] * ]
@ -17,7 +18,13 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Col
// implicit sessions. // implicit sessions.
TestData.disableImplicitSessions = true; TestData.disableImplicitSessions = true;
var st = new ShardingTest({shards: 0}); var st = new ShardingTest({
shards: 0,
other: {
mongosOptions:
{setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}}
}
});
var configSvr = st.configRS.getPrimary(); var configSvr = st.configRS.getPrimary();
var mongos = st.s; var mongos = st.s;

View File

@ -6,7 +6,8 @@
* no failures, a participant having failed over, a participant being unable to satisfy the client's * no failures, a participant having failed over, a participant being unable to satisfy the client's
* writeConcern, and an invalid client writeConcern. * writeConcern, and an invalid client writeConcern.
* *
* @tags: [uses_transactions, uses_multi_shard_transaction, temporary_catalog_shard_incompatible] * @tags: [requires_fcv_70, uses_transactions, uses_multi_shard_transaction,
* temporary_catalog_shard_incompatible]
*/ */
(function() { (function() {
@ -65,7 +66,10 @@ let st = new ShardingTest({
// Create shards with more than one node because we test for writeConcern majority failing. // Create shards with more than one node because we test for writeConcern majority failing.
config: 1, config: 1,
other: { other: {
mongosOptions: {verbose: 3}, mongosOptions: {
verbose: 3,
setParameter: {'failpoint.skipClusterParameterRefresh': "{'mode':'alwaysOn'}"}
},
rs0: {nodes: [{}, {rsConfig: {priority: 0}}]}, rs0: {nodes: [{}, {rsConfig: {priority: 0}}]},
rs1: {nodes: [{}, {rsConfig: {priority: 0}}]}, rs1: {nodes: [{}, {rsConfig: {priority: 0}}]},
rs2: {nodes: [{}, {rsConfig: {priority: 0}}]}, rs2: {nodes: [{}, {rsConfig: {priority: 0}}]},

View File

@ -85,6 +85,22 @@ bool ServerParameter::isEnabled() const {
} }
bool ServerParameter::isEnabledOnVersion( bool ServerParameter::isEnabledOnVersion(
const multiversion::FeatureCompatibilityVersion& targetFCV) const {
if (_disableState != DisableState::Enabled) {
return false;
}
return _isEnabledOnVersion(targetFCV);
}
bool ServerParameter::canBeEnabledOnVersion(
const multiversion::FeatureCompatibilityVersion& targetFCV) const {
if (_disableState == DisableState::PermanentlyDisabled) {
return false;
}
return _isEnabledOnVersion(targetFCV);
}
bool ServerParameter::_isEnabledOnVersion(
const multiversion::FeatureCompatibilityVersion& targetFCV) const { const multiversion::FeatureCompatibilityVersion& targetFCV) const {
return minFCVIsLessThanOrEqualToVersion(targetFCV) && return minFCVIsLessThanOrEqualToVersion(targetFCV) &&
!featureFlagIsDisabledOnVersion(targetFCV); !featureFlagIsDisabledOnVersion(targetFCV);
@ -198,51 +214,11 @@ Status IDLServerParameterDeprecatedAlias::setFromString(StringData str,
return _sp->setFromString(str, tenantId); return _sp->setFromString(str, tenantId);
} }
namespace {
class DisabledTestParameter : public ServerParameter {
public:
explicit DisabledTestParameter(ServerParameter* sp)
: ServerParameter(sp->name(), sp->getServerParameterType()), _sp(sp) {}
void append(OperationContext* opCtx,
BSONObjBuilder* b,
StringData name,
const boost::optional<TenantId>&) final {}
Status validate(const BSONElement& newValueElement,
const boost::optional<TenantId>& tenantId) const final {
return {ErrorCodes::BadValue,
str::stream() << "Server parameter: '" << name() << "' is currently disabled"};
}
Status setFromString(StringData, const boost::optional<TenantId>&) final {
return {ErrorCodes::BadValue,
str::stream() << "Server parameter: '" << name() << "' is currently disabled"};
}
Status set(const BSONElement& newValueElement, const boost::optional<TenantId>&) final {
return setFromString("", boost::none);
}
Status reset(const boost::optional<TenantId>&) final {
return setFromString("", boost::none);
}
bool isEnabledOnVersion(const multiversion::FeatureCompatibilityVersion&) const override {
return false;
}
private:
// Retain the original pointer to avoid ASAN complaining.
ServerParameter* _sp;
};
} // namespace
void ServerParameterSet::disableTestParameters() { void ServerParameterSet::disableTestParameters() {
for (auto& spit : _map) { for (auto& spit : _map) {
auto*& sp = spit.second; auto*& sp = spit.second;
if (sp->isTestOnly()) { if (sp->isTestOnly()) {
sp = new DisabledTestParameter(sp); sp->disable(true /* permanent */);
} }
} }
} }

View File

@ -218,11 +218,31 @@ public:
_redact = true; _redact = true;
} }
private:
enum DisableState { Enabled = 0, TemporarilyDisabled = 1, PermanentlyDisabled = 2 };
public:
void disable(bool permanent) {
if (_disableState != DisableState::PermanentlyDisabled) {
_disableState =
permanent ? DisableState::PermanentlyDisabled : DisableState::TemporarilyDisabled;
}
}
void enable() {
if (_disableState == DisableState::TemporarilyDisabled) {
_disableState = DisableState::Enabled;
}
}
bool isEnabled() const; bool isEnabled() const;
// Return whether this server parameter is compatible with the given FCV. // Return whether this server parameter would be enabled with the given FCV
virtual bool isEnabledOnVersion( bool isEnabledOnVersion(const multiversion::FeatureCompatibilityVersion& targetFCV) const;
const multiversion::FeatureCompatibilityVersion& targetFCV) const;
// Return whether this server parameter is compatible with the given FCV, regardless of if it is
// temporarily disabled
bool canBeEnabledOnVersion(const multiversion::FeatureCompatibilityVersion& targetFCV) const;
void setFeatureFlag(FeatureFlag* featureFlag) { void setFeatureFlag(FeatureFlag* featureFlag) {
_featureFlag = featureFlag; _featureFlag = featureFlag;
@ -233,6 +253,9 @@ public:
} }
protected: protected:
virtual bool _isEnabledOnVersion(
const multiversion::FeatureCompatibilityVersion& targetFCV) const;
bool featureFlagIsDisabledOnVersion( bool featureFlagIsDisabledOnVersion(
const multiversion::FeatureCompatibilityVersion& targetFCV) const; const multiversion::FeatureCompatibilityVersion& targetFCV) const;
@ -251,6 +274,11 @@ private:
ServerParameterType _type; ServerParameterType _type;
bool _testOnly = false; bool _testOnly = false;
bool _redact = false; bool _redact = false;
// Tracks whether a parameter is enabled, temporarily disabled, or permanently disabled. This is
// used when disabling (permanently) test-only parameters, and when enabling/disabling
// (temporarily) cluster parameters on the mongos based on the cluster's FCV.
DisableState _disableState = DisableState::Enabled;
}; };
class ServerParameterSet { class ServerParameterSet {

View File

@ -1510,7 +1510,8 @@ void ExecCommandDatabase::_initiateCommand() {
const auto allowTransactionsOnConfigDatabase = const auto allowTransactionsOnConfigDatabase =
(serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer) || (serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer) ||
serverGlobalParams.clusterRole.has(ClusterRole::ShardServer)); serverGlobalParams.clusterRole.has(ClusterRole::ShardServer)) ||
client->isFromSystemConnection();
const auto invocationNss = _invocation->ns(); const auto invocationNss = _invocation->ns();

View File

@ -88,6 +88,8 @@ env.Library(
LIBDEPS_PRIVATE=[ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/audit', '$BUILD_DIR/mongo/db/audit',
'$BUILD_DIR/mongo/db/server_base', '$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/transaction/transaction_api',
'$BUILD_DIR/mongo/executor/inline_executor',
'$BUILD_DIR/mongo/s/grid', '$BUILD_DIR/mongo/s/grid',
'cluster_server_parameter', 'cluster_server_parameter',
'cluster_server_parameter_common', 'cluster_server_parameter_common',

View File

@ -33,14 +33,21 @@
#include "mongo/db/audit.h" #include "mongo/db/audit.h"
#include "mongo/db/commands/list_databases_for_all_tenants_gen.h" #include "mongo/db/commands/list_databases_for_all_tenants_gen.h"
#include "mongo/db/feature_compatibility_version_parser.h"
#include "mongo/db/multitenancy_gen.h" #include "mongo/db/multitenancy_gen.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/db/vector_clock.h"
#include "mongo/idl/cluster_server_parameter_common.h" #include "mongo/idl/cluster_server_parameter_common.h"
#include "mongo/idl/cluster_server_parameter_refresher_gen.h" #include "mongo/idl/cluster_server_parameter_refresher_gen.h"
#include "mongo/logv2/log.h" #include "mongo/logv2/log.h"
#include "mongo/s/grid.h" #include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/version/releases.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kControl #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kControl
MONGO_FAIL_POINT_DEFINE(skipClusterParameterRefresh);
namespace mongo { namespace mongo {
namespace { namespace {
@ -51,44 +58,97 @@ Seconds loadInterval() {
return Seconds(clusterServerParameterRefreshIntervalSecs.load()); return Seconds(clusterServerParameterRefreshIntervalSecs.load());
} }
StatusWith<TenantIdMap<stdx::unordered_map<std::string, BSONObj>>> std::pair<multiversion::FeatureCompatibilityVersion,
getClusterParametersFromConfigServer(OperationContext* opCtx) { TenantIdMap<stdx::unordered_map<std::string, BSONObj>>>
// Attempt to retrieve cluster parameter documents from the config server. getFCVAndClusterParametersFromConfigServer() {
// exhaustiveFindOnConfig makes up to 3 total attempts if it receives a retriable error before // Use an alternative client region, because we call refreshParameters both from the internal
// giving up. // refresher process and from getClusterParameter.
LOGV2_DEBUG(6226404, 3, "Retrieving cluster server parameters from config server"); auto altClient = getGlobalServiceContext()->makeClient("clusterParameterRefreshTransaction");
auto configServers = Grid::get(opCtx)->shardRegistry()->getConfigShard(); AlternativeClientRegion clientRegion(altClient);
auto swTenantIds = getTenantsWithConfigDbsOnShard(opCtx, configServers.get()); auto opCtx = cc().makeOperationContext();
if (!swTenantIds.isOK()) { auto as = AuthorizationSession::get(cc());
return swTenantIds.getStatus(); as->grantInternalAuthorization(opCtx.get());
}
auto tenantIds = std::move(swTenantIds.getValue());
TenantIdMap<stdx::unordered_map<std::string, BSONObj>> allDocs; auto configServers = Grid::get(opCtx.get())->shardRegistry()->getConfigShard();
for (const auto& tenantId : tenantIds) { // Note that we get the list of tenants outside of the transaction. This should be okay, as if
auto swFindResponse = configServers->exhaustiveFindOnConfig( // we miss out on some new tenants created between this call and the transaction, we are just
opCtx, // getting slightly old data. Importantly, different tenants' cluster parameters don't interact
ReadPreferenceSetting{ReadPreference::PrimaryOnly}, // with each other, so we don't need a consistent snapshot of cluster parameters across all
repl::ReadConcernLevel::kMajorityReadConcern, // tenants, just a consistent snapshot per tenant.
NamespaceString::makeClusterParametersNSS(tenantId), auto tenantIds =
BSONObj(), uassertStatusOK(getTenantsWithConfigDbsOnShard(opCtx.get(), configServers.get()));
BSONObj(),
boost::none);
// If the error is not retriable or persists beyond the max number of retry attempts, give auto allDocs = std::make_shared<TenantIdMap<stdx::unordered_map<std::string, BSONObj>>>();
// up and throw an error. auto fcv = std::make_shared<multiversion::FeatureCompatibilityVersion>();
if (!swFindResponse.isOK()) { auto doFetch = [allDocs, fcv, &tenantIds](const txn_api::TransactionClient& txnClient,
return swFindResponse.getStatus(); ExecutorPtr txnExec) {
} FindCommandRequest findFCV{NamespaceString("admin.system.version")};
stdx::unordered_map<std::string, BSONObj> docsMap; findFCV.setFilter(BSON("_id"
for (const auto& doc : swFindResponse.getValue().docs) { << "featureCompatibilityVersion"));
auto name = doc["_id"].String(); return txnClient.exhaustiveFind(findFCV)
docsMap.insert({std::move(name), doc}); .thenRunOn(txnExec)
} .then([fcv, allDocs, &tenantIds, &txnClient, txnExec](
allDocs.insert({std::move(tenantId), std::move(docsMap)}); const std::vector<BSONObj>& foundDocs) {
} uassert(7410710,
"Expected to find FCV in admin.system.version but found nothing!",
!foundDocs.empty());
*fcv = FeatureCompatibilityVersionParser::parseVersion(
foundDocs[0]["version"].String());
return allDocs; // Fetch one tenant, then call doFetchTenants for the rest of the tenants within
// then() recursively.
auto doFetchTenants = [](auto it,
const auto& tenantIds,
auto allDocs,
const auto& txnClient,
ExecutorPtr txnExec,
auto& doFetchTenants_ref) mutable {
if (it == tenantIds.end()) {
return SemiFuture<void>::makeReady();
}
FindCommandRequest findClusterParametersTenant{
NamespaceString::makeClusterParametersNSS(*it)};
// We don't specify a filter as we want all documents.
return txnClient.exhaustiveFind(findClusterParametersTenant)
.thenRunOn(txnExec)
.then([&doFetchTenants_ref, &txnClient, &tenantIds, txnExec, it, allDocs](
const std::vector<BSONObj>& foundDocs) {
stdx::unordered_map<std::string, BSONObj> docsMap;
for (const auto& doc : foundDocs) {
auto name = doc["_id"].String();
docsMap.insert({std::move(name), doc.getOwned()});
}
allDocs->insert({*it, std::move(docsMap)});
return doFetchTenants_ref(std::next(it),
tenantIds,
allDocs,
txnClient,
txnExec,
doFetchTenants_ref);
})
.semi();
};
return doFetchTenants(
tenantIds.begin(), tenantIds, allDocs, txnClient, txnExec, doFetchTenants);
})
.semi();
};
repl::ReadConcernArgs::get(opCtx.get()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
// We need to commit w/ writeConcern = majority for readConcern = snapshot to work.
opCtx->setWriteConcern(WriteConcernOptions{WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kNoTimeout});
auto executor = Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
txn_api::SyncTransactionWithRetries txn(
opCtx.get(), sleepInlineExecutor, nullptr, inlineExecutor);
txn.run(opCtx.get(), doFetch);
return {*fcv, *allDocs};
} }
} // namespace } // namespace
@ -121,30 +181,56 @@ void ClusterServerParameterRefresher::setPeriod(Milliseconds period) {
} }
Status ClusterServerParameterRefresher::refreshParameters(OperationContext* opCtx) { Status ClusterServerParameterRefresher::refreshParameters(OperationContext* opCtx) {
// Query the config servers for all cluster parameter documents. invariant(isMongos());
auto swClusterParameterDocs = getClusterParametersFromConfigServer(opCtx); multiversion::FeatureCompatibilityVersion fcv;
if (!swClusterParameterDocs.isOK()) { TenantIdMap<stdx::unordered_map<std::string, BSONObj>> clusterParameterDocs;
LOGV2_WARNING(6226401,
"Could not refresh cluster server parameters from config servers. Will retry " try {
"after refresh interval elapses", std::tie(fcv, clusterParameterDocs) = getFCVAndClusterParametersFromConfigServer();
"clusterServerParameterRefreshIntervalSecs"_attr = loadInterval(), } catch (const DBException& ex) {
"reason"_attr = swClusterParameterDocs.getStatus().reason()); LOGV2_WARNING(
return swClusterParameterDocs.getStatus(); 7410719,
"Could not refresh cluster server parameters from config servers due to failure in "
"getFCVAndClusterParametersFromConfigServer. Will retry after refresh interval",
"ex"_attr = ex.toStatus());
return ex.toStatus();
} }
// Set each in-memory cluster parameter that was returned in the response. // Set each in-memory cluster parameter that was returned in the response.
bool isSuccessful = true; bool isSuccessful = true;
Status status = Status::OK(); Status status = Status::OK();
ServerParameterSet* clusterParameterCache = ServerParameterSet::getClusterParameterSet(); ServerParameterSet* clusterParameterCache = ServerParameterSet::getClusterParameterSet();
bool fcvChanged = fcv != _lastFcv;
auto clusterParameterDocs = std::move(swClusterParameterDocs.getValue()); if (fcvChanged) {
LOGV2_DEBUG(7410705,
3,
"Cluster's FCV was different from last during refresh",
"oldFCV"_attr = multiversion::toString(_lastFcv),
"newFCV"_attr = multiversion::toString(fcv));
}
std::vector<BSONObj> allUpdatedParameters; std::vector<BSONObj> allUpdatedParameters;
allUpdatedParameters.reserve(clusterParameterDocs.size()); allUpdatedParameters.reserve(clusterParameterDocs.size());
for (const auto& [tenantId, tenantParamDocs] : clusterParameterDocs) { for (const auto& [tenantId, tenantParamDocs] : clusterParameterDocs) {
std::vector<BSONObj> updatedParameters; std::vector<BSONObj> updatedParameters;
updatedParameters.reserve(tenantParamDocs.size()); updatedParameters.reserve(tenantParamDocs.size());
for (const auto& [name, sp] : clusterParameterCache->getMap()) { for (auto [name, sp] : clusterParameterCache->getMap()) {
if (fcvChanged) {
// Use canBeEnabled because if we previously temporarily disabled the parameter,
// isEnabled will be false
if (sp->canBeEnabledOnVersion(_lastFcv) && !sp->canBeEnabledOnVersion(fcv)) {
// Parameter is newly disabled on cluster
LOGV2_DEBUG(
7410703, 3, "Disabling parameter during refresh", "name"_attr = name);
sp->disable(false /* permanent */);
continue;
} else if (sp->canBeEnabledOnVersion(fcv) && !sp->canBeEnabledOnVersion(_lastFcv)) {
// Parameter is newly enabled on cluster
LOGV2_DEBUG(
7410704, 3, "Enabling parameter during refresh", "name"_attr = name);
sp->enable();
}
}
if (!sp->isEnabled()) { if (!sp->isEnabled()) {
continue; continue;
} }
@ -196,12 +282,18 @@ Status ClusterServerParameterRefresher::refreshParameters(OperationContext* opCt
"clusterParameterDocuments"_attr = allUpdatedParameters); "clusterParameterDocuments"_attr = allUpdatedParameters);
} }
_lastFcv = fcv;
return status; return status;
} }
void ClusterServerParameterRefresher::start(ServiceContext* serviceCtx, OperationContext* opCtx) { void ClusterServerParameterRefresher::start(ServiceContext* serviceCtx, OperationContext* opCtx) {
auto refresher = std::make_unique<ClusterServerParameterRefresher>(); auto refresher = std::make_unique<ClusterServerParameterRefresher>();
// On mongos, this should always be true after FCV initialization
// (Generic FCV reference):
invariant(serverGlobalParams.featureCompatibility.getVersion() ==
multiversion::GenericFCV::kLatest);
refresher->_lastFcv = serverGlobalParams.featureCompatibility.getVersion();
auto periodicRunner = serviceCtx->getPeriodicRunner(); auto periodicRunner = serviceCtx->getPeriodicRunner();
invariant(periodicRunner); invariant(periodicRunner);
@ -218,6 +310,10 @@ void ClusterServerParameterRefresher::start(ServiceContext* serviceCtx, Operatio
} }
void ClusterServerParameterRefresher::run() { void ClusterServerParameterRefresher::run() {
if (MONGO_unlikely(skipClusterParameterRefresh.shouldFail())) {
return;
}
auto opCtx = cc().makeOperationContext(); auto opCtx = cc().makeOperationContext();
auto status = refreshParameters(opCtx.get()); auto status = refreshParameters(opCtx.get());
if (!status.isOK()) { if (!status.isOK()) {

View File

@ -67,6 +67,7 @@ private:
void run(); void run();
std::unique_ptr<PeriodicJobAnchor> _job; std::unique_ptr<PeriodicJobAnchor> _job;
multiversion::FeatureCompatibilityVersion _lastFcv;
}; };
Status clusterServerParameterRefreshIntervalSecsNotify(const int& newValue); Status clusterServerParameterRefreshIntervalSecsNotify(const int& newValue);

View File

@ -586,7 +586,7 @@ void ParseAndRunCommand::_parseCommand() {
command->attachLogicalSessionsToOpCtx(), command->attachLogicalSessionsToOpCtx(),
true)); true));
auto allowTransactionsOnConfigDatabase = !isMongos(); auto allowTransactionsOnConfigDatabase = !isMongos() || client->isFromSystemConnection();
validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase); validateSessionOptions(*_osi, command->getName(), nss, allowTransactionsOnConfigDatabase);
_wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body))); _wc.emplace(uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)));