/**
 * Utility functions used to convert CRUD ops into a bulkWrite command.
 * Converts the bulkWrite responses into the original CRUD response.
 */
export const BulkWriteUtils = (function () {
    const commandsToBulkWriteOverride = new Set(["insert", "update", "delete"]);

    let numOpsPerResponse = [];
    let nsInfos = [];
    let bufferedOps = [];
    let letObj = null;
    let wc = null;
    let ordered = true;
    let bypassDocumentValidation = null;
    let hasUpsert = false;
    let rawData = false;

    function canProcessAsBulkWrite(cmdName) {
        return commandsToBulkWriteOverride.has(cmdName);
    }

    function resetBulkWriteBatch() {
        numOpsPerResponse = [];
        nsInfos = [];
        bufferedOps = [];
        letObj = null;
        wc = null;
        bypassDocumentValidation = null;
        ordered = true;
        hasUpsert = false;
        rawData = false;
    }

    function getCurrentBatchSize() {
        return numOpsPerResponse.length;
    }

    function getBulkWriteState() {
        return {
            nsInfos: nsInfos,
            bypassDocumentValidation: bypassDocumentValidation,
            letObj: letObj,
            ordered: ordered,
            rawData: rawData,
        };
    }

    function getNamespaces() {
        return nsInfos;
    }

    function getBulkWriteCmd() {
        return {
            "bulkWrite": 1,
            "ops": bufferedOps,
            "nsInfo": nsInfos,
            "ordered": ordered != null ? ordered : true,
            "bypassDocumentValidation": bypassDocumentValidation,
            "rawData": rawData,
        };
    }
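
    // Illustrative shape of the command assembled above (the namespaces and documents shown are
    // hypothetical):
    //     {
    //         bulkWrite: 1,
    //         ops: [{insert: 0, document: {_id: 1}}, {delete: 1, filter: {x: 1}, multi: false}],
    //         nsInfo: [{ns: "test.coll1"}, {ns: "test.coll2"}],
    //         ordered: true,
    //         bypassDocumentValidation: false,
    //         rawData: false,
    //     }
    // Each op refers to its namespace by index into the nsInfo array.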

    function flushCurrentBulkWriteBatch(
        conn,
        lsid,
        originalRunCommand,
        makeRunCommandArgs,
        isMultiOp,
        additionalParameters = {},
    ) {
        // It should not be possible to reach this point without bypassDocumentValidation having
        // been set.
        assert(bypassDocumentValidation != null);

        let bulkWriteCmd = {
            "bulkWrite": 1,
            "ops": bufferedOps,
            "nsInfo": nsInfos,
            "ordered": ordered != null ? ordered : true,
            "bypassDocumentValidation": bypassDocumentValidation,
            "rawData": rawData,
        };

        if (wc != null) {
            bulkWriteCmd["writeConcern"] = wc;
        }

        if (letObj != null) {
            bulkWriteCmd["let"] = letObj;
        }

        if (lsid) {
            bulkWriteCmd["lsid"] = lsid;
        }

        // Add in additional parameters to the bulkWrite command.
        bulkWriteCmd = {...bulkWriteCmd, ...additionalParameters};

        if (!isMultiOp && !canBeErrorsOnlySingleCrudOp()) {
            bulkWriteCmd["errorsOnly"] = false;
        }

        let resp = originalRunCommand.apply(conn, makeRunCommandArgs(bulkWriteCmd, "admin"));

        let response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
        let finalResponse = response;

        let expectedResponseLength = numOpsPerResponse.length;

        // The following blocks are only relevant when batching multiple commands together, to
        // ensure that every separate command is run even if previous commands fail. The way we do
        // this is by removing all ops that were executed from `ops`, along with any other ops
        // that came from the same command. For example, if we made a batch of
        // bulkWrite = insert1 + insert2 where insert1 = [{a:1},{b:1},{c:1}] and insert2 = [{d:1}],
        // and the op at index 1 failed, we would re-run the bulkWrite with just `{d:1}` since we
        // need to achieve the result of running both insert1 + insert2.
        if (bulkWriteCmd.errorsOnly == true) {
            // For errorsOnly we will only ever have items in the response cursor if an operation
            // failed. We also always run batched bulkWrites as ordered:true, so only one command
            // can fail. Once we get a bulkWrite with no errors we have executed all ops.
            while (resp.cursor == null || resp.cursor.firstBatch.length != 0) {
                let idx = resp.cursor ? resp.cursor.firstBatch[0].idx : 0;
                let i = 0;
                while (i <= idx) {
                    let numOpsToShift = numOpsPerResponse.shift();
                    for (let j = 0; j < numOpsToShift; j++) {
                        bufferedOps.shift();
                        i++;
                    }
                    // If there were no ops we need to progress forward to avoid infinite looping.
                    if (numOpsToShift == 0) {
                        i++;
                    }
                }

                // Can't execute a bulkWrite with no ops remaining.
                if (bufferedOps.length == 0) {
                    break;
                }

                bulkWriteCmd.ops = bufferedOps;
                resp = originalRunCommand.apply(conn, makeRunCommandArgs(bulkWriteCmd, "admin"));
                response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
                finalResponse = finalResponse.concat(response);
            }
        } else {
            // Retry on ordered:true failures by re-running a subset of the original bulkWrite
            // command.
            while (finalResponse.length != expectedResponseLength) {
                // Need to figure out how many ops we need to subset out. Every entry in
                // numOpsPerResponse represents the number of bulkWrite ops that correspond to an
                // initial CRUD op, so we must split at a CRUD op boundary in the bulkWrite.
                for (let i = 0; i < response.length; i++) {
                    let target = numOpsPerResponse.shift();
                    for (let j = 0; j < target; j++) {
                        bufferedOps.shift();
                    }
                }
                bulkWriteCmd.ops = bufferedOps;
                resp = originalRunCommand.apply(conn, makeRunCommandArgs(bulkWriteCmd, "admin"));
                response = convertBulkWriteResponse(bulkWriteCmd, isMultiOp, resp);
                finalResponse = finalResponse.concat(response);
            }
        }

        return finalResponse;
    }

    function initializeResponse(op) {
        if (op.hasOwnProperty("update")) {
            // Update always has nModified field set.
            return {"n": 0, "nModified": 0, "ok": 1};
        }
        return {"n": 0, "ok": 1};
    }

    function canBeErrorsOnlySingleCrudOp() {
        // The conditions we need to meet are as follows: the batch must consist of a single
        // updateOne, deleteOne, insertOne, or insertMany command, and it must not contain an
        // upsert.

        // Multiple crud ops make up this batch.
        if (numOpsPerResponse.length != 1) {
            assert.eq(0, numOpsPerResponse.length);
            return false;
        }

        if (hasUpsert) {
            return false;
        }

        return true;
    }

    function processErrorsOnlySingleCrudOpResponse(cmd, bulkWriteResponse) {
        // Need to construct the original command response based on the summary fields.
        let response = initializeResponse(cmd.ops[0]);

        if (cmd.ops[0].hasOwnProperty("insert")) {
            response.n += bulkWriteResponse.nInserted;
        } else if (cmd.ops[0].hasOwnProperty("update")) {
            response.n += bulkWriteResponse.nMatched;
            // 'nModified' field should always exist for an update op.
            assert(bulkWriteResponse.hasOwnProperty("nModified"), bulkWriteResponse);
            response.nModified += bulkWriteResponse.nModified;
        } else if (cmd.ops[0].hasOwnProperty("delete")) {
            response.n += bulkWriteResponse.nDeleted;
        } else {
            throw new Error("Invalid bulkWrite op type. " + tojson(cmd.ops[0]));
        }

        [
            "writeConcernError",
            "retriedStmtIds",
            "opTime",
            "$clusterTime",
            "electionId",
            "operationTime",
            "errorLabels",
            "_mongo",
        ].forEach((property) => {
            if (bulkWriteResponse.hasOwnProperty(property)) {
                response[property] = bulkWriteResponse[property];
            }
        });

        // Need to loop through any errors now.
        if (bulkWriteResponse.cursor.firstBatch.length != 0) {
            let cursorIdx = 0;
            while (cursorIdx < bulkWriteResponse.cursor.firstBatch.length) {
                let current = bulkWriteResponse.cursor.firstBatch[cursorIdx];
                // For errorsOnly every cursor element must be an error.
                assert.eq(0, current.ok, "command: " + tojson(cmd) + " : response: " + tojson(bulkWriteResponse));

                if (!response.hasOwnProperty("writeErrors")) {
                    response["writeErrors"] = [];
                }
                let writeError = {index: current.idx, code: current.code, errmsg: current.errmsg};

                // Include optional error fields if they exist.
                ["errInfo", "db", "collectionUUID", "expectedCollection", "actualCollection"].forEach((property) => {
                    if (current.hasOwnProperty(property)) {
                        writeError[property] = current[property];
                    }
                });

                response["writeErrors"].push(writeError);
                cursorIdx++;
            }
        }

        return response;
    }
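
    // Illustrative example (hypothetical values): for cmd.ops = [{insert: 0, document: {_id: 1}}]
    // and a bulkWrite response containing {nInserted: 1, cursor: {firstBatch: []}}, the
    // reconstructed insert response is {n: 1, ok: 1}; any documents in firstBatch would be
    // appended to it as writeErrors entries.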

    /**
     * The purpose of this function is to take a server response from a bulkWrite command and to
     * transform it into an array of responses for the corresponding CRUD commands that make up
     * the bulkWrite.
     *
     * 'cmd' is the bulkWrite that was executed to generate the response.
     * 'bulkWriteResponse' is the bulkWrite command response.
     */
    function convertBulkWriteResponse(cmd, isMultiOp, bulkWriteResponse) {
        // A w:0 write concern bulkWrite can result in just {ok: 1}, so if a response does not
        // have a 'cursor' field then just return the response as is.
        if (!bulkWriteResponse.cursor) {
            return [bulkWriteResponse];
        }

        // Handle processing the response for a single CRUD op with errorsOnly set.
        // The multi op code can sometimes take this path, but we don't care about the response
        // conversion for that so it is okay.
        if (cmd.errorsOnly == true && !isMultiOp && canBeErrorsOnlySingleCrudOp()) {
            return [processErrorsOnlySingleCrudOpResponse(cmd, bulkWriteResponse)];
        }

        let responses = [];
        if (bulkWriteResponse.ok == 1) {
            let cursorIdx = 0;
            for (let numOps of numOpsPerResponse) {
                let num = 0;
                let resp = initializeResponse(cmd.ops[cursorIdx]);
                while (num < numOps) {
                    if (cursorIdx >= bulkWriteResponse.cursor.firstBatch.length) {
                        // This can happen if the bulkWrite encountered an error processing an op
                        // with ordered:true set. This means we have no more op responses left to
                        // process, so push the current response we were building and return.
                        // If the last response has writeErrors set then it was in the middle of
                        // an op; otherwise we are beginning a new op response and should not push
                        // it.
                        if (resp.writeErrors) {
                            responses.push(resp);
                        }
                        return responses;
                    }

                    let current = bulkWriteResponse.cursor.firstBatch[cursorIdx];

                    if (current.ok == 0) {
                        // Normal write contains an error.
                        if (!resp.hasOwnProperty("writeErrors")) {
                            resp["writeErrors"] = [];
                        }
                        let writeError = {index: num, code: current.code, errmsg: current.errmsg};

                        // Include optional error fields if they exist.
                        ["errInfo", "db", "collectionUUID", "expectedCollection", "actualCollection"].forEach(
                            (property) => {
                                if (current.hasOwnProperty(property)) {
                                    writeError[property] = current[property];
                                }
                            },
                        );

                        resp["writeErrors"].push(writeError);
                    } else {
                        resp.n += current.n;
                        // 'nModified' field should always exist for an update op.
                        if (resp.hasOwnProperty("nModified")) {
                            assert(current.hasOwnProperty("nModified"), current);
                            resp.nModified += current.nModified;
                        }
                        if (current.hasOwnProperty("upserted")) {
                            if (!resp.hasOwnProperty("upserted")) {
                                resp["upserted"] = [];
                            }
                            // Need to add the index of the upserted doc.
                            resp["upserted"].push({index: cursorIdx, ...current.upserted});
                        }
                    }

                    [
                        "writeConcernError",
                        "retriedStmtIds",
                        "opTime",
                        "$clusterTime",
                        "electionId",
                        "operationTime",
                        "errorLabels",
                        "_mongo",
                    ].forEach((property) => {
                        if (bulkWriteResponse.hasOwnProperty(property)) {
                            resp[property] = bulkWriteResponse[property];
                        }
                    });

                    cursorIdx += 1;
                    num += 1;
                }
                responses.push(resp);
            }
        }
        return responses;
    }
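
    // Illustrative example (hypothetical values): for a batch built from
    // insert1 = [{a: 1}, {b: 1}] and delete1 = {q: {a: 1}, limit: 1}, numOpsPerResponse is [2, 1],
    // and a fully successful bulkWrite response cursor is converted into
    //     [{n: 2, ok: 1}, {n: 1, ok: 1}]
    // i.e. one response object per original CRUD command.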

    function getNsInfoIdx(nsInfoEntry, collectionUUID, encryptionInformation, isTimeseriesNamespace) {
        let idx = nsInfos.findIndex((element) => element.ns == nsInfoEntry);
        if (idx == -1) {
            idx = nsInfos.length;
            let nsInfo = {ns: nsInfoEntry};
            if (collectionUUID) {
                nsInfo["collectionUUID"] = collectionUUID;
            }
            if (encryptionInformation) {
                nsInfo["encryptionInformation"] = encryptionInformation;
            }
            if (isTimeseriesNamespace) {
                nsInfo["isTimeseriesNamespace"] = isTimeseriesNamespace;
            }
            nsInfos.push(nsInfo);
        }
        return idx;
    }
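
    // Illustrative example ("test.coll" is a hypothetical namespace): the first call to
    // getNsInfoIdx("test.coll") appends {ns: "test.coll"} to nsInfos and returns 0; later calls
    // for the same namespace return the existing index rather than adding a duplicate entry.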

    function processInsertOp(nsInfoIdx, doc) {
        return {insert: nsInfoIdx, document: doc};
    }

    function processUpdateOp(nsInfoIdx, cmdObj, update) {
        let op = {
            "update": nsInfoIdx,
            "filter": update.q,
            "updateMods": update.u,
            "multi": update.multi ? update.multi : false,
            "upsert": update.upsert ? update.upsert : false,
        };

        ["arrayFilters", "collation", "hint", "sampleId", "sort", "upsertSupplied"].forEach((property) => {
            if (update.hasOwnProperty(property)) {
                op[property] = update[property];
            }
        });

        if (update.hasOwnProperty("c")) {
            op["constants"] = update.c;
        }

        if (cmdObj.hasOwnProperty("let")) {
            letObj = cmdObj.let;
        }

        if (op.upsert == true) {
            hasUpsert = true;
        }

        return op;
    }
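
    // Illustrative example (hypothetical values): an update statement
    //     {q: {a: 1}, u: {$set: {b: 2}}, upsert: true}
    // is converted into the bulkWrite op
    //     {update: <nsInfoIdx>, filter: {a: 1}, updateMods: {$set: {b: 2}}, multi: false, upsert: true}
    // and marks the batch as containing an upsert.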

    function processDeleteOp(nsInfoIdx, cmdObj, deleteCmd) {
        let op = {"delete": nsInfoIdx, "filter": deleteCmd.q, "multi": deleteCmd.limit == 0};

        ["sampleId", "collation", "hint"].forEach((property) => {
            if (deleteCmd.hasOwnProperty(property)) {
                op[property] = deleteCmd[property];
            }
        });

        if (cmdObj.hasOwnProperty("let")) {
            letObj = cmdObj.let;
        }

        return op;
    }
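
    // Illustrative example (hypothetical values): a delete statement {q: {a: 1}, limit: 0}
    // becomes {delete: <nsInfoIdx>, filter: {a: 1}, multi: true}, while {q: {a: 1}, limit: 1}
    // maps to multi: false.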

    function processCRUDOp(dbName, cmdName, cmdObj) {
        // Set bypassDocumentValidation if necessary.
        if (bypassDocumentValidation == null) {
            bypassDocumentValidation = cmdObj.hasOwnProperty("bypassDocumentValidation")
                ? cmdObj.bypassDocumentValidation
                : false;
        }

        ordered = cmdObj.hasOwnProperty("ordered") ? cmdObj.ordered : true;

        if (cmdObj.hasOwnProperty("writeConcern")) {
            wc = cmdObj.writeConcern;
        }

        rawData = cmdObj.hasOwnProperty("rawData") ? cmdObj.rawData : false;

        let nsInfoEntry = dbName + "." + cmdObj[cmdName];
        let nsInfoIdx = getNsInfoIdx(
            nsInfoEntry,
            cmdObj.collectionUUID,
            cmdObj.encryptionInformation,
            cmdObj.isTimeseriesNamespace,
        );

        let numOps = 0;

        if (cmdName === "insert") {
            assert(cmdObj.documents);
            for (let doc of cmdObj.documents) {
                bufferedOps.push(processInsertOp(nsInfoIdx, doc));
                numOps += 1;
            }
        } else if (cmdName === "update") {
            assert(cmdObj.updates);
            for (let update of cmdObj.updates) {
                bufferedOps.push(processUpdateOp(nsInfoIdx, cmdObj, update));
                numOps += 1;
            }
        } else if (cmdName === "delete") {
            assert(cmdObj.deletes);
            for (let deleteCmd of cmdObj.deletes) {
                bufferedOps.push(processDeleteOp(nsInfoIdx, cmdObj, deleteCmd));
                numOps += 1;
            }
        } else {
            throw new Error("Unrecognized command in bulkWrite override");
        }

        numOpsPerResponse.push(numOps);
    }
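
    // Illustrative example (hypothetical values): processCRUDOp("test", "insert",
    //     {insert: "coll", documents: [{_id: 1}, {_id: 2}]})
    // buffers two insert ops that reference the "test.coll" nsInfo entry and pushes 2 onto
    // numOpsPerResponse, so the eventual bulkWrite response can be folded back into a single
    // insert response covering both documents.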

    return {
        processCRUDOp: processCRUDOp,
        getNsInfoIdx: getNsInfoIdx,
        flushCurrentBulkWriteBatch: flushCurrentBulkWriteBatch,
        resetBulkWriteBatch: resetBulkWriteBatch,
        canProcessAsBulkWrite: canProcessAsBulkWrite,
        getCurrentBatchSize: getCurrentBatchSize,
        getBulkWriteState: getBulkWriteState,
        getNamespaces: getNamespaces,
        getBulkWriteCmd: getBulkWriteCmd,
    };
})();