// Source: mongo/jstests/sharding/retryable_writes.js (693 lines, 24 KiB, JavaScript)

/**
* Test basic retryable write without errors by checking that the resulting collection after the
* retry is as expected and it does not create additional oplog entries.
*/
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
// Seed the shell's Random helper once, before any of the test functions below run.
Random.setRandomSeed();
/**
 * Removes config.system.sessions from the deployment under test.
 *
 * When mainConn and priConn are the same connection (replica set) the collection is simply
 * dropped on the primary. Otherwise (sharded cluster) the sharding metadata kept on the config
 * server is removed by hand before the collection is dropped on both the config server and the
 * shard primary, and both nodes are asked to flush their routing table cache for the namespace.
 *
 * @param mainConn connection the test sends its commands through.
 * @param priConn connection to the primary that holds the data.
 * @param configConn connection to the config server primary.
 */
function dropSessionsCollection(mainConn, priConn, configConn) {
    jsTest.log("Dropping sessions collection.");

    // Replica set: the local drop is all that is needed.
    if (mainConn == priConn) {
        priConn.getDB("config").getCollection("system.sessions").drop();
        return;
    }

    // Sharded cluster: the collection is sharded, so besides the local catalog the metadata in
    // the global catalog has to be cleaned up as well.
    // TODO (SERVER-107023): replace this once we have the ability to drop the sessions
    // collection internally.
    const majorityWrite = () => ({writeConcern: {w: "majority"}});
    const configDB = configConn.getDB("config");
    const collectionsColl = configDB.getCollection("collections");

    const sessionsEntry = collectionsColl.findOne({_id: "config.system.sessions"});
    if (sessionsEntry == null) {
        // Nothing is registered for the sessions collection, so there is nothing to drop.
        return;
    }
    const sessionsUuid = sessionsEntry.uuid;

    assert.commandWorked(collectionsColl.deleteOne({_id: "config.system.sessions"}, majorityWrite()));
    assert.commandWorked(configDB.getCollection("chunks").deleteMany({uuid: sessionsUuid}, majorityWrite()));

    // The placement history is updated only to satisfy the hook that validates it. In practice
    // the inconsistency would not matter, since change streams cannot be opened on
    // config.system.sessions.
    const placementColl = configDB.getCollection("placementHistory");
    const latestPlacement = placementColl.find().sort({timestamp: -1}).limit(1)[0];
    const emptyPlacement = {
        _id: ObjectId(),
        "nss": "config.system.sessions",
        "uuid": sessionsUuid,
        // Same seconds as the newest entry, increment bumped by one.
        "timestamp": Timestamp(latestPlacement.timestamp.getTime(), latestPlacement.timestamp.getInc() + 1),
        "shards": [],
    };
    assert.commandWorked(placementColl.insertOne(emptyPlacement, majorityWrite()));

    // Drop on both nodes first, then make both discard their cached routing information.
    const nodes = [configConn, priConn];
    for (const node of nodes) {
        node.getDB("config").getCollection("system.sessions").drop();
    }
    for (const node of nodes) {
        assert.commandWorked(node.adminCommand({_flushRoutingTableCacheUpdates: "config.system.sessions"}));
    }
}
/**
 * Asks the node behind configConn to refresh its logical session cache immediately and asserts
 * that the command succeeded.
 *
 * @param configConn connection the refreshLogicalSessionCacheNow command is sent to.
 */
function createSessionsCollection(configConn) {
    jsTest.log("Forcing refresh of the sessions collection");

    const refreshCmd = {refreshLogicalSessionCacheNow: 1};
    const reply = configConn.adminCommand(refreshCmd);
    assert.commandWorked(reply);
}
/**
 * Asserts that two findAndModify replies agree on 'ok', 'value' and 'lastErrorObject'.
 * Any other fields of the replies are not compared.
 *
 * @param expected reply of the first execution.
 * @param toCheck reply of the retry.
 */
function checkFindAndModifyResult(expected, toCheck) {
    const {ok: wantedOk, value: wantedValue, lastErrorObject: wantedLastError} = expected;
    const {ok: actualOk, value: actualValue, lastErrorObject: actualLastError} = toCheck;

    assert.eq(wantedOk, actualOk);
    assert.eq(wantedValue, actualValue);
    assert.docEq(wantedLastError, actualLastError);
}
/**
 * Asserts that a serverStatus reply carries a 'transactions' section containing the three
 * counters this test compares before and after each retry.
 *
 * @param serverStatusResponse reply of the serverStatus command.
 */
function verifyServerStatusFields(serverStatusResponse) {
    const hasTransactionsSection = serverStatusResponse.hasOwnProperty("transactions");
    assert(hasTransactionsSection, "Expected the serverStatus response to have a 'transactions' field");

    const requiredCounters = ["retriedCommandsCount", "retriedStatementsCount", "transactionsCollectionWriteCount"];
    assert.hasFields(
        serverStatusResponse.transactions,
        requiredCounters,
        "The 'transactions' field in serverStatus did not have all of the expected fields",
    );
}
/**
 * Asserts that each of the three 'transactions' counters grew by exactly the given amount
 * between two serverStatus snapshots.
 *
 * @param initialStats 'transactions' section taken before the operation.
 * @param newStats 'transactions' section taken after the operation.
 * @param newCommands expected increase of retriedCommandsCount.
 * @param newStatements expected increase of retriedStatementsCount.
 * @param newCollectionWrites expected increase of transactionsCollectionWriteCount.
 */
function verifyServerStatusChanges(initialStats, newStats, newCommands, newStatements, newCollectionWrites) {
    assert.eq(
        initialStats.retriedCommandsCount + newCommands,
        newStats.retriedCommandsCount,
        "expected retriedCommandsCount to increase by " + newCommands,
    );
    assert.eq(
        initialStats.retriedStatementsCount + newStatements,
        newStats.retriedStatementsCount,
        "expected retriedStatementsCount to increase by " + newStatements,
    );
    // The message names the counter actually being compared, so a failure here is not
    // mistaken for a retriedCommandsCount mismatch.
    assert.eq(
        initialStats.transactionsCollectionWriteCount + newCollectionWrites,
        newStats.transactionsCollectionWriteCount,
        "expected transactionsCollectionWriteCount to increase by " + newCollectionWrites,
    );
}
/**
 * Randomly (about one call in four) drops and immediately recreates the sessions collection
 * between the first execution of a write and its retry.
 *
 * @param mainConn connection the test sends its commands through.
 * @param priConn connection to the primary that holds the data.
 * @param configConn connection to the config server primary.
 * @returns a truthy value only when the collection was recreated, mainConn and priConn differ,
 *     and TestData.configShard is set; callers use it to expect one extra write in the
 *     serverStatus counters.
 */
function handleSessionsCollection(mainConn, priConn, configConn) {
    // We have to either drop and recreate or do neither because in config shard suites otherwise
    // the periodic creation will mess with the transaction.
    let createdCollection = false;
    // Draw from the shell's Random helper, which this file seeds at load time via
    // Random.setRandomSeed(), instead of Math.random(), which that seed does not control.
    // This keeps the drop/recreate decisions tied to the seed the test run was started with.
    if (Random.rand() < 0.25) {
        dropSessionsCollection(mainConn, priConn, configConn);
        createSessionsCollection(configConn);
        createdCollection = true;
    }
    // If we are in a config shard and we dropped and recreated the sessions collection then, as
    // part of the create coordinator, we ran a transaction which will mess with server status
    // counts.
    return TestData.configShard && mainConn != priConn && createdCollection;
}
/**
 * Exercises insert, update, delete and findAndModify as retryable writes.
 *
 * Each section follows the same steps: snapshot serverStatus on priConn, send the command
 * through mainConn, record the relevant oplog entry counts and collection contents, call
 * handleSessionsCollection(), send the identical command (same lsid and txnNumber) again, and
 * then assert that the reply matches the first one, that the collection and oplog counts are
 * unchanged, and that the serverStatus 'transactions' counters moved by the expected amounts.
 *
 * @param mainConn connection the write commands are sent through.
 * @param priConn connection used to read local.oplog.rs, serverStatus and test.user.
 * @param configConn forwarded to handleSessionsCollection().
 */
function runTests(mainConn, priConn, configConn) {
// One session id is shared by every command below; only txnNumber changes between sections.
let lsid = UUID();
assert.commandWorked(mainConn.getDB("test").createCollection("user"));
////////////////////////////////////////////////////////////////////////
// Test insert command
let initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
let cmd = {
insert: "user",
documents: [{_id: 10}, {_id: 30}],
ordered: false,
lsid: {id: lsid},
txnNumber: NumberLong(34),
};
let testDBMain = mainConn.getDB("test");
let result = assert.commandWorked(testDBMain.runCommand(cmd));
let oplog = priConn.getDB("local").oplog.rs;
// Count the insert oplog entries after the first execution; the retry must not add any.
let insertOplogEntries = oplog.find({ns: "test.user", op: "i"}).itcount();
let testDBPri = priConn.getDB("test");
assert.eq(2, testDBPri.user.find().itcount());
// May drop and recreate the sessions collection; the return value says whether one extra
// collection write has to be expected in the counters checked below.
let createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
// Retry: the very same command object, hence the same lsid and txnNumber.
let retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq(result.ok, retryResult.ok);
assert.eq(result.n, retryResult.n);
assert.eq(result.writeErrors, retryResult.writeErrors);
assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
assert.eq(2, testDBPri.user.find().itcount());
assert.eq(insertOplogEntries, oplog.find({ns: "test.user", op: "i"}).itcount());
let newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
// NOTE(review): the two-document insert is expected to count as a single collection write,
// whereas the update and delete sections below expect one per statement - confirm.
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
2 /* newStatements */,
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test update command
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
cmd = {
update: "user",
updates: [
{q: {_id: 10}, u: {$inc: {x: 1}}}, // in place
{q: {_id: 20}, u: {$inc: {y: 1}}, upsert: true},
{q: {_id: 30}, u: {z: 1}}, // replacement
],
ordered: false,
lsid: {id: lsid},
txnNumber: NumberLong(35),
};
result = assert.commandWorked(testDBMain.runCommand(cmd));
let updateOplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
// Upserts are stored as inserts in the oplog, so check inserts too.
insertOplogEntries = oplog.find({ns: "test.user", op: "i"}).itcount();
assert.eq(3, testDBPri.user.find().itcount());
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq(result.ok, retryResult.ok);
assert.eq(result.n, retryResult.n);
assert.eq(result.nModified, retryResult.nModified);
assert.eq(result.upserted, retryResult.upserted);
assert.eq(result.writeErrors, retryResult.writeErrors);
assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
assert.eq(3, testDBPri.user.find().itcount());
// Each $inc must have been applied exactly once even though the command was sent twice.
assert.eq({_id: 10, x: 1}, testDBPri.user.findOne({_id: 10}));
assert.eq({_id: 20, y: 1}, testDBPri.user.findOne({_id: 20}));
assert.eq({_id: 30, z: 1}, testDBPri.user.findOne({_id: 30}));
assert.eq(updateOplogEntries, oplog.find({ns: "test.user", op: "u"}).itcount());
assert.eq(insertOplogEntries, oplog.find({ns: "test.user", op: "i"}).itcount());
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
3 /* newStatements */,
createdCollectionInTxn ? 4 : 3 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test delete command
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
// Add a second match for each predicate, so that a delete applied twice would be visible as
// a missing document.
assert.commandWorked(testDBMain.user.insert({_id: 40, x: 1}));
assert.commandWorked(testDBMain.user.insert({_id: 50, y: 1}));
assert.eq(2, testDBPri.user.find({x: 1}).itcount());
assert.eq(2, testDBPri.user.find({y: 1}).itcount());
cmd = {
delete: "user",
deletes: [
{q: {x: 1}, limit: 1},
{q: {y: 1}, limit: 1},
],
ordered: false,
lsid: {id: lsid},
txnNumber: NumberLong(36),
};
result = assert.commandWorked(mainConn.getDB("test").runCommand(cmd));
let deleteOplogEntries = oplog.find({ns: "test.user", op: "d"}).itcount();
assert.eq(1, testDBPri.user.find({x: 1}).itcount());
assert.eq(1, testDBPri.user.find({y: 1}).itcount());
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq(result.ok, retryResult.ok);
assert.eq(result.n, retryResult.n);
assert.eq(result.writeErrors, retryResult.writeErrors);
assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
assert.eq(1, testDBPri.user.find({x: 1}).itcount());
assert.eq(1, testDBPri.user.find({y: 1}).itcount());
assert.eq(deleteOplogEntries, oplog.find({ns: "test.user", op: "d"}).itcount());
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
2 /* newStatements */,
createdCollectionInTxn ? 3 : 2 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test findAndModify command (upsert)
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
cmd = {
findAndModify: "user",
query: {_id: 60},
update: {$inc: {x: 1}},
new: true,
upsert: true,
lsid: {id: lsid},
txnNumber: NumberLong(37),
};
result = assert.commandWorked(mainConn.getDB("test").runCommand(cmd));
// Both insert and update entry counts are recorded; neither may change on retry.
insertOplogEntries = oplog.find({ns: "test.user", op: "i"}).itcount();
updateOplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
assert.eq({_id: 60, x: 1}, testDBPri.user.findOne({_id: 60}));
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq({_id: 60, x: 1}, testDBPri.user.findOne({_id: 60}));
assert.eq(insertOplogEntries, oplog.find({ns: "test.user", op: "i"}).itcount());
assert.eq(updateOplogEntries, oplog.find({ns: "test.user", op: "u"}).itcount());
checkFindAndModifyResult(result, retryResult);
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test findAndModify command (update, return pre-image)
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
cmd = {
findAndModify: "user",
query: {_id: 60},
update: {$inc: {x: 1}},
new: false,
upsert: false,
lsid: {id: lsid},
txnNumber: NumberLong(38),
};
result = assert.commandWorked(mainConn.getDB("test").runCommand(cmd));
let oplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
assert.eq({_id: 60, x: 2}, testDBPri.user.findOne({_id: 60}));
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
// x stays at 2: the retry must not increment again.
assert.eq({_id: 60, x: 2}, testDBPri.user.findOne({_id: 60}));
assert.eq(oplogEntries, oplog.find({ns: "test.user", op: "u"}).itcount());
checkFindAndModifyResult(result, retryResult);
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test findAndModify command (update, return post-image)
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
cmd = {
findAndModify: "user",
query: {_id: 60},
update: {$inc: {x: 1}},
new: true,
upsert: false,
lsid: {id: lsid},
txnNumber: NumberLong(39),
};
result = assert.commandWorked(mainConn.getDB("test").runCommand(cmd));
oplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
assert.eq({_id: 60, x: 3}, testDBPri.user.findOne({_id: 60}));
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq({_id: 60, x: 3}, testDBPri.user.findOne({_id: 60}));
assert.eq(oplogEntries, oplog.find({ns: "test.user", op: "u"}).itcount());
checkFindAndModifyResult(result, retryResult);
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
);
////////////////////////////////////////////////////////////////////////
// Test findAndModify command (remove, return pre-image)
initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);
// Two documents match {f: 1}; a retry that removed a second one would change docCount.
assert.commandWorked(testDBMain.user.insert({_id: 70, f: 1}));
assert.commandWorked(testDBMain.user.insert({_id: 80, f: 1}));
cmd = {
findAndModify: "user",
query: {f: 1},
remove: true,
lsid: {id: lsid},
txnNumber: NumberLong(40),
};
result = assert.commandWorked(mainConn.getDB("test").runCommand(cmd));
oplogEntries = oplog.find({ns: "test.user", op: "d"}).itcount();
let docCount = testDBPri.user.find().itcount();
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
assert.eq(oplogEntries, oplog.find({ns: "test.user", op: "d"}).itcount());
assert.eq(docCount, testDBPri.user.find().itcount());
checkFindAndModifyResult(result, retryResult);
newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(
initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
);
}
/**
 * Exercises retryable writes while the 'onPrimaryTransactionalWrite' failpoint is active.
 *
 * First the failpoint is enabled with its default settings and a two-document insert is sent;
 * the call may either succeed or throw a network error, but both documents must eventually be
 * visible. Then the failpoint is configured to raise InternalError after skipping one write, an
 * ordered two-statement update is sent (first statement applied, second one reported as a write
 * error), the failpoint is switched off and the same update is retried to completion.
 *
 * @param mainConn connection the write commands are sent through.
 * @param priConn connection on which the failpoint is configured.
 */
function runFailpointTests(mainConn, priConn) {
    // Test the 'onPrimaryTransactionalWrite' failpoint
    let lsid = UUID();
    let testDb = mainConn.getDB("TestDB");
    assert.commandWorked(testDb.createCollection("user"));

    // Test connection close (default behaviour). The connection will get closed, but the
    // inserts must succeed
    assert.commandWorked(priConn.adminCommand({configureFailPoint: "onPrimaryTransactionalWrite", mode: "alwaysOn"}));
    try {
        // Set skipRetryOnNetworkError so the shell doesn't automatically retry, since the
        // command has a txnNumber.
        TestData.skipRetryOnNetworkError = true;
        let res = assert.commandWorked(
            testDb.runCommand({
                insert: "user",
                documents: [{x: 0}, {x: 1}],
                ordered: true,
                lsid: {id: lsid},
                txnNumber: NumberLong(1),
            }),
        );
        // Mongos will automatically retry on retryable errors if the request has a txnNumber,
        // and the retry path for already completed writes does not trigger the failpoint, so
        // the command will succeed when run through mongos.
        assert.eq(2, res.n);
        assert.eq(false, res.hasOwnProperty("writeErrors"));
    } catch (e) {
        let exceptionMsg = e.toString();
        assert(isNetworkError(e), "Incorrect exception thrown: " + exceptionMsg);
    } finally {
        TestData.skipRetryOnNetworkError = false;
    }

    let collCount = 0;
    assert.soon(
        () => {
            collCount = testDb.user.find({}).itcount();
            return collCount == 2;
        },
        // Passed as a function so the message is built when it is needed and reports the
        // last observed count. A pre-built string would always say "returned 0 entries".
        () => "testDb.user returned " + collCount + " entries",
    );

    // Test exception throw. One update must succeed and the other must fail.
    assert.commandWorked(
        priConn.adminCommand({
            configureFailPoint: "onPrimaryTransactionalWrite",
            mode: {skip: 1},
            data: {closeConnection: false, failBeforeCommitExceptionCode: ErrorCodes.InternalError},
        }),
    );
    let cmd = {
        update: "user",
        updates: [
            {q: {x: 0}, u: {$inc: {y: 1}}},
            {q: {x: 1}, u: {$inc: {y: 1}}},
        ],
        ordered: true,
        lsid: {id: lsid},
        txnNumber: NumberLong(2),
    };
    let writeResult = testDb.runCommand(cmd);
    assert.eq(1, writeResult.nModified);
    assert.eq(1, writeResult.writeErrors.length);
    assert.eq(1, writeResult.writeErrors[0].index);
    assert.eq(ErrorCodes.InternalError, writeResult.writeErrors[0].code);

    assert.commandWorked(priConn.adminCommand({configureFailPoint: "onPrimaryTransactionalWrite", mode: "off"}));

    // Retry of the same command: the reply must now report both statements as modified, and
    // each document must carry y: 1 exactly once.
    writeResult = testDb.runCommand(cmd);
    assert.eq(2, writeResult.nModified);
    let collContents = testDb.user.find({}).sort({x: 1}).toArray();
    assert.eq(2, collContents.length);
    assert.eq(0, collContents[0].x);
    assert.eq(1, collContents[0].y);
    assert.eq(1, collContents[1].x);
    assert.eq(1, collContents[1].y);
}
/**
 * Checks the error returned when a retryable write uses a txnNumber lower than one already
 * used on the same session: an insert is sent with txnNumber 2, then an update with
 * txnNumber 1, which must fail with TransactionTooOld and the expected message.
 *
 * @param mainConn connection the commands are sent through.
 */
function runRetryableWriteErrorTest(mainConn) {
    const sessionUuid = UUID();
    const db = mainConn.getDB("TestDB");

    // Establish txnNumber 2 on the session.
    const insertCmd = {
        insert: "user",
        documents: [{x: 1}],
        ordered: true,
        lsid: {id: sessionUuid},
        txnNumber: NumberLong(2),
    };
    assert.commandWorked(db.runCommand(insertCmd));

    // Same session, older txnNumber.
    const staleUpdateCmd = {
        update: "user",
        updates: [{q: {x: 1}, u: {$inc: {x: 1}}}],
        ordered: true,
        lsid: {id: sessionUuid},
        txnNumber: NumberLong(1),
    };
    const staleReply = db.runCommand(staleUpdateCmd);
    assert.commandFailedWithCode(staleReply, ErrorCodes.TransactionTooOld);

    const mentionsProhibitedRetry = staleReply.errmsg.includes("Retryable write with txnNumber 1 is prohibited");
    assert(mentionsProhibitedRetry, staleReply);
}
/**
 * Checks how retryable writes treat multi=true updates and limit=0 deletes: in a batch that
 * mixes them with single-document statements, exactly one write error is expected, at index 1,
 * with code InvalidOptions.
 *
 * @param mainConn connection the commands are sent through.
 */
function runMultiTests(mainConn) {
    const sessionId = {id: UUID()};
    const db = mainConn.getDB("test_multi");

    // Shared verification for both batches; opName only feeds the failure message.
    const expectOnlySecondStatementRejected = (reply, opName) => {
        assert.eq(
            1,
            reply.writeErrors.length,
            "expected only one write error, received: " + tojson(reply.writeErrors),
        );
        assert.eq(
            1,
            reply.writeErrors[0].index,
            "expected the " + opName + " at index 1 to fail, not the " + opName + " at index: " +
                reply.writeErrors[0].index,
        );
        assert.eq(
            ErrorCodes.InvalidOptions,
            reply.writeErrors[0].code,
            "expected to fail with code " + ErrorCodes.InvalidOptions + ", received: " + reply.writeErrors[0].code,
        );
    };

    // Only the update statements with multi=true in a batch fail.
    const updateBatch = {
        update: "user",
        updates: [
            {q: {x: 1}, u: {y: 1}},
            {q: {x: 2}, u: {z: 1}, multi: true},
        ],
        ordered: true,
        lsid: sessionId,
        txnNumber: NumberLong(1),
    };
    const updateReply = assert.commandWorkedIgnoringWriteErrors(db.runCommand(updateBatch));
    expectOnlySecondStatementRejected(updateReply, "update");

    // Only the delete statements with limit=0 in a batch fail.
    const deleteBatch = {
        delete: "user",
        deletes: [
            {q: {x: 1}, limit: 1},
            {q: {y: 1}, limit: 0},
        ],
        ordered: false,
        lsid: sessionId,
        txnNumber: NumberLong(1),
    };
    const deleteReply = assert.commandWorkedIgnoringWriteErrors(db.runCommand(deleteBatch));
    expectOnlySecondStatementRejected(deleteReply, "delete");
}
function runInvalidTests(mainConn) {
let lsid = {id: UUID()};
let localDB = mainConn.getDB("local");
let cmd = {
insert: "user",
documents: [{_id: 10}, {_id: 30}],
ordered: false,
lsid: lsid,
txnNumber: NumberLong(10),
};
let res = assert.commandWorkedIgnoringWriteErrors(localDB.runCommand(cmd));
assert.eq(2, res.writeErrors.length);
localDB.user.insert({_id: 10, x: 1});
localDB.user.insert({_id: 30, z: 2});
cmd = {
update: "user",
updates: [
{q: {_id: 10}, u: {$inc: {x: 1}}}, // in place
{q: {_id: 20}, u: {$inc: {y: 1}}, upsert: true},
{q: {_id: 30}, u: {z: 1}}, // replacement
],
ordered: false,
lsid: lsid,
txnNumber: NumberLong(11),
};
res = assert.commandWorkedIgnoringWriteErrors(localDB.runCommand(cmd));
assert.eq(3, res.writeErrors.length);
cmd = {
delete: "user",
deletes: [
{q: {x: 1}, limit: 1},
{q: {z: 2}, limit: 1},
],
ordered: false,
lsid: lsid,
txnNumber: NumberLong(12),
};
res = assert.commandWorkedIgnoringWriteErrors(localDB.runCommand(cmd));
assert.eq(2, res.writeErrors.length);
cmd = {
findAndModify: "user",
query: {_id: 60},
update: {$inc: {x: 1}},
new: true,
upsert: true,
lsid: {id: lsid},
txnNumber: NumberLong(37),
};
assert.commandFailed(localDB.runCommand(cmd));
}
// Replica set run: two nodes with verbose logging. The primary serves as main, primary and
// config connection at once.
const replSet = new ReplSetTest({nodes: 2});
replSet.startSet({verbose: 5});
replSet.initiate();

const rsPrimary = replSet.getPrimary();
runTests(rsPrimary, rsPrimary, rsPrimary);
runFailpointTests(rsPrimary, rsPrimary);
runRetryableWriteErrorTest(rsPrimary);
runMultiTests(rsPrimary);
runInvalidTests(rsPrimary);

replSet.stopSet();

// Sharded cluster run: one single-node shard. Commands go through the first mongos, while
// state is inspected on the shard primary and the config server primary.
const cluster = new ShardingTest({shards: {rs0: {nodes: 1, verbose: 5}}});
runTests(cluster.s0, cluster.rs0.getPrimary(), cluster.configRS.getPrimary());
runFailpointTests(cluster.s0, cluster.rs0.getPrimary());
runMultiTests(cluster.s0);
cluster.stop();