SERVER-107952 Make _flushReshardingStateChange retry on transient refresh errors until there is a failover (#39126)

GitOrigin-RevId: 2889db4c324870247abfe230e7803792b78921f5
Author: Cheahuychou Mao, 2025-07-28 14:55:10 -04:00 (committed by MongoDB Bot)
Parent: b2da3d007f
Commit: 35d8d0b854
5 changed files with 322 additions and 15 deletions
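As a quick illustration of what this change exposes (a sketch, not part of the diff): the three new refresh counters are reported under shardingStatistics in the serverStatus output of shard nodes. In the snippet below, shardPrimary is a placeholder for any connection to a shard's primary, e.g. st.rs0.getPrimary() in a jstest.

// Read the counters added by this commit; all three start at 0 and only ever increase.
const stats =
    assert.commandWorked(shardPrimary.adminCommand({serverStatus: 1})).shardingStatistics;
jsTest.log("_flushReshardingStateChange refresh counters: " + tojson({
    total: stats.countFlushReshardingStateChangeTotalShardingMetadataRefreshes,
    succeeded: stats.countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes,
    failed: stats.countFlushReshardingStateChangeFailedShardingMetadataRefreshes,
}));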

View File

@@ -464,6 +464,8 @@ last-continuous:
     ticket: SERVER-106614
   - test_file: jstests/sharding/balancer_should_return_random_migrations_failpoint.js
     ticket: SERVER-105874
+  - test_file: jstests/sharding/flush_resharding_state_change_errors.js
+    ticket: SERVER-107952
   suites: null
 last-lts:
   all:
@@ -985,4 +987,6 @@ last-lts:
     ticket: SERVER-105874
   - test_file: jstests/with_mongot/e2e/hybridSearch/ranked_fusion_with_filter.js
     ticket: SERVER-107352
+  - test_file: jstests/sharding/flush_resharding_state_change_errors.js
+    ticket: SERVER-107952
   suites: null

View File

@@ -0,0 +1,212 @@
/**
* Tests that _flushReshardingStateChange command retries sharding metadata refresh on transient
* errors until there is a failover.
*/
import {configureFailPoint} from 'jstests/libs/fail_point_util.js';
import {Thread} from 'jstests/libs/parallelTester.js';
import {ShardingTest} from 'jstests/libs/shardingtest.js';
function runMoveCollection(host, ns, toShard) {
const mongos = new Mongo(host);
return mongos.adminCommand({moveCollection: ns, toShard});
}
function getFlushReshardingStateChangeMetrics(conn) {
const shardingStatistics =
assert.commandWorked(conn.adminCommand({serverStatus: 1})).shardingStatistics;
return {
countFlushReshardingStateChangeTotalShardingMetadataRefreshes:
shardingStatistics.countFlushReshardingStateChangeTotalShardingMetadataRefreshes,
countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes:
shardingStatistics.countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes,
countFlushReshardingStateChangeFailedShardingMetadataRefreshes:
shardingStatistics.countFlushReshardingStateChangeFailedShardingMetadataRefreshes
};
}
function validateFlushReshardingStateChangeMetrics(metrics) {
assert.gte(metrics.countFlushReshardingStateChangeTotalShardingMetadataRefreshes, 0, metrics);
assert.gte(
metrics.countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes, 0, metrics);
assert.gte(metrics.countFlushReshardingStateChangeFailedShardingMetadataRefreshes, 0, metrics);
assert.gte(metrics.countFlushReshardingStateChangeTotalShardingMetadataRefreshes,
metrics.countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes +
metrics.countFlushReshardingStateChangeFailedShardingMetadataRefreshes,
metrics);
}
function assertSoonFlushReshardingStateChangeStartRetryingOnRefreshErrors(conn) {
let numTries = 0;
assert.soon(() => {
numTries++;
const metrics = getFlushReshardingStateChangeMetrics(conn);
validateFlushReshardingStateChangeMetrics(metrics);
if (numTries % 100 == 0) {
jsTest.log("Waiting for _flushReshardingStateChange to hit refresh errors: " +
tojson(metrics));
}
return metrics.countFlushReshardingStateChangeTotalShardingMetadataRefreshes > 1 &&
metrics.countFlushReshardingStateChangeFailedShardingMetadataRefreshes > 0;
});
}
function assertSoonFlushReshardingStateChangeStopRetryingOnRefreshErrors(conn) {
let numTries = 0;
let prevMetrics;
// Use a large interval to decrease the chance of checking metrics before the next refresh
// retry.
const timeout = null; // Use the default timeout.
const interval = 1000;
assert.soon(() => {
numTries++;
const currMetrics = getFlushReshardingStateChangeMetrics(conn);
validateFlushReshardingStateChangeMetrics(currMetrics);
if (numTries % 10 == 0) {
jsTest.log("Waiting for _flushReshardingStateChange to stop refreshing: " +
tojson({conn, currMetrics, prevMetrics}));
}
if (prevMetrics !== undefined && bsonWoCompare(prevMetrics, currMetrics) == 0) {
jsTest.log("Finished waiting for _flushReshardingStateChange to stop refreshing: " +
tojson({conn, currMetrics, prevMetrics}));
return true;
}
prevMetrics = currMetrics;
return false;
}, "Timed out waiting for _flushReshardingStateChange to stop refreshing", timeout, interval);
}
function assertFlushReshardingStateChangeMetricsNoRefreshErrors(conn) {
const metrics = getFlushReshardingStateChangeMetrics(conn);
jsTest.log("Checking _flushReshardingStateChange metrics: " + tojson(metrics));
validateFlushReshardingStateChangeMetrics(metrics);
assert.eq(metrics.countFlushReshardingStateChangeFailedShardingMetadataRefreshes, 0, metrics);
}
function stepUpNewPrimary(rst) {
const oldPrimary = rst.getPrimary();
const oldSecondary = rst.getSecondary();
assert.neq(oldPrimary, oldSecondary);
rst.stepUp(oldSecondary, {awaitReplicationBeforeStepUp: false});
const newPrimary = rst.getPrimary();
assert.eq(newPrimary, oldSecondary);
}
function testRetryOnTransientError(st, {enableCloneNoRefresh}) {
jsTest.log("Start testing that _flushReshardingStateChange retries sharding metadata refresh " +
"on transient error " + tojsononeline({enableCloneNoRefresh}));
// Set up the collection to reshard.
const dbName = "testDbBasic";
const collName = "testColl";
const ns = dbName + '.' + collName;
const testColl = st.s.getCollection(ns);
assert.commandWorked(
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(testColl.insert([{x: -1}, {x: 0}, {x: 1}]));
assert.commandWorked(testColl.createIndex({x: 1}));
// Set activation probability to less than 1 so that as long as there are retries,
// moveCollection will eventually succeed.
let activationProbability = 0.5;
let fp0 = configureFailPoint(st.rs0.getPrimary(),
"failFlushReshardingStateChange",
{errorCode: ErrorCodes.WriteConcernTimeout},
{activationProbability});
let fp1 = configureFailPoint(st.rs1.getPrimary(),
"failFlushReshardingStateChange",
{errorCode: ErrorCodes.WriteConcernTimeout},
{activationProbability});
const moveThread = new Thread(runMoveCollection, st.s.host, ns, st.shard1.shardName);
moveThread.start();
jsTest.log("Start waiting for moveCollection to finish");
assert.commandWorked(moveThread.returnData());
jsTest.log("Finished waiting for moveCollection to finish");
fp0.off();
fp1.off();
}
function testStopRetryingOnFailover(st, {enableCloneNoRefresh}) {
jsTest.log("Start testing that _flushReshardingStateChange stops retrying sharding metadata " +
"refresh on failover " + tojsononeline({enableCloneNoRefresh}));
// Set up the collection to reshard.
const dbName = "testDbStopRetrying";
const collName = "testColl";
const ns = dbName + '.' + collName;
const testColl = st.s.getCollection(ns);
assert.commandWorked(
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(testColl.insert([{x: -1}, {x: 0}, {x: 1}]));
assert.commandWorked(testColl.createIndex({x: 1}));
const primary0BeforeFailover = st.rs0.getPrimary();
const primary1BeforeFailover = st.rs1.getPrimary();
let fp0 = configureFailPoint(primary0BeforeFailover,
"failFlushReshardingStateChange",
{errorCode: ErrorCodes.WriteConcernTimeout});
let fp1 = configureFailPoint(primary1BeforeFailover,
"failFlushReshardingStateChange",
{errorCode: ErrorCodes.WriteConcernTimeout});
const moveThread = new Thread(runMoveCollection, st.s.host, ns, st.shard1.shardName);
moveThread.start();
jsTest.log(
"Waiting for _flushReshardingStateChange on shard0 to start retrying on refresh errors");
assertSoonFlushReshardingStateChangeStartRetryingOnRefreshErrors(primary0BeforeFailover);
jsTest.log(
"Waiting for _flushReshardingStateChange on shard1 to start retrying on refresh errors");
assertSoonFlushReshardingStateChangeStartRetryingOnRefreshErrors(primary1BeforeFailover);
jsTest.log("Triggering a failover on shard0");
stepUpNewPrimary(st.rs0);
const primary0AfterFailover = st.rs0.getPrimary();
jsTest.log("Triggering a failover on shard1");
stepUpNewPrimary(st.rs1);
const primary1AfterFailover = st.rs1.getPrimary();
jsTest.log("Checking that _flushReshardingStateChange retries eventually stop after failover");
assertSoonFlushReshardingStateChangeStopRetryingOnRefreshErrors(primary0BeforeFailover);
assertSoonFlushReshardingStateChangeStopRetryingOnRefreshErrors(primary1BeforeFailover);
jsTest.log("Start waiting for moveCollection to finish");
assert.commandWorked(moveThread.returnData());
jsTest.log("Finished waiting for moveCollection to finish");
assertFlushReshardingStateChangeMetricsNoRefreshErrors(primary0AfterFailover);
assertFlushReshardingStateChangeMetricsNoRefreshErrors(primary1AfterFailover);
fp0.off();
fp1.off();
}
function runTests({enableCloneNoRefresh}) {
jsTest.log("Start testing with " + tojsononeline({enableCloneNoRefresh}));
const st = new ShardingTest({
shards: 2,
rs: {
nodes: 3,
setParameter: {
featureFlagReshardingCloneNoRefresh: enableCloneNoRefresh,
}
},
other: {
configOptions: {
setParameter: {
featureFlagReshardingCloneNoRefresh: enableCloneNoRefresh,
}
}
}
});
testRetryOnTransientError(st, {enableCloneNoRefresh});
testStopRetryingOnFailover(st, {enableCloneNoRefresh});
st.stop();
}
runTests({enableCloneNoRefresh: false});
runTests({enableCloneNoRefresh: true});
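For reference, the configureFailPoint helper used by the test wraps the configureFailPoint admin command; a hand-run equivalent of the transient-error setup would look roughly like the sketch below (not part of the diff). shardPrimary is a placeholder connection to a shard primary running a binary that carries this change.

// Make roughly half of the _flushReshardingStateChange refreshes fail with
// WriteConcernTimeout via the failpoint introduced by this commit.
assert.commandWorked(shardPrimary.adminCommand({
    configureFailPoint: "failFlushReshardingStateChange",
    mode: {activationProbability: 0.5},
    data: {errorCode: ErrorCodes.WriteConcernTimeout},
}));
// ... run moveCollection / resharding against a collection on this shard ...
// Disable the failpoint again once done.
assert.commandWorked(shardPrimary.adminCommand(
    {configureFailPoint: "failFlushReshardingStateChange", mode: "off"}));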

View File

@@ -38,9 +38,12 @@
 #include "mongo/db/database_name.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/primary_only_service_helpers/with_automatic_retry.h"
 #include "mongo/db/s/resharding/resharding_util.h"
 #include "mongo/db/s/shard_filtering_metadata_refresh.h"
 #include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_statistics.h"
 #include "mongo/db/service_context.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/logv2/log.h"
@@ -51,6 +54,7 @@
 #include "mongo/util/assert_util.h"
 #include "mongo/util/future.h"
 #include "mongo/util/future_impl.h"
+#include "mongo/util/future_util.h"
 #include "mongo/util/uuid.h"

 #include <memory>
@@ -64,9 +68,31 @@
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding

 namespace mongo {
 namespace {

+MONGO_FAIL_POINT_DEFINE(failFlushReshardingStateChange);
+
+const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+
+/**
+ * Returns true if _flushReshardingStateChange command should retry refreshing sharding metadata
+ * upon getting the given error.
+ */
+bool shouldRetryOnRefreshError(const Status& status) {
+    if (status == ErrorCodes::NamespaceNotFound) {
+        // The collection has been dropped.
+        return false;
+    }
+
+    // We need to retry on WriteConcernTimeout errors since doing a sharding metadata refresh
+    // involves performing a noop write with majority write concern with a timeout. We need to retry
+    // on snapshot errors since doing a sharding metadata refresh involves running an aggregation
+    // over the config.collections and config.chunks collections with snapshot read concern. The
+    // catalog cache does retry on snapshot errors but the number of retries is capped.
+    return primary_only_service_helpers::kDefaultRetryabilityPredicate(status) ||
+        status == ErrorCodes::WriteConcernTimeout || status.isA<ErrorCategory::SnapshotError>();
+}
+
 class FlushReshardingStateChangeCmd final : public TypedCommand<FlushReshardingStateChangeCmd> {
 public:
     using Request = _flushReshardingStateChange;
@@ -130,21 +156,71 @@ public:
         // cause potential liveness issues since the arbitrary executor is a NetworkInterfaceTL
         // executor in sharded clusters and that executor is one that executes networking
         // operations.
-        ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
-            .then([svcCtx = opCtx->getServiceContext(), nss = ns()] {
-                ThreadClient tc("FlushReshardingStateChange",
-                                svcCtx->getService(ClusterRole::ShardServer));
-                auto opCtx = tc->makeOperationContext();
-
-                uassertStatusOK(
-                    FilteringMetadataCache::get(opCtx.get())
-                        ->onCollectionPlacementVersionMismatch(
-                            opCtx.get(), nss, boost::none /* chunkVersionReceived */));
-            })
-            .onError([](const Status& status) {
-                LOGV2_WARNING(5808100,
-                              "Error on deferred _flushReshardingStateChange execution",
-                              "error"_attr = redact(status));
-            })
-            .getAsync([](auto) {});
+        AsyncTry([svcCtx = opCtx->getServiceContext(), nss = ns(), numTries = 0]() mutable {
+            ThreadClient tc("FlushReshardingStateChange",
+                            svcCtx->getService(ClusterRole::ShardServer));
+            auto opCtx = tc->makeOperationContext();
+
+            auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
+            if (!replCoord->getMemberState().primary()) {
+                LOGV2(10795200,
+                      "Stop refreshing sharding metadata in _flushReshardingStateChange since "
+                      "this node is no longer a primary",
+                      "numTries"_attr = numTries);
+                return;
+            }
+
+            numTries++;
+            LOGV2_DEBUG(10795201,
+                        1,
+                        "Start refreshing sharding metadata in _flushReshardingStateChange",
+                        "numTries"_attr = numTries);
+            auto& shardingStatistics = ShardingStatistics::get(opCtx.get());
+            shardingStatistics.countFlushReshardingStateChangeTotalShardingMetadataRefreshes
+                .addAndFetch(1);
+
+            boost::optional<Status> mockStatus;
+            failFlushReshardingStateChange.execute([&](const BSONObj& data) {
+                const auto& errorCode = data.getIntField("errorCode");
+                mockStatus =
+                    Status(ErrorCodes::Error(errorCode),
+                           "Failing refresh in _flushReshardingStateChange due to failpoint");
+            });
+            auto refreshStatus = mockStatus
+                ? *mockStatus
+                : FilteringMetadataCache::get(opCtx.get())
+                      ->onCollectionPlacementVersionMismatch(
+                          opCtx.get(), nss, boost::none /* chunkVersionReceived */);
+            if (refreshStatus.isOK()) {
+                shardingStatistics
+                    .countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes
+                    .addAndFetch(1);
+            } else {
+                shardingStatistics
+                    .countFlushReshardingStateChangeFailedShardingMetadataRefreshes.addAndFetch(
+                        1);
+            }
+            uassertStatusOK(refreshStatus);
+
+            LOGV2_DEBUG(10795202,
+                        1,
+                        "Finished refreshing sharding metadata in _flushReshardingStateChange",
+                        "numTries"_attr = numTries);
+        })
+            .until([](Status status) {
+                if (!status.isOK()) {
+                    LOGV2_WARNING(5808100,
+                                  "Error on deferred _flushReshardingStateChange execution",
+                                  "error"_attr = redact(status));
+                }
+                return status.isOK() || !shouldRetryOnRefreshError(status);
+            })
+            .withBackoffBetweenIterations(kExponentialBackoff)
+            .on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+                CancellationToken::uncancelable())
+            .getAsync([](auto) {});

         // Ensure the command isn't run on a stale primary.
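Since the loop above exits as soon as the node observes it is no longer primary, the retries can also be ended outside of the jstest by an ordinary step-down. A minimal shell sketch (not part of the diff), assuming oldPrimary is a placeholder connection to the shard primary that is stuck retrying:

// Force the retrying node out of primary state; on its next iteration the
// AsyncTry body sees it is no longer primary and stops refreshing.
assert.commandWorked(oldPrimary.adminCommand({replSetStepDown: 60, force: true}));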

View File

@@ -90,6 +90,13 @@ void ShardingStatistics::report(BSONObjBuilder* builder) const {
                     countTransitionToDedicatedConfigServerCompleted.loadRelaxed());
     builder->append("countTransitionFromDedicatedConfigServerCompleted",
                     countTransitionFromDedicatedConfigServerCompleted.loadRelaxed());
+    builder->append("countFlushReshardingStateChangeTotalShardingMetadataRefreshes",
+                    countFlushReshardingStateChangeTotalShardingMetadataRefreshes.loadRelaxed());
+    builder->append(
+        "countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes",
+        countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes.loadRelaxed());
+    builder->append("countFlushReshardingStateChangeFailedShardingMetadataRefreshes",
+                    countFlushReshardingStateChangeFailedShardingMetadataRefreshes.loadRelaxed());
 }

 }  // namespace mongo

View File

@@ -144,6 +144,14 @@ struct ShardingStatistics {
     // completed.
     AtomicWord<long long> countTransitionFromDedicatedConfigServerCompleted{0};

+    // Cumulative, always-increasing total number of sharding metadata refreshes that have been
+    // kicked off by the _flushReshardingStateChange command.
+    AtomicWord<long long> countFlushReshardingStateChangeTotalShardingMetadataRefreshes{0};
+
+    // Cumulative, always-increasing number of successful and failed sharding metadata refreshes
+    // that have been kicked off by the _flushReshardingStateChange command.
+    AtomicWord<long long> countFlushReshardingStateChangeSuccessfulShardingMetadataRefreshes{0};
+    AtomicWord<long long> countFlushReshardingStateChangeFailedShardingMetadataRefreshes{0};
+
     /**
      * Obtains the per-process instance of the sharding statistics object.
      */
*/ */