mirror of https://github.com/mongodb/mongo
SERVER-101421 Expedite abort of recipient when aborting chunk migration (#44276)
GitOrigin-RevId: a2ba962be10ae64f5c94aa8fdb8d12fe1d2f093a
parent 235842abe2
commit 63469c57b6
@@ -91,6 +91,8 @@ last-continuous:
     ticket: SERVER-113145
   - test_file: jstests/aggregation/accumulators/accumulator_js_size_limits.js
     ticket: SERVER-112267
+  - test_file: jstests/sharding/move_chunk_abort.js
+    ticket: SERVER-101421
   - test_file: jstests/sharding/resharding_error_during_critical_section.js
     ticket: SERVER-114005
   - test_file: jstests/sharding/move_collection_stale_mongos.js
@@ -702,6 +704,8 @@ last-lts:
     ticket: SERVER-112267
   - test_file: jstests/concurrency/fsm_workloads/timeseries/timeseries_create_indexes.js
     ticket: SERVER-113887
+  - test_file: jstests/sharding/move_chunk_abort.js
+    ticket: SERVER-101421
   - test_file: jstests/sharding/resharding_error_during_critical_section.js
     ticket: SERVER-114005
   - test_file: jstests/sharding/move_collection_stale_mongos.js
@@ -0,0 +1,57 @@
+/**
+ * Ensure that moveChunk is expediently aborted if it fails.
+ *
+ * @tags: [requires_persistence]
+ */
+
+import {moveChunkParallel} from "jstests/libs/chunk_manipulation_util.js";
+import {configureFailPoint} from "jstests/libs/fail_point_util.js";
+import {ShardingTest} from "jstests/libs/shardingtest.js";
+import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
+
+const dbName = "test";
+const collName = "user";
+const staticMongod = MongoRunner.runMongod({});
+const st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 1}}});
+const collection = st.s.getDB(dbName).getCollection(collName);
+
+CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {_id: 1}, [
+    {min: {_id: MinKey}, max: {_id: 10}, shard: st.shard0.shardName},
+    {min: {_id: 10}, max: {_id: MaxKey}, shard: st.shard1.shardName},
+]);
+
+for (let i = 0; i < 20; i++) {
+    assert.commandWorked(collection.insertOne({_id: i, x: i}));
+}
+
+// The recipient will be stuck running transferMods; it won't know the chunk migration failed
+// unless it's notified by the donor and the migrateThread is interrupted.
+const transferModsFp = configureFailPoint(st.rs0.getPrimary(), "hangBeforeRunningXferMods");
+
+const joinMoveChunk = moveChunkParallel(
+    staticMongod,
+    st.s.host,
+    {_id: 1},
+    null,
+    dbName + "." + collName,
+    st.shard1.shardName,
+    false /*expectSuccess*/,
+);
+
+transferModsFp.wait();
+
+// Perform a collMod directly on the shard's primary to cause the moveChunk to abort.
+assert.commandWorked(
+    st.rs0
+        .getPrimary()
+        .getDB(dbName)
+        .runCommand({collMod: collName, validationLevel: "off", writeConcern: {w: 1}}),
+);
+
+joinMoveChunk();
+
+collection.drop();
+
+st.stop();
+
+MongoRunner.stopMongod(staticMongod);
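For reference, the configureFailPoint helper from jstests/libs/fail_point_util.js used by the test follows a configure, wait, off lifecycle. A minimal sketch of that pattern, assuming an existing connection named conn (the committed test above never calls off()):

import {configureFailPoint} from "jstests/libs/fail_point_util.js";

// Enable the failpoint on the target node and get back a handle.
const fp = configureFailPoint(conn, "hangBeforeRunningXferMods");
// Block until some operation has actually reached the failpoint.
fp.wait();
// ... exercise the behaviour under test while the operation is paused ...
// Disable the failpoint so the paused operation can resume.
fp.off();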
@@ -70,6 +70,7 @@
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
 
+MONGO_FAIL_POINT_DEFINE(hangBeforeRunningXferMods);
 
 /**
  * This file contains commands, which are specific to the legacy chunk cloner source.
@@ -272,6 +273,9 @@ public:
              const DatabaseName&,
              const BSONObj& cmdObj,
              BSONObjBuilder& result) override {
+
+        hangBeforeRunningXferMods.pauseWhileSet();
+
         const MigrationSessionId migrationSessionId(
             uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
@@ -201,6 +201,13 @@ boost::optional<SharedSemiFuture<void>> MigrationCoordinator::completeMigration(
           "migrationId"_attr = _migrationInfo.getId(),
           logAttrs(_migrationInfo.getNss()));
 
+    if (*decision == DecisionEnum::kAborted) {
+        abortMigrationRecipient(opCtx,
+                                _migrationInfo.getRecipientShardId(),
+                                _migrationInfo.getNss(),
+                                _migrationInfo.getMigrationSessionId());
+    }
+
     if (!_releaseRecipientCriticalSectionFuture) {
         launchReleaseRecipientCriticalSection(opCtx);
     }
@@ -808,6 +808,18 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
                               << _sessionId->toString()};
     }
 
+    // Only interrupt the migrate thread if we haven't entered the critical section, otherwise
+    // we'll never exit it.
+    if (_state < kEnteredCritSec) {
+        LOGV2(10142101,
+              "Interrupting migrateThread",
+              logAttrs(_nss),
+              "migrationId"_attr = _migrationId->toBSON());
+        _cancellationSource.cancel();
+        auto newCancellationSource = CancellationSource();
+        std::swap(_cancellationSource, newCancellationSource);
+    }
+
     _state = kAbort;
     _stateChangedCV.notify_all();
     _errmsg = "aborted";
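A note on observability: the interruption path added above logs a new message with id 10142101 on the recipient. A hedged sketch of how a test could assert that this path was taken, assuming a ShardingTest named st whose rs1 replica set is the migration recipient (as in the jstest above); this check is not part of the committed test:

// Hypothetical assertion: wait until the recipient's primary has logged
// "Interrupting migrateThread" (log id 10142101).
checkLog.containsJson(st.rs1.getPrimary(), 10142101);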
@@ -620,6 +620,41 @@ void drainMigrationsOnFcvDowngrade(OperationContext* opCtx) {
     });
 }
 
+void abortMigrationRecipient(OperationContext* opCtx,
+                             const ShardId& recipientShardId,
+                             const NamespaceString& nss,
+                             const MigrationSessionId& sessionId) {
+    const auto logFailure = [](const Status& status) {
+        // The best effort attempt can fail for a variety of reasons, e.g. another migration is
+        // already in progress or the shard can't be found.
+        LOGV2(10142100,
+              "Best effort attempt to abort migration recipient failed",
+              "reason"_attr = status.reason());
+    };
+    const auto recipientShard =
+        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, recipientShardId);
+    if (!recipientShard.isOK()) {
+        logFailure(recipientShard.getStatus());
+        return;
+    }
+
+    BSONObjBuilder bob;
+    bob.append("_recvChunkAbort",
+               NamespaceStringUtil::serialize(nss, SerializationContext::stateDefault()));
+    sessionId.append(&bob);
+    const auto cmdObj = bob.obj();
+
+    const auto response =
+        recipientShard.getValue()->runCommand(opCtx,
+                                              ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                              DatabaseName::kAdmin,
+                                              cmdObj,
+                                              Shard::RetryPolicy::kIdempotent);
+    auto status = Shard::CommandResponse::getEffectiveStatus(response);
+    if (!status.isOK()) {
+        logFailure(status);
+    }
+}
+
 } // namespace migrationutil
 } // namespace mongo
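For readers unfamiliar with the internal migration protocol, the BSONObjBuilder above assembles a _recvChunkAbort command that is sent to the recipient shard's primary against the admin database. A rough sketch of its shape follows; the namespace and sessionId values are invented for illustration, the exact sessionId format is an internal detail, and the command is not intended to be run by hand:

// Hypothetical shape of the command built by abortMigrationRecipient();
// values are illustrative only.
const recvChunkAbortCmd = {
    _recvChunkAbort: "test.user",
    sessionId: "shard0-rs_shard1-rs_65f1c0a2b3d4e5f6a7b8c9d0",
};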
@@ -195,5 +195,13 @@ SemiFuture<void> asyncRecoverMigrationUntilSuccessOrStepDown(OperationContext* o
  */
 MONGO_MOD_PUBLIC void drainMigrationsOnFcvDowngrade(OperationContext* opCtx);
 
+/**
+ * Best effort attempt at aborting migration on the recipient.
+ */
+void abortMigrationRecipient(OperationContext* opCtx,
+                             const ShardId& recipientShardId,
+                             const NamespaceString& nss,
+                             const MigrationSessionId& sessionId);
+
 } // namespace migrationutil
 } // namespace mongo