mirror of https://github.com/mongodb/mongo
Revert "SERVER-27534 All writing operations must fail if the term changes."
This reverts commit bc19d43fdc.
This commit is contained in:
parent 3538f6e217
commit ae50776bce
@@ -11,8 +11,6 @@ selector:
   exclude_files:
   # Skip any tests that run with auth explicitly.
   - jstests/replsets/*[aA]uth*.js
-  # Also skip tests that require a ScopedThread, because ScopedThreads don't inherit credentials.
-  - jstests/replsets/interrupted_batch_insert.js

 executor:
   config:

@@ -1,126 +0,0 @@
-// Tests the scenario described in SERVER-27534.
-// 1. Send a single insert command with a large number of documents and the {ordered: true} option.
-// 2. Force the thread processing the insert command to hang in between insert batches. (Inserts are
-//    typically split into batches of 64, and the server yields locks between batches.)
-// 3. Disconnect the original primary from the network, forcing another node to step up.
-// 4. Insert a single document on the new primary.
-// 5. Return the original primary to the network and force it to step up by disconnecting the
-//    primary that replaced it. The original primary has to roll back any batches from step 1
-//    that were inserted locally but did not get majority committed before the insert in step 4.
-// 6. Unpause the thread performing the insert from step 1. If it continues to insert batches even
-//    though there was a rollback, those inserts will violate the {ordered: true} option.
-load('jstests/libs/parallelTester.js');
-load("jstests/replsets/rslib.js");
-
-(function() {
-    "use strict";
-
-    var name = "interrupted_batch_insert";
-    var replTest = new ReplSetTest({name: name, nodes: 3, useBridge: true});
-    var nodes = replTest.nodeList();
-
-    var conns = replTest.startSet();
-    replTest.initiate({
-        _id: name,
-        members: [
-            {_id: 0, host: nodes[0]},
-            {_id: 1, host: nodes[1]},
-            {_id: 2, host: nodes[2], priority: 0}
-        ]
-    });
-
-    // The test starts with node 0 as the primary.
-    replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY);
-    var primary = replTest.nodes[0];
-    var collName = primary.getDB("db")[name].getFullName();
-
-    var getParameterResult =
-        primary.getDB("admin").runCommand({getParameter: 1, internalInsertMaxBatchSize: 1});
-    assert.commandWorked(getParameterResult);
-    const batchSize = getParameterResult.internalInsertMaxBatchSize;
-
-    // Prevent node 1 from getting any data from the node 0 oplog.
-    conns[0].disconnect(conns[1]);
-
-    // Allow the primary to insert the first 5 batches of documents. After that, the fail point
-    // activates, and the client thread hangs until the fail point gets turned off.
-    assert.commandWorked(primary.getDB("db").adminCommand(
-        {configureFailPoint: "hangDuringBatchInsert", mode: {skip: 5}}));
-
-    // In a background thread, issue an insert command to the primary that will insert 10 batches
-    // of documents.
-    var worker = new ScopedThread((host, collName, numToInsert) => {
-        // Insert elements [{idx: 0}, {idx: 1}, ..., {idx: numToInsert - 1}].
-        const docsToInsert = Array.from({length: numToInsert}, (_, i) => {
-            return {idx: i};
-        });
-        var coll = new Mongo(host).getCollection(collName);
-        assert.throws(
-            () => coll.insert(docsToInsert,
-                              {writeConcern: {w: "majority", wtimeout: 5000}, ordered: true}),
-            [],
-            "network error");
-    }, primary.host, collName, 10 * batchSize);
-    worker.start();
-
-    // Wait long enough to guarantee that all 5 batches of inserts have executed and the primary
-    // is hung on the "hangDuringBatchInsert" fail point.
-    sleep(1000);
-
-    // Make sure the insert command is, in fact, running in the background.
-    assert.eq(primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length,
-              1);
-
-    // Completely isolate the current primary (node 0), forcing it to step down.
-    conns[0].disconnect(conns[2]);
-
-    // Wait for node 1, the only other eligible node, to become the new primary.
-    replTest.waitForState(replTest.nodes[1], ReplSetTest.State.PRIMARY);
-
-    // Wait for node 2 to acknowledge node 1 as the new primary.
-    replTest.awaitSyncSource(replTest.nodes[2], replTest.nodes[1]);
-
-    // Issue a write to the new primary.
-    var collOnNewPrimary = replTest.nodes[1].getCollection(collName);
-    assert.writeOK(collOnNewPrimary.insert({singleDoc: 1}, {writeConcern: {w: "majority"}}));
-
-    // Isolate node 1, forcing it to step down as primary, and reconnect node 0, allowing it to
-    // step up again.
-    conns[0].reconnect(conns[2]);
-    conns[1].disconnect(conns[2]);
-
-    // Wait for node 0 to become primary again.
-    replTest.waitForState(primary, ReplSetTest.State.PRIMARY);
-
-    // Wait until node 2 recognizes node 0 as primary.
-    replTest.awaitSyncSource(replTest.nodes[2], primary);
-
-    // Allow the batch insert to continue.
-    assert.commandWorked(primary.getDB("db").adminCommand(
-        {configureFailPoint: "hangDuringBatchInsert", mode: "off"}));
-
-    // Wait until the insert command is done.
-    assert.soon(
-        () =>
-            primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length ===
-            0);
-
-    worker.join();
-
-    var docs = primary.getDB("db")[name].find({idx: {$exists: 1}}).sort({idx: 1}).toArray();
-
-    // Any discontinuity in the "idx" values is an error. If an "idx" document failed to insert,
-    // all of the "idx" documents after it should also have failed to insert, because the insert
-    // specified {ordered: true}. Note, if none of the inserts were successful, that's fine.
-    docs.forEach((element, index) => {
-        assert.eq(element.idx, index);
-    });
-
-    // Reconnect the remaining disconnected nodes, so we can exit.
-    conns[0].reconnect(conns[1]);
-    conns[1].reconnect(conns[2]);
-
-    replTest.stopSet(15);
-}());

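The final assertion above relies on the {ordered: true} contract: an ordered insert must stop at the first failing document, so whatever survives a rollback is always a contiguous prefix of the batch. A minimal standalone C++ sketch of that invariant, with illustrative names that are not MongoDB API:

```cpp
#include <cassert>
#include <vector>

// Sketch: an ordered batch writer must stop at the first failed batch, so the
// set of persisted values is always a contiguous prefix [0, n).
bool insertOrdered(std::vector<int>& storage, int numDocs, int failAt) {
    for (int i = 0; i < numDocs; ++i) {
        if (i == failAt)
            return false;  // First failure: an ordered write must not continue.
        storage.push_back(i);
    }
    return true;
}

int main() {
    std::vector<int> storage;
    insertOrdered(storage, 640, /*failAt=*/320);
    // The same check interrupted_batch_insert.js performs: any gap in the
    // sequence would mean inserts continued past a failure.
    for (size_t i = 0; i < storage.size(); ++i)
        assert(storage[i] == static_cast<int>(i));
}
```
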
@@ -140,7 +140,6 @@ void shutdown(ServiceContext* srvContext) {
     // Close all open databases, shutdown storage engine and run all deinitializers.
     auto shutdownOpCtx = serviceContext->makeOperationContext(client);
     {
-        UninterruptibleLockGuard noInterrupt(shutdownOpCtx->lockState());
         Lock::GlobalLock lk(shutdownOpCtx.get(), MODE_X, Date_t::max());
         dbHolder().closeAll(shutdownOpCtx.get(), "shutdown");

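This hunk is the first of several that delete an UninterruptibleLockGuard. The guard is an RAII marker: while one is alive on a locker, lock acquisition skips its interrupt check. A hedged standalone sketch of the pattern, assuming a simple counter on the locker (illustrative types, not the real implementation):

```cpp
// Sketch of the RAII guard pattern removed throughout this commit. While a
// guard is in scope, the locker's counter is non-zero and interrupt checks
// during lock acquisition are suppressed (see the lockBegin hunk below).
struct LockerSketch {
    int uninterruptibleLocksRequested = 0;
};

class UninterruptibleLockGuardSketch {
public:
    explicit UninterruptibleLockGuardSketch(LockerSketch* locker) : _locker(locker) {
        ++_locker->uninterruptibleLocksRequested;
    }
    ~UninterruptibleLockGuardSketch() {
        --_locker->uninterruptibleLocksRequested;
    }
    // Non-copyable: the destructor must run exactly once per construction.
    UninterruptibleLockGuardSketch(const UninterruptibleLockGuardSketch&) = delete;
    UninterruptibleLockGuardSketch& operator=(const UninterruptibleLockGuardSketch&) = delete;

private:
    LockerSketch* _locker;
};

int main() {
    LockerSketch locker;
    {
        UninterruptibleLockGuardSketch guard(&locker);  // Counter is now 1.
        // ... take cleanup locks without risking an Interrupted exception ...
    }
    return locker.uninterruptibleLocksRequested;  // Back to 0 on scope exit.
}
```

The call sites deleted in this commit (shutdown, rename, repair, migration cleanup, getMore) all used the guard the same way: cleanup paths must be able to finish taking locks even when the operation has already been interrupted.
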
@@ -332,9 +332,6 @@ Status renameCollectionCommon(OperationContext* opCtx,

    // Dismissed on success
    auto tmpCollectionDropper = MakeGuard([&] {
-       // Ensure that we don't trigger an exception when attempting to take locks.
-       UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
        BSONObjBuilder unusedResult;
        auto status =
            dropCollection(opCtx,

@@ -294,9 +294,6 @@ public:

        // Only used by the failpoints.
        const auto dropAndReaquireReadLock = [&readLock, opCtx, &request]() {
-           // Make sure an interrupted operation does not prevent us from reacquiring the lock.
-           UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
            readLock.reset();
            readLock.emplace(opCtx, request.nss);
        };

@@ -1176,80 +1176,6 @@ TEST_F(DConcurrencyTestFixture, TicketReacquireCanBeInterrupted) {
     ASSERT_THROWS_CODE(result.get(), AssertionException, ErrorCodes::Interrupted);
 }

-TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextThrowsEvenWhenUncontested) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    opCtx->markKilled();
-
-    boost::optional<Lock::GlobalRead> globalReadLock;
-    ASSERT_THROWS_CODE(
-        globalReadLock.emplace(opCtx, Date_t::now()), AssertionException, ErrorCodes::Interrupted);
-}
-
-TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextThrowsEvenAcquiringRecursively) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    Lock::GlobalWrite globalWriteLock(opCtx, Date_t::now());
-
-    opCtx->markKilled();
-
-    {
-        boost::optional<Lock::GlobalWrite> recursiveGlobalWriteLock;
-        ASSERT_THROWS_CODE(recursiveGlobalWriteLock.emplace(opCtx, Date_t::now()),
-                           AssertionException,
-                           ErrorCodes::Interrupted);
-    }
-}
-
-TEST_F(DConcurrencyTestFixture, GlobalLockInInterruptedContextRespectsUninterruptibleGuard) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    opCtx->markKilled();
-
-    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-    Lock::GlobalRead globalReadLock(opCtx, Date_t::now());  // Does not throw.
-}
-
-TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextThrowsEvenWhenUncontested) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    opCtx->markKilled();
-
-    boost::optional<Lock::DBLock> dbWriteLock;
-    ASSERT_THROWS_CODE(
-        dbWriteLock.emplace(opCtx, "db", MODE_IX), AssertionException, ErrorCodes::Interrupted);
-}
-
-TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextThrowsEvenWhenAcquiringRecursively) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    Lock::DBLock dbWriteLock(opCtx, "db", MODE_X);
-
-    opCtx->markKilled();
-
-    {
-        boost::optional<Lock::DBLock> recursiveDBWriteLock;
-        ASSERT_THROWS_CODE(recursiveDBWriteLock.emplace(opCtx, "db", MODE_X),
-                           AssertionException,
-                           ErrorCodes::Interrupted);
-    }
-}
-
-TEST_F(DConcurrencyTestFixture, DBLockInInterruptedContextRespectsUninterruptibleGuard) {
-    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(1);
-    auto opCtx = clients[0].second.get();
-
-    opCtx->markKilled();
-
-    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-    Lock::DBLock dbWriteLock(opCtx, "db", MODE_X);  // Does not throw.
-}
-
 TEST_F(DConcurrencyTestFixture, DBLockTimeout) {
     auto clientOpctxPairs = makeKClientsWithLockers<DefaultLockerImpl>(2);
     auto opctx1 = clientOpctxPairs[0].second.get();

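The deleted tests pinned down the SERVER-27534 semantics being reverted: once an operation context is marked killed, any new lock acquisition throws Interrupted, even when uncontested and even for recursive acquisitions, unless an UninterruptibleLockGuard is in scope. A toy model of those assertions (illustrative names only, not the real types):

```cpp
#include <cassert>
#include <stdexcept>

// Toy model of the semantics the deleted tests asserted.
struct OpCtx {
    bool killed = false;
    int uninterruptibleGuards = 0;
};

void acquireLock(OpCtx& opCtx) {
    // SERVER-27534 behavior: fail the acquisition if the operation was
    // interrupted and no uninterruptible guard is active.
    if (opCtx.killed && opCtx.uninterruptibleGuards == 0)
        throw std::runtime_error("Interrupted");
}

int main() {
    OpCtx opCtx;
    opCtx.killed = true;

    bool threw = false;
    try {
        acquireLock(opCtx);  // Uncontested, but must still throw.
    } catch (const std::runtime_error&) {
        threw = true;
    }
    assert(threw);

    ++opCtx.uninterruptibleGuards;  // Scope of an UninterruptibleLockGuard.
    acquireLock(opCtx);             // Does not throw.
}
```
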
@@ -37,8 +37,8 @@ TEST(Deadlock, NoDeadlock) {
     LockerForTests locker1(MODE_IS);
     LockerForTests locker2(MODE_IS);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resId, MODE_S));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resId, MODE_S));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resId, MODE_S));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resId, MODE_S));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(!wfg1.check().hasCycle());

@@ -54,14 +54,14 @@ TEST(Deadlock, Simple) {
     LockerForTests locker1(MODE_IX);
     LockerForTests locker2(MODE_IX);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resIdA, MODE_X));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resIdB, MODE_X));

     // 1 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resIdB, MODE_X));

     // 2 -> 1
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resIdA, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -81,12 +81,12 @@ TEST(Deadlock, SimpleUpgrade) {
     LockerForTests locker2(MODE_IX);

     // Both acquire lock in intent mode
-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resId, MODE_IX));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resId, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resId, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resId, MODE_IX));

     // Both try to upgrade
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resId, MODE_X));
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resId, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resId, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resId, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -107,17 +107,17 @@ TEST(Deadlock, Indirect) {
     LockerForTests locker2(MODE_IX);
     LockerForTests lockerIndirect(MODE_IX);

-    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(nullptr, resIdA, MODE_X));
-    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(nullptr, resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker1.lockBegin(resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, locker2.lockBegin(resIdB, MODE_X));

     // 1 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(nullptr, resIdB, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker1.lockBegin(resIdB, MODE_X));

     // 2 -> 1
-    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(nullptr, resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, locker2.lockBegin(resIdA, MODE_X));

     // 3 -> 2
-    ASSERT_EQUALS(LOCK_WAITING, lockerIndirect.lockBegin(nullptr, resIdA, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, lockerIndirect.lockBegin(resIdA, MODE_X));

     DeadlockDetector wfg1(*getGlobalLockManager(), &locker1);
     ASSERT(wfg1.check().hasCycle());

@@ -143,17 +143,17 @@ TEST(Deadlock, IndirectWithUpgrade) {
     LockerForTests writer(MODE_IX);

     // This sequence simulates the deadlock which occurs during flush
-    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(nullptr, resIdFlush, MODE_IX));
-    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(nullptr, resIdDb, MODE_X));
+    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(resIdFlush, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, writer.lockBegin(resIdDb, MODE_X));

-    ASSERT_EQUALS(LOCK_OK, reader.lockBegin(nullptr, resIdFlush, MODE_IS));
+    ASSERT_EQUALS(LOCK_OK, reader.lockBegin(resIdFlush, MODE_IS));

     // R -> W
-    ASSERT_EQUALS(LOCK_WAITING, reader.lockBegin(nullptr, resIdDb, MODE_S));
+    ASSERT_EQUALS(LOCK_WAITING, reader.lockBegin(resIdDb, MODE_S));

     // R -> W
     // F -> W
-    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(nullptr, resIdFlush, MODE_S));
+    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(resIdFlush, MODE_S));

     // W yields its flush lock, so now f is granted in mode S
     //

@@ -164,14 +164,14 @@ TEST(Deadlock, IndirectWithUpgrade) {
     //
     // R -> W
     // F -> R
-    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(nullptr, resIdFlush, MODE_X));
+    ASSERT_EQUALS(LOCK_WAITING, flush.lockBegin(resIdFlush, MODE_X));

     // W comes back from the commit and tries to re-acquire the flush lock
     //
     // R -> W
     // F -> R
     // W -> F
-    ASSERT_EQUALS(LOCK_WAITING, writer.lockBegin(nullptr, resIdFlush, MODE_IX));
+    ASSERT_EQUALS(LOCK_WAITING, writer.lockBegin(resIdFlush, MODE_IX));

     // Run deadlock detection from the point of view of each of the involved lockers
     DeadlockDetector wfgF(*getGlobalLockManager(), &flush);

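These tests exercise the DeadlockDetector, which walks the wait-for graph (an edge from each waiting locker to the lockers holding what it wants) looking for a cycle. A rough standalone sketch of that check, assuming a plain adjacency map rather than the real lock manager state:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Hedged sketch of a wait-for-graph cycle check: illustrative only, not the
// actual DeadlockDetector implementation.
using Graph = std::map<int, std::vector<int>>;

bool hasCycleFrom(const Graph& waitsFor, int start) {
    std::set<int> onPath, done;
    // Iterative DFS with an explicit stack of (node, nextChildIndex).
    std::vector<std::pair<int, size_t>> stack{{start, 0}};
    onPath.insert(start);
    while (!stack.empty()) {
        auto& [node, idx] = stack.back();
        auto it = waitsFor.find(node);
        size_t degree = (it == waitsFor.end()) ? 0 : it->second.size();
        if (idx == degree) {
            onPath.erase(node);
            done.insert(node);
            stack.pop_back();
            continue;
        }
        int next = it->second[idx++];
        if (onPath.count(next))
            return true;  // Back edge: a cycle of waiters, i.e. a deadlock.
        if (!done.count(next)) {
            onPath.insert(next);
            stack.emplace_back(next, 0);
        }
    }
    return false;
}

int main() {
    // Mirrors TEST(Deadlock, Simple): locker 1 waits on 2, 2 waits on 1.
    Graph deadlocked{{1, {2}}, {2, {1}}};
    assert(hasCycleFrom(deadlocked, 1));

    Graph fine{{1, {2}}};
    assert(!hasCycleFrom(fine, 1));
}
```
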
@@ -347,7 +347,7 @@ LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(OperationContext* opCtx,
         }
         _modeForTicket = mode;
     }
-    const LockResult result = lockBegin(opCtx, resourceIdGlobal, mode);
+    const LockResult result = lockBegin(resourceIdGlobal, mode);
     if (result == LOCK_OK)
         return LOCK_OK;

@@ -469,7 +469,7 @@ template <bool IsForMMAPV1>
 LockResult LockerImpl<IsForMMAPV1>::lock(
     OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {

-    const LockResult result = lockBegin(opCtx, resId, mode);
+    const LockResult result = lockBegin(resId, mode);

     // Fast, uncontended path
     if (result == LOCK_OK)

@@ -713,9 +713,7 @@ void LockerImpl<IsForMMAPV1>::restoreLockState(OperationContext* opCtx,
 }

 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockBegin(OperationContext* opCtx,
-                                              ResourceId resId,
-                                              LockMode mode) {
+LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
     dassert(!getWaitingResource().isValid());

     LockRequest* request;

@@ -780,16 +778,6 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(OperationContext* opCtx,
     if (result == LOCK_WAITING) {
         globalStats.recordWait(_id, resId, mode);
         _stats.recordWait(resId, mode);
-    } else if (result == LOCK_OK && opCtx && _uninterruptibleLocksRequested == 0) {
-        // Lock acquisitions are not allowed to succeed when opCtx is marked as interrupted, unless
-        // the caller requested an uninterruptible lock.
-        auto interruptStatus = opCtx->checkForInterruptNoAssert();
-        if (!interruptStatus.isOK()) {
-            auto unlockIt = _requests.find(resId);
-            invariant(unlockIt);
-            _unlockImpl(&unlockIt);
-            uassertStatusOK(interruptStatus);
-        }
     }

     return result;

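The block deleted here was the heart of SERVER-27534: a lock that was just granted is released again if the operation turns out to be interrupted, so the thrown error never leaks a held lock. A hedged sketch of that grant-then-check-then-undo shape (illustrative names, uncontended case only):

```cpp
#include <stdexcept>
#include <unordered_map>

// Sketch of the removed lockBegin logic: if the lock was granted but the
// operation is already interrupted, release the just-created request before
// reporting the interruption, so nothing is leaked.
enum LockResult { LOCK_OK, LOCK_WAITING };

struct Locker {
    std::unordered_map<int, int> requests;  // resId -> refcount
    bool interrupted = false;
    int uninterruptibleGuards = 0;

    LockResult lockBegin(int resId) {
        requests[resId]++;
        LockResult result = LOCK_OK;  // Assume uncontended for the sketch.
        if (result == LOCK_OK && uninterruptibleGuards == 0 && interrupted) {
            // Undo the grant before throwing: the caller never observes a
            // half-acquired lock.
            if (--requests[resId] == 0)
                requests.erase(resId);
            throw std::runtime_error("Interrupted");
        }
        return result;
    }
};

int main() {
    Locker locker;
    locker.interrupted = true;
    try {
        locker.lockBegin(42);
    } catch (const std::runtime_error&) {
        // Fall through: the request map must be empty again.
    }
    return locker.requests.empty() ? 0 : 1;
}
```
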
@@ -202,14 +202,10 @@ public:
     * In other words for each call to lockBegin, which does not return LOCK_OK, there needs to
     * be a corresponding call to either lockComplete or unlock.
     *
-    * If an operation context is provided that represents an interrupted operation, lockBegin will
-    * throw an exception whenever it would have been possible to grant the lock with LOCK_OK. This
-    * behavior can be disabled with an UninterruptibleLockGuard.
-    *
     * NOTE: These methods are not public and should only be used inside the class
     * implementation and for unit-tests and not called directly.
     */
-    LockResult lockBegin(OperationContext* opCtx, ResourceId resId, LockMode mode);
+    LockResult lockBegin(ResourceId resId, LockMode mode);

    /**
     * Waits for the completion of a lock, previously requested through lockBegin or

@@ -268,12 +268,12 @@ TEST(LockerImpl, CanceledDeadlockUnblocks) {
     ASSERT(LOCK_OK == locker2.lock(db2, MODE_X));

     // Set up locker1 and locker2 for deadlock
-    ASSERT(LOCK_WAITING == locker1.lockBegin(nullptr, db2, MODE_X));
-    ASSERT(LOCK_WAITING == locker2.lockBegin(nullptr, db1, MODE_X));
+    ASSERT(LOCK_WAITING == locker1.lockBegin(db2, MODE_X));
+    ASSERT(LOCK_WAITING == locker2.lockBegin(db1, MODE_X));

     // Locker3 blocks behind locker 2
     ASSERT(LOCK_OK == locker3.lockGlobal(MODE_IX));
-    ASSERT(LOCK_WAITING == locker3.lockBegin(nullptr, db1, MODE_S));
+    ASSERT(LOCK_WAITING == locker3.lockBegin(db1, MODE_S));

     // Detect deadlock, canceling our request
     ASSERT(

@@ -442,7 +442,7 @@ TEST(LockerImpl, GetLockerInfoShouldReportPendingLocks) {
     DefaultLockerImpl conflictingLocker;
     ASSERT_EQ(LOCK_OK, conflictingLocker.lockGlobal(MODE_IS));
     ASSERT_EQ(LOCK_OK, conflictingLocker.lock(dbId, MODE_IS));
-    ASSERT_EQ(LOCK_WAITING, conflictingLocker.lockBegin(nullptr, collectionId, MODE_IS));
+    ASSERT_EQ(LOCK_WAITING, conflictingLocker.lockBegin(collectionId, MODE_IS));

     // Assert the held locks show up in the output of getLockerInfo().
     Locker::LockerInfo lockerInfo;

@@ -63,7 +63,7 @@ TEST(LockStats, Wait) {
     {
         // This will block
         LockerForTests lockerConflict(MODE_IX);
-        ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(nullptr, resId, MODE_S));
+        ASSERT_EQUALS(LOCK_WAITING, lockerConflict.lockBegin(resId, MODE_S));

         // Sleep 1 millisecond so the wait time passes
         ASSERT_EQUALS(

@@ -85,7 +85,6 @@ namespace {
 MONGO_FP_DECLARE(failAllInserts);
 MONGO_FP_DECLARE(failAllUpdates);
 MONGO_FP_DECLARE(failAllRemoves);
-MONGO_FP_DECLARE(hangDuringBatchInsert);

 void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
     if (containsRetry) {

@@ -383,9 +382,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
     boost::optional<AutoGetCollection> collection;
     auto acquireCollection = [&] {
         while (true) {
-            if (MONGO_FAIL_POINT(hangDuringBatchInsert)) {
-                MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangDuringBatchInsert);
-            }
+            opCtx->checkForInterrupt();

             if (MONGO_FAIL_POINT(failAllInserts)) {
                 uasserted(ErrorCodes::InternalError, "failAllInserts failpoint active!");

@@ -623,6 +620,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,

     boost::optional<AutoGetCollection> collection;
     while (true) {
+        opCtx->checkForInterrupt();
         if (MONGO_FAIL_POINT(failAllUpdates)) {
             uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
         }

@@ -778,6 +776,8 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
     ParsedDelete parsedDelete(opCtx, &request);
     uassertStatusOK(parsedDelete.parseRequest());

+    opCtx->checkForInterrupt();
+
     if (MONGO_FAIL_POINT(failAllRemoves)) {
         uasserted(ErrorCodes::InternalError, "failAllRemoves failpoint active!");
     }

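With interrupt checking no longer built into lock acquisition, the three write paths above get back an explicit opCtx->checkForInterrupt() at the top of each batch or retry loop. A minimal sketch of that polling pattern (an illustrative flag stands in for the real OperationContext):

```cpp
#include <atomic>
#include <stdexcept>
#include <vector>

// Sketch of the pattern these hunks restore: each write path polls the
// interrupt flag explicitly between batches, before taking locks.
std::atomic<bool> interrupted{false};

void checkForInterrupt() {
    if (interrupted.load())
        throw std::runtime_error("operation interrupted");
}

void performWrites(const std::vector<int>& batches) {
    for (int batch : batches) {
        checkForInterrupt();  // Poll at the top of the loop.
        (void)batch;          // ... acquire the collection lock and execute ...
    }
}

int main() {
    performWrites({1, 2, 3});  // Completes; no interrupt was requested.
}
```
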
@@ -269,9 +269,6 @@ Status repairDatabase(OperationContext* opCtx,
     dbHolder().close(opCtx, dbName, "database closed for repair");
     ON_BLOCK_EXIT([&dbName, &opCtx] {
         try {
-            // Ensure that we don't trigger an exception when attempting to take locks.
-            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
             // Open the db after everything finishes.
             auto db = dbHolder().openDb(opCtx, dbName);

@@ -538,9 +538,6 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
     }

     if (_deleteNotifyExec) {
-        // Don't allow an Interrupt exception to prevent _deleteNotifyExec from getting cleaned up.
-        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
         AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
         const auto cursorManager =
             autoColl.getCollection() ? autoColl.getCollection()->getCursorManager() : nullptr;

@@ -1143,9 +1143,6 @@ DbResponse receivedGetMore(OperationContext* opCtx,
         getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
     } catch (AssertionException& e) {
         if (isCursorAuthorized) {
-            // Make sure that killCursorGlobal does not throw an exception if it is interrupted.
-            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-
             // If a cursor with id 'cursorid' was authorized, it may have been advanced
             // before an exception terminated processGetMore. Erase the ClientCursor
             // because it may now be out of sync with the client's iteration state.