mongo/jstests/libs/bulk_write_utils.js

702 lines
26 KiB
JavaScript

/**
 * Checks a single bulkWrite reply item (one cursor entry) against the expected values.
 *
 * The scalar fields 'ok', 'idx', 'n', 'nModified' and 'code' are compared with assert.eq;
 * the 'upserted' field is compared with assert.docEq. Every failure message embeds the
 * full actual entry (via tojson) so the offending reply is visible in the test log.
 *
 * @param entry the actual reply item.
 * @param expectedEntry an object holding the expected value of each checked field.
 */
export const cursorEntryValidator = function (entry, expectedEntry) {
    const suffix = " value did not match for bulkWrite reply item. actual reply: " + tojson(entry);

    // [field name, message label] pairs, checked in this order.
    const scalarChecks = [
        ["ok", "'ok'"],
        ["idx", "'idx'"],
        ["n", "'n'"],
        ["nModified", "'nModified' "],
        ["code", "'code'"],
    ];
    for (const [field, label] of scalarChecks) {
        assert.eq(entry[field], expectedEntry[field], label + suffix);
    }

    // 'upserted' is compared with docEq rather than eq.
    assert.docEq(entry.upserted, expectedEntry.upserted, "'upserted' " + suffix);
};
/**
 * Asserts that the first batch of a bulkWrite response cursor holds exactly `expectedSize`
 * entries. The failure message includes the full response (via tojson).
 *
 * @param response the command response; `response.cursor.firstBatch` is read.
 * @param expectedSize the expected number of entries in the first batch.
 */
export const cursorSizeValidator = function (response, expectedSize) {
    const actualSize = response.cursor.firstBatch.length;
    const failureMessage =
        "Expected cursor size did not match response cursor size. Response: " + tojson(response);
    assert.eq(actualSize, expectedSize, failureMessage);
};
/**
 * Compares the summary counters of a bulkWrite response with the expected values.
 *
 * Checked, in order: nErrors, nInserted, nDeleted, nMatched, nModified, nUpserted. Each
 * failure message names the counter and includes the full response (via tojson).
 *
 * @param response the actual bulkWrite response.
 * @param fields an object holding the expected value of each summary counter.
 */
export const summaryFieldsValidator = function (response, fields) {
    const suffix = " value did not match for bulkWrite summary fields. actual reply: " + tojson(response);
    const summaryFieldNames = ["nErrors", "nInserted", "nDeleted", "nMatched", "nModified", "nUpserted"];
    for (const fieldName of summaryFieldNames) {
        assert.eq(response[fieldName], fields[fieldName], fieldName + suffix);
    }
};
// Asserts that `before + expectedDiff` equals `after`. Behaves like
// assert.eq(before + expectedDiff, after, errorMessage), except that the failure message
// also spells out the three operands.
const checkEqual = (before, expectedDiff, after, errorMessage) => {
    const expectedAfter = before + expectedDiff;
    const detailedMessage = `${errorMessage}: ${before} + ${expectedDiff} != ${after}`;
    assert.eq(expectedAfter, after, detailedMessage);
};
/**
 * Helper class for the bulkwrite_metrics tests.
 *
 * A checker runs a workload either as one bulkWrite command or as the equivalent "normal"
 * commands, takes serverStatus (and, when not on a mongos, `top`) snapshots before and
 * after the workload, and asserts that the counters moved by the expected amounts.
 */
export class BulkWriteMetricChecker {
    /**
     * @param testDB database handle; its serverStatus(), adminCommand() and runCommand()
     *     methods are used.
     * @param namespaces array of namespaces. bulkWrite ops refer to them by index, and
     *     they are used as keys into the `top` totals.
     * @param bulkWrite true to run the workload as a bulkWrite command, false to run the
     *     normal commands instead.
     * @param isMongos true to run the mongos-only checks, false to run the mongod-only
     *     checks.
     * @param fle true to apply the FLE-specific expectations.
     * @param errorsOnly value forwarded as `errorsOnly` by checkMetrics().
     * @param retryCount how many times checkMetricsWithRetries() sends the command.
     * @param timeseries true to apply the timeseries-specific expectations and to add
     *     missing timestamp fields.
     * @param defaultTimestamp value used for timestamp fields added by this class.
     */
    constructor(
        testDB,
        namespaces,
        bulkWrite,
        isMongos,
        fle,
        errorsOnly,
        retryCount = 3,
        timeseries = false,
        defaultTimestamp = undefined,
    ) {
        this.testDB = testDB;
        this.namespaces = namespaces;
        this.bulkWrite = bulkWrite;
        this.isMongos = isMongos;
        this.fle = fle;
        this.retryCount = retryCount;
        this.errorsOnly = errorsOnly;
        this.timeseries = timeseries;
        this.defaultTimestamp = defaultTimestamp;
    }

    /**
     * Checks the per-namespace `top` counters. Metrics corresponding to
     * Top::get(opCtx->getClient()->getServiceContext()).record(...).
     *
     * For FLE, asserts that update/remove/insert are absent for every namespace. Otherwise,
     * for each namespace index in `nsIndicesInRequest`, asserts the count deltas. When
     * `perNamespaceMetrics` is given, its per-namespace values (missing ones default to 0)
     * replace the global expectations for that namespace.
     */
    _checkTopMetrics(
        top0,
        top1,
        inserted,
        retriedInsert,
        updateCount,
        deleteCount,
        perNamespaceMetrics,
        nsIndicesInRequest,
    ) {
        if (this.fle) {
            // FLE do not set those in Top due to redaction.
            for (const ns of this.namespaces) {
                assert.eq(top0[ns].update, undefined);
                assert.eq(top1[ns].update, undefined);
                assert.eq(top0[ns].remove, undefined);
                assert.eq(top1[ns].remove, undefined);
                assert.eq(top0[ns].insert, undefined);
                assert.eq(top1[ns].insert, undefined);
            }
        } else {
            for (const idx of nsIndicesInRequest) {
                const ns = this.namespaces[idx];
                if (perNamespaceMetrics != undefined) {
                    inserted = perNamespaceMetrics[ns].inserted ?? 0;
                    updateCount = perNamespaceMetrics[ns].updateCount ?? 0;
                    deleteCount = perNamespaceMetrics[ns].deleteCount ?? 0;
                    retriedInsert = perNamespaceMetrics[ns].retriedInsert ?? 0;
                }
                checkEqual(
                    top0[ns].update.count,
                    updateCount,
                    top1[ns].update.count,
                    `update.count mismatch for ${ns}`,
                );
                checkEqual(
                    top0[ns].remove.count,
                    deleteCount,
                    top1[ns].remove.count,
                    `remove.count mismatch for ${ns}`,
                );
                checkEqual(
                    top0[ns].insert.count,
                    inserted + retriedInsert,
                    top1[ns].insert.count,
                    `insert.count mismatch for ${ns}`,
                );
            }
        }
    }

    /**
     * Checks serverStatus `metrics.document` (updated / deleted / inserted). Metrics
     * corresponding to CurOp::get(opCtx)->debug().additiveMetrics.
     */
    _checkAdditiveMetrics(status0, status1, actualInserts, updated, fleSafeContentUpdates, deleted) {
        const doc0 = status0.metrics.document;
        const doc1 = status1.metrics.document;
        checkEqual(doc0.updated, updated + fleSafeContentUpdates, doc1.updated, "document.updated mismatch");
        checkEqual(doc0.deleted, deleted, doc1.deleted, "document.deleted mismatch");
        checkEqual(doc0.inserted, actualInserts, doc1.inserted, "document.inserted mismatch");
    }

    /**
     * Checks serverStatus `opWriteConcernCounters` (the `wmajority` and `none` buckets of
     * insert / update / delete). Metrics corresponding to ServerWriteConcernMetrics.
     */
    _checkWriteConcernMetrics(
        status0,
        status1,
        inserted,
        actualInserts,
        updateCount,
        fleSafeContentUpdates,
        deleteCount,
        deleted,
        retryCount,
    ) {
        const wC0 = status0.opWriteConcernCounters;
        const wC1 = status1.opWriteConcernCounters;
        if (this.fle) {
            // Due to FLE implementation, the actual opWriteConcernCounters metrics logged both
            // for bulkWrite and normal commands are using the implicit default write concern, not
            // wmajority. FLE update is a findAndModify followed by a safeContent update.
            checkEqual(wC0.update.wmajority, 0, wC1.update.wmajority, "update.wmajority mismatch");
            checkEqual(wC0.delete.wmajority, 0, wC1.delete.wmajority, "delete.wmajority mismatch");
            checkEqual(wC0.insert.wmajority, 0, wC1.insert.wmajority, "insert.wmajority mismatch");
            checkEqual(wC0.update.none, fleSafeContentUpdates, wC1.update.none, "update.none mismatch");
            checkEqual(wC0.delete.none, deleted, wC1.delete.none, "delete.none mismatch");
            checkEqual(wC0.insert.none, actualInserts, wC1.insert.none, "insert.none mismatch");
        } else {
            // All calls are done with {writeConcern: {w: "majority"}} so "none" count should be
            // unchanged, except for timeseries.
            // TODO SERVER-84799 timeseries condition below and comment above.
            const [uNone, uMaj] = this.timeseries && retryCount > 1 ? [updateCount, 0] : [0, updateCount];
            const [dNone, dMaj] = this.timeseries && retryCount > 1 ? [deleteCount, 0] : [0, deleteCount];
            checkEqual(wC0.update.wmajority, uMaj, wC1.update.wmajority, "update.wmajority mismatch");
            checkEqual(wC0.delete.wmajority, dMaj, wC1.delete.wmajority, "delete.wmajority mismatch");
            checkEqual(wC0.insert.wmajority, inserted, wC1.insert.wmajority, "insert.wmajority mismatch");
            checkEqual(wC0.update.none, uNone, wC1.update.none, "update.none mismatch");
            checkEqual(wC0.delete.none, dNone, wC1.delete.none, "delete.none mismatch");
            checkEqual(wC0.insert.none, 0, wC1.insert.none, "insert.none mismatch");
        }
    }

    /**
     * Checks the `arrayFilters` and `pipeline` counters under
     * `metrics.commands.bulkWrite` (or `metrics.commands.update` when not using
     * bulkWrite). Metrics corresponding to UpdateMetrics.
     *
     * A counter that is absent from a snapshot is treated as 0, so a baseline taken before
     * the counter first appears does not turn the comparison into NaN arithmetic.
     * NOTE(review): this assumes an absent counter means "never incremented" - confirm.
     */
    _checkUpdateMetrics(status0, status1, updateArrayFilters, updatePipeline) {
        // For bulkWrite, status1.metrics.commands.bulkWrite can exist even without
        // arrayFilters and pipeline being set since it also counts "total" and "failed".
        const updateField = this.bulkWrite ? "bulkWrite" : "update";
        const update1 = status1.metrics.commands[updateField];
        const update0 = status0.metrics.commands[updateField];
        if (update1 != undefined && (update1.arrayFilters != undefined || update1.pipeline != undefined)) {
            const arrayFilters0 = update0?.arrayFilters ?? 0;
            const pipeline0 = update0?.pipeline ?? 0;
            const arrayFilters1 = update1.arrayFilters ?? 0;
            const pipeline1 = update1.pipeline ?? 0;
            checkEqual(arrayFilters0, updateArrayFilters, arrayFilters1, "update.arrayFilters mismatch");
            checkEqual(pipeline0, updatePipeline, pipeline1, "update.pipeline mismatch");
        } else {
            // Neither counter is reported after the workload, so none may be expected.
            assert.eq(0, updateArrayFilters, `metrics.commands.${updateField} should not be undefined`);
            assert.eq(0, updatePipeline, `metrics.commands.${updateField} should not be undefined`);
        }
    }

    /**
     * Checks serverStatus `opcounters` (insert / update / delete) and
     * `metrics.query.updateManyCount` / `deleteManyCount`. Metrics corresponding to
     * globalOpCounters.gotInsert() / gotDelete() / gotUpdate().
     *
     * On a mongos the expectations are scaled by `retryCount` (inserts are not scaled for
     * timeseries) and adjusted for FLE. The expected update opcounter delta is additionally
     * multiplied by `opcounterFactor`.
     */
    _checkOpCounters(
        status0,
        status1,
        inserted,
        actualInserts,
        updateCount,
        updateManyCount,
        opcounterFactor,
        fleSafeContentUpdates,
        deleteCount,
        deleteManyCount,
        retryCount,
    ) {
        const op0 = status0.opcounters;
        const op1 = status1.opcounters;
        // For BulkWrite, each statement is always a single insert/update so `inserted` and
        // `updated` also are the number of statements. Also BulkWrite FLE does not support mixing
        // updates with inserts.
        const numberOfInsertStatements = inserted;
        const numberOfUpdateStatements = updateCount;
        let opUpdated = updateCount;
        let opDeleted = deleteCount;
        let opInserted = actualInserts;
        let opUpdateMany = updateManyCount;
        let opDeleteMany = deleteManyCount;
        if (this.isMongos) {
            // TODO SERVER-84798 timeseries should increase by retryCount too.
            if (!this.timeseries) {
                opInserted *= retryCount;
            }
            opDeleted *= retryCount;
            opUpdated *= retryCount;
            opUpdateMany *= retryCount;
            opDeleteMany *= retryCount;
            if (this.fle) {
                // On Mongos, there is one extra opcounter increment per statement.
                opDeleted *= 2;
                opDeleteMany *= 2;
                if (numberOfInsertStatements >= 0) {
                    opInserted = (numberOfInsertStatements + actualInserts) * retryCount;
                }
                // FLE2 updates don't execute the safeContent updates on retries, only the
                // findAndModify step.
                opUpdated = (this.bulkWrite ? 0 : numberOfUpdateStatements) * retryCount + fleSafeContentUpdates;
                opUpdateMany = (this.bulkWrite ? 0 : updateManyCount) * retryCount;
            }
        } else if (this.fle) {
            opUpdated = fleSafeContentUpdates;
        }
        checkEqual(op0.update, opUpdated * opcounterFactor, op1.update, "opcounters.update mismatch");
        checkEqual(op0.delete, opDeleted, op1.delete, "opcounters.delete mismatch");
        checkEqual(op0.insert, opInserted, op1.insert, "opcounters.insert mismatch");
        // TODO: SERVER-103358 currently we increment for internal retries. The code block below
        // allows an extra updateManyCount in case an internal retry incremented the counter an
        // extra time. Remove this special case once this is fixed.
        if (this.bulkWrite && this.isMongos && updateManyCount > 0) {
            const before = status0.metrics.query.updateManyCount;
            const afterActual = status1.metrics.query.updateManyCount;
            // We add one for an extra retry and one because the range is exclusive.
            const afterUpperBound = before + opUpdateMany + 2;
            const errMsg =
                `metrics.query.updateMany mismatch: ${afterActual} is not between ${before} and ${afterUpperBound}`;
            assert.between(before, afterActual, afterUpperBound, errMsg, false /* inclusive */);
        } else {
            checkEqual(
                status0.metrics.query.updateManyCount,
                opUpdateMany,
                status1.metrics.query.updateManyCount,
                "metrics.query.updateMany mismatch",
            );
        }
        checkEqual(
            status0.metrics.query.deleteManyCount,
            opDeleteMany,
            status1.metrics.query.deleteManyCount,
            "metrics.query.deleteMany mismatch",
        );
    }

    /**
     * Runs the checks used when not on a mongos: metrics.document, top,
     * opWriteConcernCounters, and the `transactions.retriedCommandsCount` /
     * `retriedStatementsCount` counters.
     */
    _checkMongodOnlyMetrics(
        status0,
        top0,
        status1,
        top1,
        updated,
        updateCount,
        inserted,
        deleted,
        deleteCount,
        retriedInsert,
        retriedCommandsCount,
        retriedStatementsCount,
        fleSafeContentUpdates,
        actualInserts,
        retryCount,
        perNamespaceMetrics,
        nsIndicesInRequest,
    ) {
        this._checkAdditiveMetrics(status0, status1, actualInserts, updated, fleSafeContentUpdates, deleted);
        // Not checking metrics.document.returned or metrics.queryExecutor.scannedObjects as
        // they are not stable across runs of the test, even without bulkWrite.
        // metrics.queryExecutor.scanned is stable but the FLE logic for it is very complicated
        // to maintain here.
        this._checkTopMetrics(
            top0,
            top1,
            inserted,
            retriedInsert,
            updateCount,
            deleteCount,
            perNamespaceMetrics,
            nsIndicesInRequest,
        );
        this._checkWriteConcernMetrics(
            status0,
            status1,
            inserted,
            actualInserts,
            updateCount,
            fleSafeContentUpdates,
            deleteCount,
            deleted,
            retryCount,
        );
        // Metrics corresponding to
        // RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount
        // and incrementRetriedStatementsCount.
        const t0 = status0.transactions;
        const t1 = status1.transactions;
        checkEqual(
            t0.retriedCommandsCount,
            retriedCommandsCount,
            t1.retriedCommandsCount,
            "transactions.retriedCommandsCount mismatch",
        );
        checkEqual(
            t0.retriedStatementsCount,
            retriedStatementsCount,
            t1.retriedStatementsCount,
            "transactions.retriedStatementsCount mismatch",
        );
    }

    /**
     * Runs the checks used on a mongos: the `shardingStatistics.numHostsTargeted` counters
     * for insert / update / delete, read from the given shard-field buckets (plus
     * `insert.unsharded`).
     */
    _checkMongosOnlyMetrics(
        status0,
        status1,
        updateCount,
        inserted,
        deleteCount,
        eqIndexedEncryptedFields,
        singleUpdateForBulkWrite,
        singleInsertForBulkWrite,
        singleDeleteForBulkWrite,
        insertShardField,
        updateShardField,
        deleteShardField,
        fleSafeContentUpdates,
        retryCount,
        actualInserts,
    ) {
        const targeted0 = status0.shardingStatistics.numHostsTargeted;
        const targeted1 = status1.shardingStatistics.numHostsTargeted;
        let targetedUpdate = updateCount * retryCount;
        let targetedDelete = deleteCount * retryCount;
        let targetedInsert = actualInserts;
        let unshardedInsert = 0;
        if (this.fle) {
            targetedUpdate = fleSafeContentUpdates;
            // BulkWrite FLE does not allow mixing insert and update so updated != 0 means
            // it is an FLE update.
            if (updateCount != 0) {
                targetedInsert = 0;
            } else {
                targetedInsert = inserted;
            }
            // The FLE inserts in the state collection are batched in a single command so they count
            // as 1 here, unlike for opcounters. eqIndexedEncryptedFields is per insert/update and
            // we don't allow mixing insert and update for FLE bulkWrite.
            unshardedInsert = 2 * (inserted + updateCount) * (eqIndexedEncryptedFields > 0 ? 1 : 0);
            unshardedInsert *= retryCount;
        }
        // TODO SERVER-84798 timeseries should increase by retryCount too.
        if (!this.timeseries) {
            targetedInsert *= retryCount;
        }
        if (this.timeseries && !this.bulkWrite && updateShardField === "manyShards") {
            targetedUpdate = targetedUpdate * 2;
        }
        if (this.timeseries && !this.bulkWrite && deleteShardField === "manyShards") {
            targetedDelete = targetedDelete * 2;
        }
        if (this.bulkWrite) {
            // The single*ForBulkWrite flags pin the expected delta to exactly 1.
            if (singleUpdateForBulkWrite) {
                targetedUpdate = 1;
            }
            if (singleInsertForBulkWrite) {
                targetedInsert = 1;
            }
            if (singleDeleteForBulkWrite) {
                targetedDelete = 1;
            }
        }
        checkEqual(
            targeted0.insert[insertShardField],
            targetedInsert,
            targeted1.insert[insertShardField],
            `insert.${insertShardField} mismatch`,
        );
        checkEqual(
            targeted0.insert.unsharded,
            unshardedInsert,
            targeted1.insert.unsharded,
            "insert.unsharded mismatch",
        );
        checkEqual(
            targeted0.update[updateShardField],
            targetedUpdate,
            targeted1.update[updateShardField],
            `update.${updateShardField} mismatch`,
        );
        checkEqual(
            targeted0.delete[deleteShardField],
            targetedDelete,
            targeted1.delete[deleteShardField],
            `delete.${deleteShardField} mismatch`,
        );
    }

    /**
     * Takes the "after" snapshots and runs every applicable check against the "before"
     * snapshots.
     *
     * @param status0 serverStatus output taken before the workload.
     * @param top0 `top` totals taken before the workload (only read when not on a mongos).
     * @param nsIndicesInRequest set of namespace indices used by the workload.
     * @param expected metrics object; every field is optional and defaults as listed in the
     *     destructuring pattern below. `updateCount` / `deleteCount` default to `updated` /
     *     `deleted`. eqIndexedEncryptedFields is per insert/update in the command.
     */
    _checkMetricsImpl(
        status0,
        top0,
        nsIndicesInRequest,
        {
            updated = 0,
            updateCount = undefined,
            updateManyCount = 0,
            inserted = 0,
            deleted = 0,
            deleteCount = undefined,
            deleteManyCount = 0,
            eqIndexedEncryptedFields = 0,
            retriedInsert = 0,
            updateArrayFilters = 0,
            updatePipeline = 0,
            singleUpdateForBulkWrite = false,
            singleInsertForBulkWrite = false,
            singleDeleteForBulkWrite = false,
            insertShardField = "oneShard",
            updateShardField = this.timeseries ? "oneShard" : "allShards",
            deleteShardField = this.timeseries ? "oneShard" : "allShards",
            retryCount = 0,
            opcounterFactor = 1,
            perNamespaceMetrics = undefined,
        },
    ) {
        // updateCount is the number of update commands, it is different from updated when
        // multi: true.
        if (updateCount == undefined) {
            updateCount = updated;
        }
        if (deleteCount == undefined) {
            deleteCount = deleted;
        }
        const status1 = this.testDB.serverStatus();
        // An FLE update causes one findAndModify followed by an optional (absent if
        // eqIndexedEncryptedFields == 0) update.
        const fleSafeContentUpdates = updated > 0 && eqIndexedEncryptedFields > 0 ? 1 : 0;
        if (this.timeseries) {
            if (retriedInsert != 0) {
                inserted = this.retryCount;
                retriedInsert = 0;
            }
        }
        // FLE2 has 2 side collection inserts per indexedEncryptedField touched by each
        // insert/update.
        const actualInserts = inserted + 2 * (inserted + updated) * eqIndexedEncryptedFields;
        if (this.isMongos) {
            this._checkMongosOnlyMetrics(
                status0,
                status1,
                updateCount,
                inserted,
                deleteCount,
                eqIndexedEncryptedFields,
                singleUpdateForBulkWrite,
                singleInsertForBulkWrite,
                singleDeleteForBulkWrite,
                insertShardField,
                updateShardField,
                deleteShardField,
                fleSafeContentUpdates,
                retryCount,
                actualInserts,
            );
        } else {
            const top1 = this.testDB.adminCommand({top: 1}).totals;
            // See comment on unshardedInsert for the ternary.
            const hasEncryptedFields = eqIndexedEncryptedFields > 0 ? 1 : 0;
            // Explicit 0/1 so that an undefined flag cannot turn the sum into NaN.
            const fleBulkWriteExtra = this.bulkWrite && this.fle ? 1 : 0;
            const retriedCommandsCount = (1 + 2 * hasEncryptedFields + fleBulkWriteExtra) * (retryCount - 1);
            const retriedStatementsCount = (1 + 2 * eqIndexedEncryptedFields) * (retryCount - 1);
            this._checkMongodOnlyMetrics(
                status0,
                top0,
                status1,
                top1,
                updated,
                updateCount,
                inserted,
                deleted,
                deleteCount,
                retriedInsert,
                retriedCommandsCount,
                retriedStatementsCount,
                fleSafeContentUpdates,
                actualInserts,
                retryCount,
                perNamespaceMetrics,
                nsIndicesInRequest,
            );
        }
        this._checkOpCounters(
            status0,
            status1,
            inserted,
            actualInserts,
            updateCount,
            updateManyCount,
            opcounterFactor,
            fleSafeContentUpdates,
            deleteCount,
            deleteManyCount,
            retryCount,
        );
        this._checkUpdateMetrics(status0, status1, updateArrayFilters, updatePipeline);
    }

    /**
     * Runs `command` through testDB.runCommand with {w: "majority"} write concern, asserts
     * that it worked and returns the response.
     *
     * If this.timeseries is true, a `timestamp` field set to this.defaultTimestamp is added
     * to every entry of `command.documents` that lacks one, or otherwise to the `q` filter
     * of every entry of `command.updates` that lacks one. Note that `command` is modified
     * in place.
     */
    executeCommand(command) {
        if (this.timeseries) {
            if (command.hasOwnProperty("documents")) {
                for (const document of command.documents) {
                    if (!document.hasOwnProperty("timestamp")) {
                        document.timestamp = this.defaultTimestamp;
                    }
                }
            } else if (command.hasOwnProperty("updates")) {
                for (const document of command.updates) {
                    if (!document.q.hasOwnProperty("timestamp")) {
                        document.q.timestamp = this.defaultTimestamp;
                    }
                }
            }
        }
        command.writeConcern = {w: "majority"};
        return assert.commandWorked(this.testDB.runCommand(command));
    }

    /**
     * Adds a timestamp field if missing. Called when this.timeseries is true.
     *
     * Delete ops are skipped. Ops with a `document` get `document.timestamp`; all other
     * ops get `filter.timestamp`. The ops are modified in place.
     */
    _addTimestamp(bulkWriteOps) {
        for (const op of bulkWriteOps) {
            if (op.hasOwnProperty("delete")) {
                continue;
            }
            if (op.hasOwnProperty("document")) {
                if (!op.document.hasOwnProperty("timestamp")) {
                    op.document.timestamp = this.defaultTimestamp;
                }
            } else {
                if (!op.filter.hasOwnProperty("timestamp")) {
                    op.filter.timestamp = this.defaultTimestamp;
                }
            }
        }
    }

    /**
     * Returns the Set of namespace indices referenced by the given bulkWrite ops. For each
     * op the index is read from `update` if defined, else from `delete` if defined, else
     * from `insert`.
     */
    _findNsIndicesInRequest(bulkWriteOps) {
        const nsIndicesInRequest = new Set();
        for (const op of bulkWriteOps) {
            let idx = op.insert;
            if (op.update != undefined) {
                idx = op.update;
            } else if (op.delete != undefined) {
                idx = op.delete;
            }
            nsIndicesInRequest.add(idx);
        }
        return nsIndicesInRequest;
    }

    // Logs the testcase name together with the configuration of this checker.
    _logTestcase(testcaseName) {
        jsTest.log.info(
            `Testcase: ${testcaseName} (on a ${
                this.isMongos ? "ShardingTest" : "ReplSetTest"
            } with bulkWrite = ${this.bulkWrite}, errorsOnly = ${
                this.errorsOnly
            } and timeseries = ${this.timeseries}).`,
        );
    }

    // Builds the `nsInfo` array of a bulkWrite command from this.namespaces.
    _buildNsInfo() {
        return this.namespaces.map((namespace) => ({ns: namespace}));
    }

    /**
     * Runs the workload once and checks the metric deltas.
     *
     * @param testcaseName name used in the log line.
     * @param bulkWriteOps ops sent when this.bulkWrite is true; also used to find which
     *     namespaces the workload touches.
     * @param normalCommands commands run through executeCommand() when this.bulkWrite is
     *     false.
     * @param expectedMetrics expected deltas (see _checkMetricsImpl). The object is not
     *     modified; `retryCount` is forced to 1 on an internal copy.
     */
    checkMetrics(testcaseName, bulkWriteOps, normalCommands, expectedMetrics) {
        this._logTestcase(testcaseName);
        const statusBefore = this.testDB.serverStatus();
        const topBefore = this.isMongos ? undefined : this.testDB.adminCommand({top: 1}).totals;
        if (this.bulkWrite) {
            if (this.timeseries) {
                this._addTimestamp(bulkWriteOps);
            }
            assert.commandWorked(
                this.testDB.adminCommand({
                    bulkWrite: 1,
                    ops: bulkWriteOps,
                    nsInfo: this._buildNsInfo(),
                    writeConcern: {w: "majority"},
                    errorsOnly: this.errorsOnly,
                }),
            );
        } else {
            for (const command of normalCommands) {
                this.executeCommand(command);
            }
        }
        // Work on a copy so that the caller's object can be reused across calls.
        const metrics = {...expectedMetrics, retryCount: 1};
        this._checkMetricsImpl(statusBefore, topBefore, this._findNsIndicesInRequest(bulkWriteOps), metrics);
    }

    /**
     * Runs the same retryable write this.retryCount times (same lsid and txnNumber) and
     * checks the metric deltas.
     *
     * @param testcaseName name used in the log line.
     * @param bulkWriteOps ops sent when this.bulkWrite is true; also used to find which
     *     namespaces the workload touches.
     * @param normalCommand command run through executeCommand() when this.bulkWrite is
     *     false. It is modified in place (writeConcern, lsid and txnNumber are set).
     * @param expectedMetrics expected deltas (see _checkMetricsImpl). The object is not
     *     modified; adjustments are applied to an internal copy.
     * @param lsid logical session id sent with every attempt.
     * @param txnNumber transaction number sent with every attempt.
     */
    checkMetricsWithRetries(testcaseName, bulkWriteOps, normalCommand, expectedMetrics, lsid, txnNumber) {
        this._logTestcase(testcaseName);
        // Work on a copy: adjusting the caller's object would double updateArrayFilters again
        // each time the same object is passed in.
        const metrics = {...expectedMetrics};
        if (this.timeseries && this.isMongos) {
            // For sharded timeseries updates we will get an extra opcounter for a retryable write
            // since we execute them as an internal transaction which does an additional opcounter.
            metrics.opcounterFactor = 2;
            if (metrics.updateArrayFilters) {
                metrics.updateArrayFilters = metrics.updateArrayFilters * 2;
            }
        }
        const statusBefore = this.testDB.serverStatus();
        const topBefore = this.isMongos ? undefined : this.testDB.adminCommand({top: 1}).totals;
        if (this.bulkWrite) {
            if (this.timeseries) {
                this._addTimestamp(bulkWriteOps);
            }
            const namespaces = this._buildNsInfo();
            for (let i = 0; i < this.retryCount; ++i) {
                // errorsOnly is not sent here: the first reply item is inspected below.
                const res = assert.commandWorked(
                    this.testDB.adminCommand({
                        bulkWrite: 1,
                        ops: bulkWriteOps,
                        nsInfo: namespaces,
                        lsid: lsid,
                        txnNumber: txnNumber,
                        writeConcern: {w: "majority"},
                    }),
                );
                assert.eq(res.cursor.firstBatch[0].ok, 1);
            }
        } else {
            normalCommand.writeConcern = {w: "majority"};
            normalCommand.lsid = lsid;
            normalCommand.txnNumber = txnNumber;
            for (let i = 0; i < this.retryCount; ++i) {
                this.executeCommand(normalCommand);
            }
        }
        metrics.retryCount = this.retryCount;
        this._checkMetricsImpl(statusBefore, topBefore, this._findNsIndicesInRequest(bulkWriteOps), metrics);
    }
}