mirror of https://github.com/mongodb/mongo
revert "SERVER-60823: Restructure transaction override retries [7.0.20-hotfix]" (#35440)
GitOrigin-RevId: cf29fc744f8ee2ac9245f2845f29c6a706dc375a
This commit is contained in:
parent
c2c4cb72aa
commit
38a70214a5
|
|
@ -1,6 +1,7 @@
|
|||
// this tests 1% of all points
|
||||
//
|
||||
// @tags: [
|
||||
// # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
// incompatible_with_gcov,
|
||||
// ]
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@
|
|||
// All V2 2dsphere indices are sparse in the geo fields.
|
||||
//
|
||||
// @tags: [
|
||||
// # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
// incompatible_with_gcov,
|
||||
// ]
|
||||
|
||||
(function() {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
/**
|
||||
* Test the $sampleRate match expression.
|
||||
* @tags: [
|
||||
* # TODO(SERVER-60823): remove incompatible_with_gcov
|
||||
* incompatible_with_gcov,
|
||||
* ]
|
||||
*/
|
||||
(function() {
|
||||
|
|
|
|||
|
|
@ -24,14 +24,6 @@
|
|||
* jstests/libs/txns/txn_passthrough_runner_selftest.js
|
||||
*/
|
||||
|
||||
import {
|
||||
hasError,
|
||||
hasWriteConcernError,
|
||||
isSuccess,
|
||||
Result,
|
||||
RetryTracker
|
||||
} from "jstests/libs/override_methods/retry_utils.js";
|
||||
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
|
|
@ -269,13 +261,18 @@ function isFailedToSatisfyPrimaryReadPreferenceError(res) {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Tracks if the current command is being run as part of a transaction retry.
|
||||
let inTransactionRetry = false;
|
||||
|
||||
function isTransactionRetry() {
|
||||
return inTransactionRetry;
|
||||
function hasError(res) {
|
||||
return res.ok !== 1 || res.writeErrors;
|
||||
}
|
||||
|
||||
function hasWriteConcernError(res) {
|
||||
return res.hasOwnProperty("writeConcernError");
|
||||
}
|
||||
|
||||
// Tracks if the current command is being run in a network retry. This is specifically for
|
||||
// retries that this file initiates, not ones that retryable writes initiates.
|
||||
let inCommandNetworkErrorRetry = false;
|
||||
|
||||
// "Command ID" is an identifier for a given command being overridden. This is to track what log
|
||||
// messages come from what commands. This override is highly recursive and this is helpful for
|
||||
// debugging that recursion and following what commands initiated other commands.
|
||||
|
|
@ -330,8 +327,7 @@ function logMsgFull(msgHeader, msgFooter) {
|
|||
// Validate the command before running it, to prevent tests with non-retryable commands
|
||||
// from being run.
|
||||
function validateCmdNetworkErrorCompatibility(cmdName, cmdObj) {
|
||||
assert(!isCmdInTransaction(cmdObj));
|
||||
assert(!isTransactionRetry());
|
||||
assert(!inCommandNetworkErrorRetry);
|
||||
assert(!isNested());
|
||||
|
||||
const isRetryableWriteCmd = RetryableWritesUtil.isRetryableWriteCmdName(cmdName);
|
||||
|
|
@ -529,24 +525,18 @@ function commitTransaction(conn, lsid, txnNumber) {
|
|||
logMsgFull('commitTransaction',
|
||||
`Committing transaction ${txnNumber} on session ${tojsononeline(lsid)}`);
|
||||
|
||||
const cmdObj = {
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed.
|
||||
assert.commandWorked(conn.adminCommand({
|
||||
commitTransaction: 1,
|
||||
autocommit: false,
|
||||
lsid: lsid,
|
||||
txnNumber: txnNumber,
|
||||
};
|
||||
// Append this override-generated commit to the transaction state.
|
||||
// The transaction is being ended, but the commit may need to be retried
|
||||
// if it fails.
|
||||
continueTransaction("admin", "commitTransaction", cmdObj);
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed.
|
||||
const res = assert.commandWorked(conn.adminCommand(cmdObj));
|
||||
}));
|
||||
|
||||
// We've successfully committed the transaction, so we can forget the ops we've successfully
|
||||
// run.
|
||||
clearOpsList();
|
||||
return res;
|
||||
}
|
||||
|
||||
function abortTransaction(conn, lsid, txnNumber) {
|
||||
|
|
@ -603,7 +593,7 @@ function calculateStmtIdInc(cmdName, cmdObj) {
|
|||
}
|
||||
}
|
||||
|
||||
function continueTransaction(dbName, cmdName, cmdObj) {
|
||||
function continueTransaction(conn, dbName, cmdName, cmdObj) {
|
||||
cmdObj.txnNumber = txnOptions.txnNumber;
|
||||
cmdObj.stmtId = txnOptions.stmtId;
|
||||
cmdObj.autocommit = false;
|
||||
|
|
@ -616,7 +606,7 @@ function continueTransaction(dbName, cmdName, cmdObj) {
|
|||
assert(!cmdObj.hasOwnProperty('writeConcern'), cmdObj);
|
||||
|
||||
// If this is the first time we are running this command, push it to the ops array.
|
||||
if (!isNested()) {
|
||||
if (!isNested() && !inCommandNetworkErrorRetry) {
|
||||
// Make a copy so the command does not get changed by the test.
|
||||
const objCopy = TransactionsUtil.deepCopyObject({}, cmdObj);
|
||||
|
||||
|
|
@ -658,16 +648,13 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
|||
const driverSession = conn.getDB(dbName).getSession();
|
||||
const commandSupportsTransaction = TransactionsUtil.commandSupportsTxn(dbName, cmdName, cmdObj);
|
||||
const isSystemDotProfile = isNamespaceSystemDotProfile(cmdObj);
|
||||
const isNonTxnGetMore = isCommandNonTxnGetMore(cmdName, cmdObj);
|
||||
|
||||
const includeInTransaction = commandSupportsTransaction && !isSystemDotProfile &&
|
||||
driverSession.getSessionId() !== null && !isNonTxnGetMore;
|
||||
|
||||
if (includeInTransaction) {
|
||||
if (commandSupportsTransaction && !isSystemDotProfile &&
|
||||
driverSession.getSessionId() !== null && !isCommandNonTxnGetMore(cmdName, cmdObj)) {
|
||||
if (isNested()) {
|
||||
// Nested commands should never start a new transaction.
|
||||
} else if (ops.length === 0) {
|
||||
// We should never start a transaction on a getMore.
|
||||
// We should never end a transaction on a getMore.
|
||||
assert.neq(cmdName, "getMore", cmdObj);
|
||||
startNewTransaction(conn, cmdObj);
|
||||
} else if (cmdName === "getMore") {
|
||||
|
|
@ -679,7 +666,8 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
|||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
startNewTransaction(conn, cmdObj);
|
||||
}
|
||||
continueTransaction(dbName, cmdName, cmdObj);
|
||||
continueTransaction(conn, dbName, cmdName, cmdObj);
|
||||
|
||||
} else {
|
||||
if (ops.length > 0 && !isNested() && !isSystemDotProfile) {
|
||||
// Operations on system.profile must be allowed to execute in parallel with open
|
||||
|
|
@ -693,7 +681,101 @@ function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
|
|||
}
|
||||
}
|
||||
appendReadAndWriteConcern(conn, dbName, cmdName, cmdObj);
|
||||
return includeInTransaction;
|
||||
}
|
||||
|
||||
// Retries the entire transaction without committing it. Returns immediately on an error with
|
||||
// the response from the failed command. This may recursively retry the entire transaction in
|
||||
// which case parent retries are completed early.
|
||||
function retryEntireTransaction(conn, lsid) {
|
||||
// Re-run every command in the ops array.
|
||||
assert.gt(ops.length, 0);
|
||||
|
||||
// Keep track of what txnNumber this retry is attempting.
|
||||
const retriedTxnNumber = startNewTransaction(conn, {"ignored object": 1});
|
||||
|
||||
logMsgFull('Retrying entire transaction',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)}`);
|
||||
let res;
|
||||
for (let op of ops) {
|
||||
logMsgFull('Retrying op',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)},` +
|
||||
` db: ${op.dbName}, op: ${tojsononeline(op.cmdObj)}`);
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`,
|
||||
// individual statement retries will be suppressed by tracking nesting level.
|
||||
res = conn.getDB(op.dbName).runCommand(op.cmdObj);
|
||||
|
||||
if (hasError(res) || hasWriteConcernError(res)) {
|
||||
return res;
|
||||
}
|
||||
// Sanity check that we checked for an error correctly.
|
||||
assert.commandWorked(res);
|
||||
|
||||
// If we recursively retried the entire transaction, we do not want to continue this
|
||||
// retry. We just pass up the response from the retry that completed.
|
||||
if (txnOptions.txnNumber !== retriedTxnNumber) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// We do not commit the transaction and let it continue in the next operation.
|
||||
return res;
|
||||
}
|
||||
|
||||
// Creates the given collection, retrying if needed. Throws on failure.
|
||||
function createCollectionExplicitly(conn, dbName, collName, lsid) {
|
||||
logMsgFull(
|
||||
'create',
|
||||
`Explicitly creating collection ${dbName}.${collName} and then retrying transaction`);
|
||||
|
||||
// Always majority commit the create because this is not expected to roll back once
|
||||
// successful.
|
||||
const createCmdObj = {
|
||||
create: collName,
|
||||
lsid: lsid,
|
||||
writeConcern: {w: 'majority'},
|
||||
};
|
||||
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`, retrying
|
||||
// as needed. If an error returned by `create` were tolerable, it would already have been
|
||||
// retried by the time it surfaced here.
|
||||
assert.commandWorked(conn.getDB(dbName).runCommand(createCmdObj));
|
||||
}
|
||||
|
||||
// Processes the response to the command if we are configured for txn override. Performs retries
|
||||
// if necessary for implicit collection creation or transient transaction errors.
|
||||
// Returns the last command response received by a command or retry.
|
||||
function retryWithTxnOverride(res, conn, dbName, cmdName, cmdObj, lsid, logError) {
|
||||
assert(configuredForTxnOverride());
|
||||
|
||||
const failedOnCRUDStatement =
|
||||
hasError(res) && !isCommitOrAbort(cmdName) && isCmdInTransaction(cmdObj);
|
||||
if (failedOnCRUDStatement) {
|
||||
assert.gt(ops.length, 0);
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
|
||||
// Transaction statements cannot be retried, but retryable codes are expected to succeed
|
||||
// on full transaction retry.
|
||||
if (configuredForNetworkRetry() && RetryableWritesUtil.isRetryableCode(res.code)) {
|
||||
logError("Retrying on retryable error for transaction statement");
|
||||
return retryEntireTransaction(conn, lsid);
|
||||
}
|
||||
}
|
||||
|
||||
// Transient transaction errors should retry the entire transaction. A
|
||||
// TransientTransactionError on "abortTransaction" is considered a success.
|
||||
if (TransactionsUtil.isTransientTransactionError(res) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError response");
|
||||
res = retryEntireTransaction(conn, lsid);
|
||||
|
||||
// If we got a TransientTransactionError on 'commitTransaction' retrying the transaction
|
||||
// will not retry it, so we retry it here.
|
||||
if (!hasError(res) && cmdName === "commitTransaction") {
|
||||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns true if any error code in a response's "raw" field is retryable.
|
||||
|
|
@ -857,6 +939,32 @@ function shouldRetryForBackgroundReconfigOverride(res, cmdName, logError) {
|
|||
return res;
|
||||
}
|
||||
|
||||
// Processes exceptions if configured for txn override. Retries the entire transaction on
|
||||
// transient transaction errors or network errors if configured for network errors as well.
|
||||
// If a retry fails, returns the response, or returns null for further exception processing.
|
||||
function retryWithTxnOverrideException(e, conn, cmdName, cmdObj, lsid, logError) {
|
||||
assert(configuredForTxnOverride());
|
||||
|
||||
if (TransactionsUtil.isTransientTransactionError(e) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError exception for command");
|
||||
const res = retryEntireTransaction(conn, lsid);
|
||||
|
||||
// If we got a TransientTransactionError on 'commitTransaction' retrying the transaction
|
||||
// will not retry it, so we retry it here.
|
||||
if (!hasError(res) && cmdName === "commitTransaction") {
|
||||
commitTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
if (configuredForNetworkRetry() && isNetworkError(e) &&
|
||||
!canRetryNetworkErrorForCommand(cmdName, cmdObj)) {
|
||||
logError("Retrying on network exception for transaction statement");
|
||||
return retryEntireTransaction(conn, lsid);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Processes exceptions if configured for network error retry. Returns whether to subtract one
|
||||
// from the number of command retries this override counts. Throws if we should not retry.
|
||||
function shouldRetryWithNetworkExceptionOverride(
|
||||
|
|
@ -895,36 +1003,48 @@ function shouldRetryWithNetworkExceptionOverride(
|
|||
return true;
|
||||
}
|
||||
|
||||
const kMaxNumNetworkErrorRetries = 3;
|
||||
const kMaxNumRetries = 3;
|
||||
|
||||
// This function is the heart of the override with the main error retry loop.
|
||||
function networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
function runCommandOverrideBody(conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs) {
|
||||
const startTime = Date.now();
|
||||
|
||||
const isTxnStatement = isCmdInTransaction(cmdObj);
|
||||
|
||||
if (configuredForNetworkRetry() && !isTxnStatement) {
|
||||
if (configuredForNetworkRetry() && !isNested() && !isTxnStatement) {
|
||||
// If this is a top level command, make sure that the command supports network error
|
||||
// retries. Don't validate transaction statements because their encompassing transaction
|
||||
// can be retried at a higher level, even if each statement isn't retryable on its own.
|
||||
validateCmdNetworkErrorCompatibility(cmdName, cmdObj);
|
||||
}
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid);
|
||||
}
|
||||
|
||||
const canRetryNetworkError = canRetryNetworkErrorForCommand(cmdName, cmdObj);
|
||||
const canRetryReadError = canRetryReadErrorDuringBackgroundReconfig(cmdName);
|
||||
let numNetworkErrorRetriesRemaining = canRetryNetworkError ? kMaxNumNetworkErrorRetries : 0;
|
||||
let numNetworkErrorRetries = canRetryNetworkError ? kMaxNumRetries : 0;
|
||||
do {
|
||||
try {
|
||||
// Actually run the provided command.
|
||||
let res = clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
if (configuredForTxnOverride()) {
|
||||
logMsgFull("Override got response",
|
||||
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
|
||||
|
||||
// There's no error, no retries need to be attempted.
|
||||
if (isSuccess(res)) {
|
||||
return res;
|
||||
if (!hasError(res) &&
|
||||
TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
nonTxnAggCursorSet[res.cursor.id] = true;
|
||||
}
|
||||
}
|
||||
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
res = retryWithTxnOverride(res, conn, dbName, cmdName, cmdObj, lsid, logError);
|
||||
}
|
||||
|
||||
if (canRetryNetworkError) {
|
||||
const networkRetryRes =
|
||||
shouldRetryWithNetworkErrorOverride(res, cmdName, startTime, logError);
|
||||
|
|
@ -950,263 +1070,50 @@ function networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFun
|
|||
} catch (e) {
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, e);
|
||||
|
||||
if (configuredForTxnOverride()) {
|
||||
const txnRetryOnException =
|
||||
retryWithTxnOverrideException(e, conn, cmdName, cmdObj, lsid, logError);
|
||||
if (txnRetryOnException) {
|
||||
return txnRetryOnException;
|
||||
}
|
||||
}
|
||||
|
||||
if (canRetryNetworkError) {
|
||||
const decrementRetryCount = shouldRetryWithNetworkExceptionOverride(
|
||||
e, cmdName, cmdObj, startTime, numNetworkErrorRetriesRemaining, logError);
|
||||
e, cmdName, cmdObj, startTime, numNetworkErrorRetries, logError);
|
||||
if (decrementRetryCount) {
|
||||
--numNetworkErrorRetriesRemaining;
|
||||
--numNetworkErrorRetries;
|
||||
logMsgFull("Decrementing command network error retry count",
|
||||
`New count: ${numNetworkErrorRetriesRemaining}`);
|
||||
`New count: ${numNetworkErrorRetries}`);
|
||||
}
|
||||
|
||||
logErrorFull("Retrying on network error for command", cmdName, cmdObj, e);
|
||||
inCommandNetworkErrorRetry = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
} while (numNetworkErrorRetriesRemaining >= 0);
|
||||
} while (numNetworkErrorRetries >= 0);
|
||||
throw new Error("MONGO UNREACHABLE");
|
||||
}
|
||||
|
||||
function networkRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
if (!configuredForNetworkRetry() && !configuredForBackgroundReconfigs()) {
|
||||
return clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
}
|
||||
return networkRetryOverrideBody(conn, cmdName, cmdObj, clientFunction, makeFuncArgs);
|
||||
}
|
||||
|
||||
function shouldRetryTxn(cmdName, cmdObj, result) {
|
||||
try {
|
||||
return shouldRetryTxnOnStatus(cmdName, cmdObj, result.unwrap());
|
||||
} catch (e) {
|
||||
return shouldRetryTxnOnException(cmdName, cmdObj, e);
|
||||
}
|
||||
}
|
||||
|
||||
function shouldRetryTxnOnStatus(cmdName, cmdObj, res) {
|
||||
if (isSuccess(res)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (TransactionsUtil.isConflictingOperationInProgress(res)) {
|
||||
// Other overrides, or session.js, may interfere, and retry an op which starts
|
||||
// a transaction if it reported a failure e.g., on a network error, but succeeded
|
||||
// server-side. Retry the transaction _again_, with a new txnNumber.
|
||||
return true;
|
||||
}
|
||||
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
|
||||
|
||||
// Transient transaction errors should retry the entire transaction. A
|
||||
// TransientTransactionError on "abortTransaction" is considered a success.
|
||||
if (TransactionsUtil.isTransientTransactionError(res) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError response");
|
||||
return true;
|
||||
}
|
||||
|
||||
const failedOnCRUDStatement = !isCommitOrAbort(cmdName);
|
||||
if (failedOnCRUDStatement) {
|
||||
// If configured for BOTH txn override, and network error override, a network error
|
||||
// will NOT retry a single op, instead it must retry the entire transaction.
|
||||
if (configuredForNetworkRetry() && RetryableWritesUtil.isRetryableCode(res.code)) {
|
||||
logError("Retrying on retryable error for transaction statement");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function shouldRetryTxnOnException(cmdName, cmdObj, exception) {
|
||||
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, exception);
|
||||
if (TransactionsUtil.isTransientTransactionError(exception) && cmdName !== "abortTransaction") {
|
||||
logError("Retrying on TransientTransactionError exception for command");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (configuredForNetworkRetry() && isNetworkError(exception) &&
|
||||
!canRetryNetworkErrorForCommand(cmdName, cmdObj)) {
|
||||
// If configured for BOTH txn override, and network error override, a network error
|
||||
// will NOT retry a single op, instead it must retry the entire transaction.
|
||||
logError("Retrying on network exception for transaction statement");
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Maximum number of times a transaction can be retried within a given op.
|
||||
// Note that a transaction may fail and be retried up to this many times on
|
||||
// each call to runCommand, for a total number of retries on the order of
|
||||
// maxOpsInTransaction * kMaxNumTxnErrorRetries.
|
||||
const kMaxNumTxnErrorRetries = 5;
|
||||
const kMaxTxnRetryTimeout = 5 * 60 * 1000;
|
||||
|
||||
function txnRetryOverrideBody(conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs) {
|
||||
assert(isCmdInTransaction(cmdObj));
|
||||
const retryTracker = new RetryTracker(kMaxNumTxnErrorRetries, kMaxTxnRetryTimeout);
|
||||
|
||||
const logResult = (res) => {
|
||||
res.apply(value =>
|
||||
logMsgFull("Override got response",
|
||||
`res: ${tojsononeline(value)}, cmd: ${tojsononeline(cmdObj)}`));
|
||||
};
|
||||
|
||||
// ==== Initial Attempt ====
|
||||
let res = Result.wrap(() => clientFunction.apply(conn, makeFuncArgs(cmdObj)));
|
||||
logResult(res);
|
||||
if (res.ok()) {
|
||||
return res.status;
|
||||
}
|
||||
|
||||
if (!shouldRetryTxn(cmdName, cmdObj, res)) {
|
||||
return res.unwrap();
|
||||
}
|
||||
|
||||
if (!isCommitOrAbort(cmdName)) {
|
||||
// Abort the transaction before trying again in a new transaction.
|
||||
try {
|
||||
// Abort returns in successful cases, or throws - if it fails,
|
||||
// we will still retry the transaction anyway.
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
} catch {
|
||||
}
|
||||
}
|
||||
|
||||
// On failure of a single request within a transaction, retry the entire transaction.
|
||||
// This involves re-playing all preceding requests in the transaction.
|
||||
// Note that commits/aborts may be individually retried by networkRunCommandOverride,
|
||||
// but any other failed op will retry the whole transaction here.
|
||||
|
||||
// ==== Retry Loop ====
|
||||
for (let retry of retryTracker) {
|
||||
// Track the new transaction state.
|
||||
const retriedTxnNumber = startNewTransaction(conn, {"ignored object": 1});
|
||||
|
||||
logMsgFull('Retrying entire transaction',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${
|
||||
tojsononeline(lsid)}, remainingAttempts: ${retry.remaining}`);
|
||||
|
||||
for (let op of ops) {
|
||||
logMsgFull('Retrying op',
|
||||
`txnNumber: ${retriedTxnNumber}, lsid: ${tojsononeline(lsid)},` +
|
||||
` db: ${op.dbName}, op: ${tojsononeline(op.cmdObj)}`);
|
||||
|
||||
// Running the command on conn will reenter from the top of `runCommandOverride`,
|
||||
// this will re-enter `txnRunCommandOverride` but such ops bypass transaction
|
||||
// retry logic, to avoid recursive retries.
|
||||
cmdObj = {...op.cmdObj, txnNumber: retriedTxnNumber};
|
||||
cmdName = Object.keys(cmdObj)[0];
|
||||
appendReadAndWriteConcern(conn, op.dbName, cmdName, cmdObj);
|
||||
res = Result.wrap(() => conn.getDB(op.dbName).runCommand(cmdObj));
|
||||
logResult(res);
|
||||
|
||||
if (!res.ok()) {
|
||||
// Failed while replaying operations for the transaction.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (res.ok()) {
|
||||
// Replayed the entire transaction successfully.
|
||||
break;
|
||||
}
|
||||
|
||||
if (!shouldRetryTxn(cmdName, cmdObj, res)) {
|
||||
break;
|
||||
}
|
||||
if (!isCommitOrAbort(cmdName)) {
|
||||
// Abort the transaction before trying again in a new transaction.
|
||||
// Abort returns in successful cases, or throws, re-starting the retry.
|
||||
// In any case, the only way to progress is to retry again with a new
|
||||
// transaction number.
|
||||
try {
|
||||
abortTransaction(conn, lsid, txnOptions.txnNumber);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return res.unwrap();
|
||||
}
|
||||
|
||||
// Top level runCommand override function.
|
||||
function txnRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
currentCommandID.push(newestCommandID);
|
||||
newestCommandID++;
|
||||
function runCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, makeFuncArgs) {
|
||||
currentCommandID.push(newestCommandID++);
|
||||
nestingLevel++;
|
||||
|
||||
const lsid = cmdObj.lsid;
|
||||
const passthrough = () => {
|
||||
try {
|
||||
// This request isn't eligible for transaction level retries,
|
||||
// so pass it down to the next override directly.
|
||||
const res = clientFunction.apply(conn, makeFuncArgs(cmdObj));
|
||||
|
||||
logMsgFull(
|
||||
"Txn override passthrough got response",
|
||||
`res: ${tojsononeline(res)}, cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
// Record non-transaction agg cursor IDs so subsequent getMores for this cursor
|
||||
// can also avoid being forced into an ongoing transaction.
|
||||
if (isSuccess(res) && TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
|
||||
nonTxnAggCursorSet[res.cursor.id] = true;
|
||||
}
|
||||
return res;
|
||||
} catch (e) {
|
||||
logErrorFull("Txn override passthrough got exception", cmdName, cmdObj, e);
|
||||
throw e;
|
||||
}
|
||||
};
|
||||
try {
|
||||
if (!configuredForTxnOverride()) {
|
||||
// Not currently enabled - but tests can change this at runtime, so it must be
|
||||
// checked for each operation.
|
||||
logMsgFull(
|
||||
"Txn override not enabled",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
// Record this operation, starting a new transaction if not currently in one.
|
||||
// If this command is not eligible for inclusion in the current transaction,
|
||||
// it will commit the current transaction here.
|
||||
if (!setupTransactionCommand(conn, dbName, cmdName, cmdObj, cmdObj.lsid)) {
|
||||
// This is a command which does not support executing in a transaction,
|
||||
// or a getMore for a cursor created by such a command.
|
||||
logMsgFull(
|
||||
"Operation cannot be wrapped in a transaction",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
// A nested call means we have re-entered runCommand from _within_ an ongoing call to
|
||||
// txnRunCommandOverride. This is either for retries, or "injecting" a commit to close
|
||||
// a transaction (reached max ops, need to run a non-transaction command).
|
||||
if (isTransactionRetry()) {
|
||||
logMsgFull(
|
||||
"Operation is a retry attempt from txn override",
|
||||
`Will not apply override to cmdName: ${cmdName} cmd: ${tojsononeline(cmdObj)}`);
|
||||
return passthrough();
|
||||
}
|
||||
|
||||
let res;
|
||||
try {
|
||||
assert(!inTransactionRetry);
|
||||
inTransactionRetry = true;
|
||||
|
||||
// Enter the override body, where retries will be handled at the transaction level.
|
||||
res = txnRetryOverrideBody(
|
||||
conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs);
|
||||
} finally {
|
||||
inTransactionRetry = false;
|
||||
}
|
||||
const res = runCommandOverrideBody(
|
||||
conn, dbName, cmdName, cmdObj, lsid, clientFunction, makeFuncArgs);
|
||||
|
||||
// Many tests run queries that are expected to fail. In this case, when we wrap CRUD ops
|
||||
// in transactions, the transaction including the failed query will not be able to
|
||||
// commit. This override expects transactions to be able to commit. Rather than
|
||||
// denylisting all tests containing queries that are expected to fail, we clear the ops
|
||||
// list when we return an error to the test so we do not retry the failed query.
|
||||
if (hasError(res) && (ops.length > 0)) {
|
||||
if (configuredForTxnOverride() && !isNested() && hasError(res) && (ops.length > 0)) {
|
||||
logMsgFull("Clearing ops on failed command",
|
||||
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
|
||||
clearOpsList();
|
||||
|
|
@ -1218,6 +1125,7 @@ function txnRunCommandOverride(conn, dbName, cmdName, cmdObj, clientFunction, ma
|
|||
// Reset recursion and retry state tracking.
|
||||
nestingLevel--;
|
||||
currentCommandID.pop();
|
||||
inCommandNetworkErrorRetry = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1265,6 +1173,5 @@ if (configuredForTxnOverride()) {
|
|||
};
|
||||
}
|
||||
|
||||
OverrideHelpers.overrideRunCommand(networkRunCommandOverride);
|
||||
OverrideHelpers.overrideRunCommand(txnRunCommandOverride);
|
||||
OverrideHelpers.overrideRunCommand(runCommandOverride);
|
||||
})();
|
||||
|
|
|
|||
|
|
@ -1,106 +0,0 @@
|
|||
/**
|
||||
* Miscellaneous utilities which are commonly useful when dealing
|
||||
* with success/failure, and retries.
|
||||
*/
|
||||
|
||||
export function hasError(res) {
|
||||
return res.ok !== 1 || res.writeErrors || (res.hasOwnProperty("nErrors") && res.nErrors != 0);
|
||||
}
|
||||
|
||||
export function hasWriteConcernError(res) {
|
||||
return res.hasOwnProperty("writeConcernError");
|
||||
}
|
||||
|
||||
export function isSuccess(res) {
|
||||
return !(hasError(res) || hasWriteConcernError(res));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for a status or an exception.
|
||||
*
|
||||
* Typical usage:
|
||||
*
|
||||
* let res = Result.wrap(() => mightReturnOrThrow());
|
||||
* if (res.ok()) {...}
|
||||
* return res.unwrap(); // Returns or throws.
|
||||
*/
|
||||
export class Result {
|
||||
constructor(status, exception) {
|
||||
// Status or exception must be present.
|
||||
assert((status != null) != (exception != null));
|
||||
this.status = status;
|
||||
this.exception = exception;
|
||||
}
|
||||
static status(status) {
|
||||
let res = new Result(status, null);
|
||||
return res;
|
||||
}
|
||||
|
||||
static exception(exception) {
|
||||
let res = new Result(null, exception);
|
||||
return res;
|
||||
}
|
||||
|
||||
static wrap(callable) {
|
||||
try {
|
||||
return Result.status(callable());
|
||||
} catch (e) {
|
||||
return Result.exception(e);
|
||||
}
|
||||
}
|
||||
|
||||
ok() {
|
||||
return this.status != null && isSuccess(this.status);
|
||||
}
|
||||
|
||||
unwrap() {
|
||||
if (this.exception != null) {
|
||||
throw this.exception;
|
||||
}
|
||||
return this.status;
|
||||
}
|
||||
apply(callback) {
|
||||
if (this.exception != null) {
|
||||
return callback(this.exception);
|
||||
}
|
||||
return callback(this.status);
|
||||
}
|
||||
dispatch(onStatus, onException) {
|
||||
if (this.exception != null) {
|
||||
return onException(this.exception);
|
||||
}
|
||||
return onStatus(this.status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility to simplify looping until X iterations or Y milliseconds elapses;
|
||||
* a common pattern when retrying operations.
|
||||
*
|
||||
* let helper = new RetryTracker(3, 3000);
|
||||
* for (let _ of helper) {
|
||||
* ... // E.g., try a command. Will loop until break or exceeding either limit.
|
||||
* }
|
||||
* The timer starts from creation of the helper.
|
||||
*/
|
||||
export class RetryTracker {
|
||||
constructor(retryCount, timeout) {
|
||||
this.retryCount = retryCount;
|
||||
this.timeout = timeout;
|
||||
this.startTime = Date.now();
|
||||
}
|
||||
|
||||
* [Symbol.iterator]() {
|
||||
if (this.retryCount <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (let i = 0; i < this.retryCount; ++i) {
|
||||
let elapsed = Date.now() - this.startTime;
|
||||
if (elapsed > this.timeout) {
|
||||
break;
|
||||
}
|
||||
yield {remaining: this.retryCount - i - 1, remainingTime: this.timeout - elapsed};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -31,8 +31,7 @@ var TransactionsUtil = (function() {
|
|||
// still execute concurrently with other transactions. Pipelines with $changeStream or $out
|
||||
// cannot run within a transaction.
|
||||
function commandIsNonTxnAggregation(cmdName, cmdObj) {
|
||||
return !("explain" in cmdObj) &&
|
||||
OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
|
||||
return OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
|
||||
OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj);
|
||||
}
|
||||
|
||||
|
|
@ -108,11 +107,6 @@ var TransactionsUtil = (function() {
|
|||
res.errorLabels.includes('TransientTransactionError');
|
||||
}
|
||||
|
||||
function isConflictingOperationInProgress(res) {
|
||||
return res != null && res.hasOwnProperty('codeName') &&
|
||||
res.codeName === "ConflictingOperationInProgress";
|
||||
}
|
||||
|
||||
// Runs a function 'func()' in a transaction on database 'db'. Invokes function
|
||||
// 'beforeTransactionFunc()' before the transaction (can be used to get references to
|
||||
// collections etc.). Ensures that the transaction is successfully committed, by retrying the
|
||||
|
|
@ -147,7 +141,6 @@ var TransactionsUtil = (function() {
|
|||
commandTypeCanSupportTxn,
|
||||
deepCopyObject,
|
||||
isTransientTransactionError,
|
||||
isConflictingOperationInProgress,
|
||||
runInTransaction,
|
||||
};
|
||||
})();
|
||||
|
|
|
|||
Loading…
Reference in New Issue