mirror of https://github.com/mongodb/mongo
SERVER-27534 All writing operations must fail if the term changes.
This SERVER ticket describes a nasty race that can occur if an election happens between two batches of a large update. (The included test confirms that the race is possible.) To fix it, we want to check the operation context for interrupts on each batch, and the check must happen _after_ the collection lock is taken and before the batch of inserts/updates/deletes executes. A recent change to locking gives us almost exactly this for free: if a collection lock has to wait, it throws an exception when the operation context is interrupted, ending the operation. If the lock doesn't wait, though, there is no check. This patch adds that check: acquiring a lock now always throws if the operation context is interrupted, which closes the race window in this bug.
This commit is contained in:
parent c6620182ae
commit bc19d43fdc
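
To make the race and the fix concrete, here is a rough, self-contained C++ sketch of the invariant this patch establishes. All of the types below are stand-ins, not the server's real classes: acquiring a lock re-checks the operation context for interrupt even when no waiting was necessary, so a batched write stops as soon as its operation is killed.

#include <stdexcept>
#include <vector>

// Stand-in for OperationContext: just enough to model interruption.
struct OperationContext {
    bool killed = false;
    void checkForInterrupt() const {
        if (killed)
            throw std::runtime_error("operation was interrupted");
    }
};

// Stand-in collection lock: acquiring it always runs the interrupt check,
// even on the fast, uncontested path -- the behavior this patch adds.
struct CollectionLock {
    explicit CollectionLock(OperationContext* opCtx) {
        opCtx->checkForInterrupt();
    }
};

// A batched write takes the lock once per batch, so a term change that kills
// the operation between batches prevents the remaining batches from applying.
void insertAllBatches(OperationContext* opCtx, const std::vector<int>& batches) {
    for (int batch : batches) {
        CollectionLock lk(opCtx);  // throws here if the op was interrupted
        (void)batch;               // ... apply the batch here ...
    }
}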

@@ -11,6 +11,8 @@ selector:
   exclude_files:
   # Skip any tests that run with auth explicitly.
   - jstests/replsets/*[aA]uth*.js
+  # Also skip tests that require a ScopedThread, because ScopedThreads don't inherit credentials.
+  - jstests/replsets/interrupted_batch_insert.js

 executor:
   config:

@@ -0,0 +1,126 @@
// Tests the scenario described in SERVER-27534.
// 1. Send a single insert command with a large number of documents and the {ordered: true} option.
// 2. Force the thread processing the insert command to hang in between insert batches. (Inserts are
//    typically split into batches of 64, and the server yields locks between batches.)
// 3. Disconnect the original primary from the network, forcing another node to step up.
// 4. Insert a single document on the new primary.
// 5. Return the original primary to the network and force it to step up by disconnecting the
//    primary that replaced it. The original primary has to roll back any batches from step 1
//    that were inserted locally but did not get majority committed before the insert in step 4.
// 6. Unpause the thread performing the insert from step 1. If it continues to insert batches even
//    though there was a rollback, those inserts will violate the {ordered: true} option.

load('jstests/libs/parallelTester.js');
load("jstests/replsets/rslib.js");

(function() {
    "use strict";

    var name = "interrupted_batch_insert";
    var replTest = new ReplSetTest({name: name, nodes: 3, useBridge: true});
    var nodes = replTest.nodeList();

    var conns = replTest.startSet();
    replTest.initiate({
        _id: name,
        members: [
            {_id: 0, host: nodes[0]},
            {_id: 1, host: nodes[1]},
            {_id: 2, host: nodes[2], priority: 0}
        ]
    });

    // The test starts with node 0 as the primary.
    replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY);
    var primary = replTest.nodes[0];
    var collName = primary.getDB("db")[name].getFullName();

    var getParameterResult =
        primary.getDB("admin").runCommand({getParameter: 1, internalInsertMaxBatchSize: 1});
    assert.commandWorked(getParameterResult);
    const batchSize = getParameterResult.internalInsertMaxBatchSize;

    // Prevent node 1 from getting any data from the node 0 oplog.
    conns[0].disconnect(conns[1]);

    // Allow the primary to insert the first 5 batches of documents. After that, the fail point
    // activates, and the client thread hangs until the fail point gets turned off.
    assert.commandWorked(primary.getDB("db").adminCommand(
        {configureFailPoint: "hangDuringBatchInsert", mode: {skip: 5}}));

    // In a background thread, issue an insert command to the primary that will insert 10 batches
    // of documents.
    var worker = new ScopedThread((host, collName, numToInsert) => {
        // Insert elements [{idx: 0}, {idx: 1}, ..., {idx: numToInsert - 1}].
        const docsToInsert = Array.from({length: numToInsert}, (_, i) => {
            return {idx: i};
        });
        var coll = new Mongo(host).getCollection(collName);
        assert.throws(
            () => coll.insert(docsToInsert,
                              {writeConcern: {w: "majority", wtimeout: 5000}, ordered: true}),
            [],
            "network error");
    }, primary.host, collName, 10 * batchSize);
    worker.start();

    // Wait long enough to guarantee that all 5 batches of inserts have executed and the primary
    // is hung on the "hangDuringBatchInsert" fail point.
    sleep(1000);

    // Make sure the insert command is, in fact, running in the background.
    assert.eq(primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length,
              1);

    // Completely isolate the current primary (node 0), forcing it to step down.
    conns[0].disconnect(conns[2]);

    // Wait for node 1, the only other eligible node, to become the new primary.
    replTest.waitForState(replTest.nodes[1], ReplSetTest.State.PRIMARY);

    // Wait for node 2 to acknowledge node 1 as the new primary.
    replTest.awaitSyncSource(replTest.nodes[2], replTest.nodes[1]);

    // Issue a write to the new primary.
    var collOnNewPrimary = replTest.nodes[1].getCollection(collName);
    assert.writeOK(collOnNewPrimary.insert({singleDoc: 1}, {writeConcern: {w: "majority"}}));

    // Isolate node 1, forcing it to step down as primary, and reconnect node 0, allowing it to
    // step up again.
    conns[0].reconnect(conns[2]);
    conns[1].disconnect(conns[2]);

    // Wait for node 0 to become primary again.
    replTest.waitForState(primary, ReplSetTest.State.PRIMARY);

    // Wait until node 2 recognizes node 0 as primary.
    replTest.awaitSyncSource(replTest.nodes[2], primary);

    // Allow the batch insert to continue.
    assert.commandWorked(primary.getDB("db").adminCommand(
        {configureFailPoint: "hangDuringBatchInsert", mode: "off"}));

    // Wait until the insert command is done.
    assert.soon(
        () =>
            primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length ===
            0);

    worker.join();

    var docs = primary.getDB("db")[name].find({idx: {$exists: 1}}).sort({idx: 1}).toArray();

    // Any discontinuity in the "idx" values is an error. If an "idx" document failed to insert,
    // all of the "idx" documents after it should also have failed to insert, because the insert
    // specified {ordered: true}. Note: if none of the inserts were successful, that's fine.
    docs.forEach((element, index) => {
        assert.eq(element.idx, index);
    });

    // Reconnect the remaining disconnected nodes, so we can exit.
    conns[0].reconnect(conns[1]);
    conns[1].reconnect(conns[2]);

    replTest.stopSet(15);
}());

@@ -140,6 +140,7 @@ void shutdown(ServiceContext* srvContext) {
     // Close all open databases, shutdown storage engine and run all deinitializers.
     auto shutdownOpCtx = serviceContext->makeOperationContext(client);
     {
+        UninterruptibleLockGuard noInterrupt(shutdownOpCtx->lockState());
         Lock::GlobalLock lk(shutdownOpCtx.get(), MODE_X, Date_t::max());
         dbHolder().closeAll(shutdownOpCtx.get(), "shutdown");
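
This is the first of several call sites in this commit (collection rename, the find command, repairDatabase, migration cleanup, getMore) that use the new UninterruptibleLockGuard to opt shutdown and cleanup paths out of the interrupt check. A rough sketch of how such a counter-based RAII guard can work, using stand-in types rather than the server's real Locker (the diff's lockBegin consults _uninterruptibleLocksRequested before throwing):

// Stand-in for the server's Locker; only the counter matters here.
struct Locker {
    int uninterruptibleLocksRequested = 0;
};

// While an instance is alive, lock acquisitions on this locker skip the
// interrupt check, so shutdown and cleanup paths can still take the locks
// they need even when their operation has been killed.
class UninterruptibleLockGuard {
public:
    explicit UninterruptibleLockGuard(Locker* locker) : _locker(locker) {
        ++_locker->uninterruptibleLocksRequested;
    }
    ~UninterruptibleLockGuard() {
        --_locker->uninterruptibleLocksRequested;
    }
    // Non-copyable: the destructor must run exactly once per construction.
    UninterruptibleLockGuard(const UninterruptibleLockGuard&) = delete;
    UninterruptibleLockGuard& operator=(const UninterruptibleLockGuard&) = delete;

private:
    Locker* const _locker;
};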

@@ -332,6 +332,9 @@ Status renameCollectionCommon(OperationContext* opCtx,

     // Dismissed on success
     auto tmpCollectionDropper = MakeGuard([&] {
+        // Ensure that we don't trigger an exception when attempting to take locks.
+        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
         BSONObjBuilder unusedResult;
         auto status =
             dropCollection(opCtx,

@@ -294,6 +294,9 @@ public:

         // Only used by the failpoints.
         const auto dropAndReaquireReadLock = [&readLock, opCtx, &request]() {
+            // Make sure an interrupted operation does not prevent us from reacquiring the lock.
+            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
             readLock.reset();
             readLock.emplace(opCtx, request.nss);
         };

@@ -1176,6 +1176,80 @@ TEST_F(DConcurrencyTestFixture, TicketReacquireCanBeInterrupted) {
     ASSERT_THROWS_CODE(result.get(), AssertionException, ErrorCodes::Interrupted);
 }

+TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextThrowsEvenWhenUncontested) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    opCtx->markKilled();
+
+    boost::optional<Lock::GlobalRead> globalReadLock;
+    ASSERT_THROWS_CODE(
+        globalReadLock.emplace(opCtx, Date_t::now()), AssertionException, ErrorCodes::Interrupted);
+}
+
+TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextThrowsEvenAcquiringRecursively) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    Lock::GlobalWrite globalWriteLock(opCtx, Date_t::now());
+
+    opCtx->markKilled();
+
+    {
+        boost::optional<Lock::GlobalWrite> recursiveGlobalWriteLock;
+        ASSERT_THROWS_CODE(recursiveGlobalWriteLock.emplace(opCtx, Date_t::now()),
+                           AssertionException,
+                           ErrorCodes::Interrupted);
+    }
+}
+
+TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextRespectsUninterruptibleGuard) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    opCtx->markKilled();
+
+    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+    Lock::GlobalRead globalReadLock(opCtx, Date_t::now());  // Does not throw.
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextThrowsEvenWhenUncontested) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    opCtx->markKilled();
+
+    boost::optional<Lock::DBLock> dbWriteLock;
+    ASSERT_THROWS_CODE(
+        dbWriteLock.emplace(opCtx, "db", MODE_IX), AssertionException, ErrorCodes::Interrupted);
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextThrowsEvenWhenAcquiringRecursively) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    Lock::DBLock dbWriteLock(opCtx, "db", MODE_X);
+
+    opCtx->markKilled();
+
+    {
+        boost::optional<Lock::DBLock> recursiveDBWriteLock;
+        ASSERT_THROWS_CODE(recursiveDBWriteLock.emplace(opCtx, "db", MODE_X),
+                           AssertionException,
+                           ErrorCodes::Interrupted);
+    }
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextRespectsUninterruptibleGuard) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
+    auto opCtx = clients[0].second.get();
+
+    opCtx->markKilled();
+
+    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+    Lock::DBLock dbWriteLock(opCtx, "db", MODE_X);  // Does not throw.
+}
+
 TEST_F(DConcurrencyTestFixture, DBLockTimeout) {
     auto clientOpctxPairs = makeKClientsWithLockers<DefaultLockerImpl>(2);
     auto opctx1 = clientOpctxPairs[0].second.get();

@@ -37,8 +37,8 @@ TEST(Deadlock, NoDeadlock) {
     LockerForTests locker1(MODE_IS);
     LockerForTests locker2(MODE_IS);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resId, MODE_S));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resId, MODE_S));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resId, MODE_S));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resId, MODE_S));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(!wfg1.check().hasCycle());

@@ -54,14 +54,14 @@ TEST(Deadlock, Simple) {
     LockerForTests locker1(MODE_IX);
     LockerForTests locker2(MODE_IX);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resIdA, MODE_X));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resIdB, MODE_X));

     // 1 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resIdB, MODE_X));

     // 2 -> 1
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resIdA, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -81,12 +81,12 @@ TEST(Deadlock, SimpleUpgrade) {
     LockerForTests locker2(MODE_IX);

     // Both acquire lock in intent mode
-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resId, MODE_IX));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resId, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resId, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resId, MODE_IX));

     // Both try to upgrade
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resId, MODE_X));
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resId, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resId, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resId, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -107,17 +107,17 @@ TEST(Deadlock, Indirect) {
     LockerForTests locker2(MODE_IX);
     LockerForTests lockerIndirect(MODE_IX);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resIdA, MODE_X));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resIdB, MODE_X));

     // 1 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resIdB, MODE_X));

     // 2 -> 1
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resIdA, MODE_X));

     // 3 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, lockerIndirect.lockBegin(resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, lockerIndirect.lockBegin(nullptr, resIdA, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -143,17 +143,17 @@ TEST(Deadlock, IndirectWithUpgrade) {
     LockerForTests writer(MODE_IX);

     // This sequence simulates the deadlock which occurs during flush
-    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(resIdFlush, MODE_IX));
-    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(resIdDb, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(nullptr, resIdFlush, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(nullptr, resIdDb, MODE_X));

-    ASSERT_EQUALS(LOCK_OK, reader.lockBegin(resIdFlush, MODE_IS));
+    ASSERT_EQUALS(LOCK_OK, reader.lockBegin(nullptr, resIdFlush, MODE_IS));

     // R -> W
-    ASSERT_EQUALS(LOCK_WAITING, reader.lockBegin(resIdDb, MODE_S));
+    ASSERT_EQUALS(LOCK_WAITING, reader.lockBegin(nullptr, resIdDb, MODE_S));

     // R -> W
     // F -> W
-    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(resIdFlush, MODE_S));
+    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(nullptr, resIdFlush, MODE_S));

     // W yields its flush lock, so now f is granted in mode S
     //

@@ -164,14 +164,14 @@ TEST(Deadlock, IndirectWithUpgrade) {
     //
     // R -> W
     // F -> R
-    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(resIdFlush, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(nullptr, resIdFlush, MODE_X));

     // W comes back from the commit and tries to re-acquire the flush lock
     //
     // R -> W
     // F -> R
     // W -> F
-    ASSERT_EQUALS(LOCK_WAITING, writer.lockBegin(resIdFlush, MODE_IX));
+    ASSERT_EQUALS(LOCK_WAITING, writer.lockBegin(nullptr, resIdFlush, MODE_IX));

     // Run deadlock detection from the point of view of each of the involved lockers
     DeadlockDetector wfgF(*getGlobalLockManager(), &flush);

@@ -347,7 +347,7 @@ LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(OperationContext* opCtx,
         }
         _modeForTicket = mode;
     }
-    const LockResult result = lockBegin(resourceIdGlobal, mode);
+    const LockResult result = lockBegin(opCtx, resourceIdGlobal, mode);
     if (result == LOCK_OK)
         return LOCK_OK;

@@ -469,7 +469,7 @@ template <bool IsForMMAPV1>
 LockResult LockerImpl<IsForMMAPV1>::lock(
     OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {

-    const LockResult result = lockBegin(resId, mode);
+    const LockResult result = lockBegin(opCtx, resId, mode);

     // Fast, uncontended path
     if (result == LOCK_OK)

@@ -713,7 +713,9 @@ void LockerImpl<IsForMMAPV1>::restoreLockState(OperationContext* opCtx,
 }

 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
+LockResult LockerImpl<IsForMMAPV1>::lockBegin(OperationContext* opCtx,
+                                              ResourceId resId,
+                                              LockMode mode) {
     dassert(!getWaitingResource().isValid());

     LockRequest* request;

@@ -778,6 +780,16 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
     if (result == LOCK_WAITING) {
         globalStats.recordWait(_id, resId, mode);
         _stats.recordWait(resId, mode);
+    } else if (result == LOCK_OK && opCtx && _uninterruptibleLocksRequested == 0) {
+        // Lock acquisitions are not allowed to succeed when opCtx is marked as interrupted, unless
+        // the caller requested an uninterruptible lock.
+        auto interruptStatus = opCtx->checkForInterruptNoAssert();
+        if (!interruptStatus.isOK()) {
+            auto unlockIt = _requests.find(resId);
+            invariant(unlockIt);
+            _unlockImpl(&unlockIt);
+            uassertStatusOK(interruptStatus);
+        }
     }

     return result;

@@ -202,10 +202,14 @@ public:
      * In other words for each call to lockBegin, which does not return LOCK_OK, there needs to
      * be a corresponding call to either lockComplete or unlock.
      *
+     * If an operation context is provided that represents an interrupted operation, lockBegin will
+     * throw an exception whenever it would have been possible to grant the lock with LOCK_OK. This
+     * behavior can be disabled with an UninterruptibleLockGuard.
+     *
      * NOTE: These methods are not public and should only be used inside the class
      * implementation and for unit-tests and not called directly.
      */
-    LockResult lockBegin(ResourceId resId, LockMode mode);
+    LockResult lockBegin(OperationContext* opCtx, ResourceId resId, LockMode mode);

     /**
      * Waits for the completion of a lock, previously requested through lockBegin or
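
The new DConcurrency tests earlier in this commit exercise this contract directly. As a quick self-contained illustration of the documented behavior, with stand-in types rather than the real lock manager:

#include <cassert>
#include <stdexcept>

// Stand-in for OperationContext: just enough to model a killed operation.
struct OpCtx {
    bool killed = false;
    void markKilled() { killed = true; }
};

enum LockResult { LOCK_OK, LOCK_WAITING };

// Stand-in lockBegin: fails an otherwise grantable acquisition if the
// operation was interrupted, unless no opCtx was provided. (The real code
// first releases the just-granted request, then uasserts.)
LockResult lockBegin(OpCtx* opCtx) {
    if (opCtx && opCtx->killed)
        throw std::runtime_error("Interrupted");
    return LOCK_OK;
}

int main() {
    OpCtx opCtx;
    assert(lockBegin(&opCtx) == LOCK_OK);   // healthy op: granted
    assert(lockBegin(nullptr) == LOCK_OK);  // no opCtx: never interrupted

    opCtx.markKilled();
    try {
        lockBegin(&opCtx);  // interrupted op: throws instead of granting
        assert(false);
    } catch (const std::runtime_error&) {
    }
    return 0;
}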

@@ -268,12 +268,12 @@ TEST(LockerImpl, CanceledDeadlockUnblocks) {
     ASSERT(LOCK_OK == locker2.lock(db2, MODE_X));

     // Set up locker1 and locker2 for deadlock
-    ASSERT(LOCK_WAITING == locker1.lockBegin(db2, MODE_X));
-    ASSERT(LOCK_WAITING == locker2.lockBegin(db1, MODE_X));
+    ASSERT(LOCK_WAITING == locker1.lockBegin(nullptr, db2, MODE_X));
+    ASSERT(LOCK_WAITING == locker2.lockBegin(nullptr, db1, MODE_X));

     // Locker3 blocks behind locker 2
     ASSERT(LOCK_OK == locker3.lockGlobal(MODE_IX));
-    ASSERT(LOCK_WAITING == locker3.lockBegin(db1, MODE_S));
+    ASSERT(LOCK_WAITING == locker3.lockBegin(nullptr, db1, MODE_S));

     // Detect deadlock, canceling our request
     ASSERT(

@@ -442,7 +442,7 @@ TEST(LockerImpl, GetLockerInfoShouldReportPendingLocks) {
     DefaultLockerImpl conflictingLocker;
     ASSERT_EQ(LOCK_OK, conflictingLocker.lockGlobal(MODE_IS));
     ASSERT_EQ(LOCK_OK, conflictingLocker.lock(dbId, MODE_IS));
-    ASSERT_EQ(LOCK_WAITING, conflictingLocker.lockBegin(collectionId, MODE_IS));
+    ASSERT_EQ(LOCK_WAITING, conflictingLocker.lockBegin(nullptr, collectionId, MODE_IS));

     // Assert the held locks show up in the output of getLockerInfo().
     Locker::LockerInfo lockerInfo;

@@ -63,7 +63,7 @@ TEST(LockStats, Wait) {
     {
         // This will block
         LockerForTests lockerConflict(MODE_IX);
-        ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(resId, MODE_S));
+        ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(nullptr, resId, MODE_S));

         // Sleep 1 millisecond so the wait time passes
         ASSERT_EQUALS(

@@ -85,6 +85,7 @@ namespace {
 MONGO_FP_DECLARE(failAllInserts);
 MONGO_FP_DECLARE(failAllUpdates);
 MONGO_FP_DECLARE(failAllRemoves);
+MONGO_FP_DECLARE(hangDuringBatchInsert);

 void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
     if (containsRetry) {

@@ -368,7 +369,9 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
     boost::optional<AutoGetCollection> collection;
     auto acquireCollection = [&] {
         while (true) {
-            opCtx->checkForInterrupt();
+            if (MONGO_FAIL_POINT(hangDuringBatchInsert)) {
+                MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangDuringBatchInsert);
+            }

             if (MONGO_FAIL_POINT(failAllInserts)) {
                 uasserted(ErrorCodes::InternalError, "failAllInserts failpoint active!");

@@ -606,7 +609,6 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,

     boost::optional<AutoGetCollection> collection;
     while (true) {
-        opCtx->checkForInterrupt();
         if (MONGO_FAIL_POINT(failAllUpdates)) {
             uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
         }

@@ -762,8 +764,6 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
     ParsedDelete parsedDelete(opCtx, &request);
     uassertStatusOK(parsedDelete.parseRequest());

-    opCtx->checkForInterrupt();
-
     if (MONGO_FAIL_POINT(failAllRemoves)) {
         uasserted(ErrorCodes::InternalError, "failAllRemoves failpoint active!");
     }

@@ -269,6 +269,9 @@ Status repairDatabase(OperationContext* opCtx,
     dbHolder().close(opCtx, dbName, "database closed for repair");
     ON_BLOCK_EXIT([&dbName, &opCtx] {
         try {
+            // Ensure that we don't trigger an exception when attempting to take locks.
+            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
             // Open the db after everything finishes.
             auto db = dbHolder().openDb(opCtx, dbName);

@@ -538,6 +538,9 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
     }

     if (_deleteNotifyExec) {
+        // Don't allow an Interrupt exception to prevent _deleteNotifyExec from getting cleaned up.
+        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
         AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
         const auto cursorManager =
             autoColl.getCollection() ? autoColl.getCollection()->getCursorManager() : nullptr;

@@ -1117,6 +1117,9 @@ DbResponse receivedGetMore(OperationContext* opCtx,
         getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
     } catch (AssertionException& e) {
         if (isCursorAuthorized) {
+            // Make sure that killCursorGlobal does not throw an exception if it is interrupted.
+            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+
             // If a cursor with id 'cursorid' was authorized, it may have been advanced
             // before an exception terminated processGetMore. Erase the ClientCursor
             // because it may now be out of sync with the client's iteration state.