mongo/jstests/libs/crud_ops_to_bulk_write_lib.js

486 lines
18 KiB
JavaScript

/**
 * Utility functions used to convert CRUD commands (insert, update, delete) into a single
 * bulkWrite command, and to convert the bulkWrite response back into the responses the original
 * CRUD commands would have returned.
 *
 * The batch being built lives in closure state shared by all functions below: commands are added
 * with processCRUDOp(), sent with flushCurrentBulkWriteBatch(), and cleared with
 * resetBulkWriteBatch(). flushCurrentBulkWriteBatch() consumes (mutates) that state when it has to
 * re-run part of a batch.
 */
export const BulkWriteUtils = (function () {
    const commandsToBulkWriteOverride = new Set(["insert", "update", "delete"]);

    // Top-level bulkWrite reply fields that are copied onto each converted CRUD response.
    const topLevelResponseFields = [
        "writeConcernError",
        "retriedStmtIds",
        "opTime",
        "$clusterTime",
        "electionId",
        "operationTime",
        "errorLabels",
        "_mongo",
    ];

    // Optional fields of a failed bulkWrite reply item that are copied onto a writeErrors entry.
    const optionalWriteErrorFields = ["errInfo", "db", "collectionUUID", "expectedCollection", "actualCollection"];

    // Number of bulkWrite ops contributed by each buffered CRUD command, in buffering order.
    let numOpsPerResponse = [];
    // nsInfo entries of the bulkWrite; ops refer to them by index.
    let nsInfos = [];
    // bulkWrite ops of all buffered commands, in buffering order.
    let bufferedOps = [];
    // Settings taken from the buffered commands. The last command that sets them wins, except
    // bypassDocumentValidation, which is taken from the first command processed.
    let letObj = null;
    let wc = null;
    let ordered = true;
    let bypassDocumentValidation = null;
    // True once any buffered update has upsert:true.
    let hasUpsert = false;
    let rawData = false;

    /** Returns true if 'cmdName' is a CRUD command this library can convert into a bulkWrite. */
    function canProcessAsBulkWrite(cmdName) {
        return commandsToBulkWriteOverride.has(cmdName);
    }

    /** Clears all buffered commands and settings so a new batch can be started. */
    function resetBulkWriteBatch() {
        numOpsPerResponse = [];
        nsInfos = [];
        bufferedOps = [];
        letObj = null;
        wc = null;
        bypassDocumentValidation = null;
        ordered = true;
        hasUpsert = false;
        rawData = false;
    }

    /** Returns the number of CRUD commands currently buffered. */
    function getCurrentBatchSize() {
        return numOpsPerResponse.length;
    }

    /** Returns the batch-wide settings, e.g. so a caller can decide whether a command fits. */
    function getBulkWriteState() {
        return {
            nsInfos: nsInfos,
            bypassDocumentValidation: bypassDocumentValidation,
            letObj: letObj,
            ordered: ordered,
            rawData: rawData,
        };
    }

    /** Returns the nsInfo entries of the current batch. */
    function getNamespaces() {
        return nsInfos;
    }

    /**
     * Builds the bulkWrite command for the current batch. writeConcern, let and lsid are not
     * included here; flushCurrentBulkWriteBatch() adds them.
     */
    function getBulkWriteCmd() {
        return {
            "bulkWrite": 1,
            "ops": bufferedOps,
            "nsInfo": nsInfos,
            "ordered": ordered != null ? ordered : true,
            "bypassDocumentValidation": bypassDocumentValidation,
            "rawData": rawData,
        };
    }

    /** Copies the known top-level reply fields present on 'bulkWriteResponse' onto 'target'. */
    function copyTopLevelResponseFields(bulkWriteResponse, target) {
        topLevelResponseFields.forEach((property) => {
            if (bulkWriteResponse.hasOwnProperty(property)) {
                target[property] = bulkWriteResponse[property];
            }
        });
    }

    /** Builds a CRUD-style writeErrors entry with the given 'index' from a failed reply item. */
    function makeWriteError(index, replyItem) {
        const writeError = {index: index, code: replyItem.code, errmsg: replyItem.errmsg};
        // Include optional error fields if they exist.
        optionalWriteErrorFields.forEach((property) => {
            if (replyItem.hasOwnProperty(property)) {
                writeError[property] = replyItem[property];
            }
        });
        return writeError;
    }

    /**
     * Sends the buffered batch as one bulkWrite and returns the list of converted CRUD responses,
     * one per buffered command when all commands were executed.
     *
     * @param conn connection passed as 'this' to originalRunCommand.
     * @param lsid optional session id added to the bulkWrite.
     * @param originalRunCommand function used to run the command (invoked via apply).
     * @param makeRunCommandArgs (cmdObj, dbName) => argument array for originalRunCommand.
     * @param isMultiOp true when the batch may hold several CRUD commands.
     * @param additionalParameters extra fields merged into the bulkWrite command.
     *
     * When the bulkWrite stops early (see the retry loops below), the ops of the commands that
     * already produced a response are removed from the buffered state and the remainder is
     * re-run, so numOpsPerResponse and bufferedOps are consumed by this call.
     */
    function flushCurrentBulkWriteBatch(
        conn,
        lsid,
        originalRunCommand,
        makeRunCommandArgs,
        isMultiOp,
        additionalParameters = {},
    ) {
        // Should not be possible to reach if bypassDocumentValidation is not set.
        assert(bypassDocumentValidation != null);
        let bulkWriteCmd = getBulkWriteCmd();
        if (wc != null) {
            bulkWriteCmd["writeConcern"] = wc;
        }
        if (letObj != null) {
            bulkWriteCmd["let"] = letObj;
        }
        if (lsid) {
            bulkWriteCmd["lsid"] = lsid;
        }
        // Add in additional parameters to the bulkWrite command.
        bulkWriteCmd = {...bulkWriteCmd, ...additionalParameters};
        if (!isMultiOp && !canBeErrorsOnlySingleCrudOp()) {
            bulkWriteCmd["errorsOnly"] = false;
        }

        const runBulkWrite = () => originalRunCommand.apply(conn, makeRunCommandArgs(bulkWriteCmd, "admin"));

        let resp = runBulkWrite();
        let response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
        let finalResponse = response;
        const expectedResponseLength = numOpsPerResponse.length;
        // The following blocks are only relevant for batching multiple commands together
        // to ensure that every separate command is run even if previous commands fail.
        // The way we do this is by removing all ops that were executed from `ops`, and also
        // remove any ops that came from the same command. For example if we made a batch of
        // bulkWrite = insert1 + insert2 where insert1 = [{a:1},{b:1},{c:1}], and insert2 = [{d:1}]
        // and the op at index 1 failed we would re-run the bulkWrite with just `{d:1}` since we
        // need to achieve the result of running both insert1 + insert2.
        if (bulkWriteCmd.errorsOnly == true) {
            // For errorsOnly we will only ever have items in the response cursor if an operation
            // failed. We also always run batched bulkWrites as ordered:true so only one command can
            // fail. Once we get a bulkWrite with no errors then we have executed all ops.
            while (resp.cursor == null || resp.cursor.firstBatch.length != 0) {
                const idx = resp.cursor ? resp.cursor.firstBatch[0].idx : 0;
                // Drop every command up to and including the one that owns op 'idx'.
                let i = 0;
                while (i <= idx) {
                    const numOpsToShift = numOpsPerResponse.shift();
                    for (let j = 0; j < numOpsToShift; j++) {
                        bufferedOps.shift();
                        i++;
                    }
                    // If there were no ops we need to progress forward to avoid infinite looping.
                    if (numOpsToShift == 0) {
                        i++;
                    }
                }
                // Can't execute a bulkWrite with no ops remaining.
                if (bufferedOps.length == 0) {
                    break;
                }
                bulkWriteCmd.ops = bufferedOps;
                resp = runBulkWrite();
                response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
                finalResponse = finalResponse.concat(response);
            }
        } else {
            // Retry on ordered:true failures by re-running subset of original bulkWrite command.
            // '<' rather than '!=' so an unexpected surplus of responses cannot loop forever.
            while (finalResponse.length < expectedResponseLength) {
                // Need to figure out how many ops we need to subset out. Every entry in
                // numOpsPerResponse represents a number of bulkWrite ops that correspond to an
                // initial CRUD op. We need to make sure we split at a CRUD op boundary in the
                // bulkWrite.
                for (let i = 0; i < response.length; i++) {
                    const target = numOpsPerResponse.shift();
                    for (let j = 0; j < target; j++) {
                        bufferedOps.shift();
                    }
                }
                bulkWriteCmd.ops = bufferedOps;
                resp = runBulkWrite();
                response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
                finalResponse = finalResponse.concat(response);
            }
        }
        return finalResponse;
    }

    /**
     * Returns an empty CRUD response for the command that produced bulkWrite op 'op'. Update
     * responses always carry nModified. 'op' may be undefined (e.g. for a command that
     * contributed no ops and is last in the batch); the generic shape is used then.
     */
    function initializeResponse(op) {
        if (op != null && op.hasOwnProperty("update")) {
            // Update always has nModified field set.
            return {"n": 0, "nModified": 0, "ok": 1};
        }
        return {"n": 0, "ok": 1};
    }

    /**
     * Returns true when the batch holds exactly one CRUD command and no buffered update is an
     * upsert. Only then is the response rebuilt from the bulkWrite summary counters (see
     * processErrorsOnlySingleCrudOpResponse), which allows running with errorsOnly:true.
     * Upserts are excluded, presumably because the summary does not carry per-op upserted _ids.
     * When the batch does not hold exactly one command it must be empty (asserted).
     */
    function canBeErrorsOnlySingleCrudOp() {
        if (numOpsPerResponse.length != 1) {
            assert.eq(0, numOpsPerResponse.length);
            return false;
        }
        if (hasUpsert) {
            return false;
        }
        return true;
    }

    /**
     * Rebuilds the response of a single CRUD command from an errorsOnly bulkWrite reply, using
     * the summary counters (nInserted / nMatched + nModified / nDeleted) and converting every
     * cursor item, all of which must be errors, into a writeErrors entry.
     */
    function processErrorsOnlySingleCrudOpResponse(cmd, bulkWriteResponse) {
        // Need to construct the original command response based on the summary fields.
        const op = cmd.ops[0];
        const response = initializeResponse(op);
        if (op.hasOwnProperty("insert")) {
            response.n += bulkWriteResponse.nInserted;
        } else if (op.hasOwnProperty("update")) {
            response.n += bulkWriteResponse.nMatched;
            // 'nModified' field should always exist for an update op.
            assert(bulkWriteResponse.hasOwnProperty("nModified"), bulkWriteResponse);
            response.nModified += bulkWriteResponse.nModified;
        } else if (op.hasOwnProperty("delete")) {
            response.n += bulkWriteResponse.nDeleted;
        } else {
            throw new Error("Invalid bulkWrite op type. " + tojson(op));
        }
        copyTopLevelResponseFields(bulkWriteResponse, response);
        for (const current of bulkWriteResponse.cursor.firstBatch) {
            // For errorsOnly every cursor element must be an error.
            assert.eq(0, current.ok, "command: " + tojson(cmd) + " : response: " + tojson(bulkWriteResponse));
            if (!response.hasOwnProperty("writeErrors")) {
                response["writeErrors"] = [];
            }
            // A single command's op index equals the bulkWrite op index.
            response["writeErrors"].push(makeWriteError(current.idx, current));
        }
        return response;
    }

    /**
     * The purpose of this function is to take a server response from a bulkWrite command and to
     * transform it to an array of responses for the corresponding CRUD commands that make up the
     * bulkWrite.
     *
     * 'cmd' is the bulkWrite that was executed to generate the response.
     * 'isMultiOp' is true when the batch may hold several CRUD commands.
     * 'bulkWriteResponse' is the bulkWrite command response.
     *
     * Returns one response per CRUD command that was (at least partly) executed. Indexes in
     * writeErrors and upserted are relative to the originating CRUD command.
     */
    function convertBulkWriteResponse(cmd, isMultiOp, bulkWriteResponse) {
        // a w0 write concern bulkWrite can result in just {ok: 1}, so if a response does not have
        // 'cursor' field then just return the response as is
        if (!bulkWriteResponse.cursor) {
            return [bulkWriteResponse];
        }
        // Handle processing response for single CRUD op with errors only.
        // The multi op code can sometimes take this path but we don't care about the response
        // conversion for that so it is okay.
        if (cmd.errorsOnly == true && !isMultiOp && canBeErrorsOnlySingleCrudOp()) {
            return [processErrorsOnlySingleCrudOpResponse(cmd, bulkWriteResponse)];
        }
        const responses = [];
        if (bulkWriteResponse.ok != 1) {
            return responses;
        }
        const replyItems = bulkWriteResponse.cursor.firstBatch;
        // Index of the next bulkWrite op / reply item across the whole batch.
        let cursorIdx = 0;
        for (const numOps of numOpsPerResponse) {
            const resp = initializeResponse(cmd.ops[cursorIdx]);
            // 'num' is the op's index within its own CRUD command.
            for (let num = 0; num < numOps; num++) {
                if (cursorIdx >= replyItems.length) {
                    // this can happen if the bulkWrite encountered an error processing
                    // an op with ordered:true set. This means we have no more op responses
                    // left to process so push the current response we were building and
                    // return.
                    // If the last response has writeErrors set then it was in the middle of an
                    // op otherwise we are beginning a new op response and should not push it.
                    if (resp.writeErrors) {
                        responses.push(resp);
                    }
                    return responses;
                }
                const current = replyItems[cursorIdx];
                if (current.ok == 0) {
                    // Normal write contains an error.
                    if (!resp.hasOwnProperty("writeErrors")) {
                        resp["writeErrors"] = [];
                    }
                    resp["writeErrors"].push(makeWriteError(num, current));
                } else {
                    resp.n += current.n;
                    // 'nModified' field should always exist for an update op.
                    if (resp.hasOwnProperty("nModified")) {
                        assert(current.hasOwnProperty("nModified"), current);
                        resp.nModified += current.nModified;
                    }
                    if (current.hasOwnProperty("upserted")) {
                        if (!resp.hasOwnProperty("upserted")) {
                            resp["upserted"] = [];
                        }
                        // The update command reports the index of the upserting statement within
                        // its own 'updates' array, not the position in the batched bulkWrite.
                        resp["upserted"].push({index: num, ...current.upserted});
                    }
                }
                copyTopLevelResponseFields(bulkWriteResponse, resp);
                cursorIdx += 1;
            }
            responses.push(resp);
        }
        return responses;
    }

    /**
     * Returns the index of 'nsInfoEntry' in nsInfos, appending a new entry (with any of the
     * optional collectionUUID / encryptionInformation / isTimeseriesNamespace that are truthy)
     * when the namespace is not present yet. An existing entry is returned unchanged.
     */
    function getNsInfoIdx(nsInfoEntry, collectionUUID, encryptionInformation, isTimeseriesNamespace) {
        let idx = nsInfos.findIndex((element) => element.ns == nsInfoEntry);
        if (idx == -1) {
            idx = nsInfos.length;
            const nsInfo = {ns: nsInfoEntry};
            if (collectionUUID) {
                nsInfo["collectionUUID"] = collectionUUID;
            }
            if (encryptionInformation) {
                nsInfo["encryptionInformation"] = encryptionInformation;
            }
            if (isTimeseriesNamespace) {
                nsInfo["isTimeseriesNamespace"] = isTimeseriesNamespace;
            }
            nsInfos.push(nsInfo);
        }
        return idx;
    }

    /** Converts one insert document into a bulkWrite insert op. */
    function processInsertOp(nsInfoIdx, doc) {
        return {insert: nsInfoIdx, document: doc};
    }

    /**
     * Converts one update statement into a bulkWrite update op. Also records the command's 'let'
     * and whether the statement is an upsert.
     */
    function processUpdateOp(nsInfoIdx, cmdObj, update) {
        const op = {
            "update": nsInfoIdx,
            "filter": update.q,
            "updateMods": update.u,
            "multi": update.multi ? update.multi : false,
            "upsert": update.upsert ? update.upsert : false,
        };
        ["arrayFilters", "collation", "hint", "sampleId", "sort", "upsertSupplied"].forEach((property) => {
            if (update.hasOwnProperty(property)) {
                op[property] = update[property];
            }
        });
        if (update.hasOwnProperty("c")) {
            op["constants"] = update.c;
        }
        if (cmdObj.hasOwnProperty("let")) {
            letObj = cmdObj.let;
        }
        if (op.upsert == true) {
            hasUpsert = true;
        }
        return op;
    }

    /**
     * Converts one delete statement into a bulkWrite delete op (limit 0 maps to multi:true).
     * Also records the command's 'let'.
     */
    function processDeleteOp(nsInfoIdx, cmdObj, deleteCmd) {
        const op = {"delete": nsInfoIdx, "filter": deleteCmd.q, "multi": deleteCmd.limit == 0};
        ["sampleId", "collation", "hint"].forEach((property) => {
            if (deleteCmd.hasOwnProperty(property)) {
                op[property] = deleteCmd[property];
            }
        });
        if (cmdObj.hasOwnProperty("let")) {
            letObj = cmdObj.let;
        }
        return op;
    }

    /**
     * Appends the ops of CRUD command 'cmdObj' (named 'cmdName', run against 'dbName') to the
     * current batch and records how many ops it contributed.
     *
     * @throws Error if 'cmdName' is not insert, update or delete.
     */
    function processCRUDOp(dbName, cmdName, cmdObj) {
        // Set bypassDocumentValidation if necessary.
        if (bypassDocumentValidation == null) {
            bypassDocumentValidation = cmdObj.hasOwnProperty("bypassDocumentValidation")
                ? cmdObj.bypassDocumentValidation
                : false;
        }
        ordered = cmdObj.hasOwnProperty("ordered") ? cmdObj.ordered : true;
        if (cmdObj.hasOwnProperty("writeConcern")) {
            wc = cmdObj.writeConcern;
        }
        rawData = cmdObj.hasOwnProperty("rawData") ? cmdObj.rawData : false;
        const nsInfoEntry = dbName + "." + cmdObj[cmdName];
        const nsInfoIdx = getNsInfoIdx(
            nsInfoEntry,
            cmdObj.collectionUUID,
            cmdObj.encryptionInformation,
            cmdObj.isTimeseriesNamespace,
        );
        let numOps = 0;
        if (cmdName === "insert") {
            assert(cmdObj.documents);
            for (const doc of cmdObj.documents) {
                bufferedOps.push(processInsertOp(nsInfoIdx, doc));
                numOps += 1;
            }
        } else if (cmdName === "update") {
            assert(cmdObj.updates);
            for (const update of cmdObj.updates) {
                bufferedOps.push(processUpdateOp(nsInfoIdx, cmdObj, update));
                numOps += 1;
            }
        } else if (cmdName === "delete") {
            assert(cmdObj.deletes);
            for (const deleteCmd of cmdObj.deletes) {
                bufferedOps.push(processDeleteOp(nsInfoIdx, cmdObj, deleteCmd));
                numOps += 1;
            }
        } else {
            throw new Error("Unrecognized command in bulkWrite override");
        }
        numOpsPerResponse.push(numOps);
    }

    return {
        processCRUDOp: processCRUDOp,
        getNsInfoIdx: getNsInfoIdx,
        flushCurrentBulkWriteBatch: flushCurrentBulkWriteBatch,
        resetBulkWriteBatch: resetBulkWriteBatch,
        canProcessAsBulkWrite: canProcessAsBulkWrite,
        getCurrentBatchSize: getCurrentBatchSize,
        getBulkWriteState: getBulkWriteState,
        getNamespaces: getNamespaces,
        getBulkWriteCmd: getBulkWriteCmd,
    };
})();