mirror of https://github.com/mongodb/mongo
193 lines
5.2 KiB
JavaScript
193 lines
5.2 KiB
JavaScript
// Test killop applied to m/r operations and child ops of m/r operations.
|
|
// Cannot implicitly shard accessed collections because the "command" field in the currentOp()
|
|
// output is reported as {"mapreduce.shardedfinish": { mapreduce: "jstests_mr_killop", ... }, ... }
|
|
// when the "finalize" option to the "mapReduce" command is used on a sharded collection.
|
|
// @tags: [
|
|
// assumes_unsharded_collection,
|
|
// # mapReduce does not support afterClusterTime.
|
|
// does_not_support_causal_consistency,
|
|
// does_not_support_stepdowns,
|
|
// uses_map_reduce_with_temp_collections,
|
|
// uses_multiple_connections,
|
|
// uses_parallel_shell,
|
|
// ]
|
|
(function() {
|
|
"use strict";
|
|
const source = db.jstests_mr_killop;
|
|
source.drop();
|
|
const out = db.jstests_mr_killop_out;
|
|
out.drop();
|
|
assert.commandWorked(db.adminCommand({configureFailPoint: "mr_killop_test_fp", mode: "alwaysOn"}));
|
|
|
|
/** @return op code for map reduce op created by spawned shell. */
|
|
function getOpCode() {
|
|
const inProg = db.currentOp().inprog;
|
|
|
|
function isMapReduce(op) {
|
|
if (!op.command) {
|
|
return false;
|
|
}
|
|
|
|
const cmdBody = op.command;
|
|
if (cmdBody.$truncated) {
|
|
const stringifiedCmd = cmdBody.$truncated;
|
|
return (stringifiedCmd.search('mapreduce') >= 0 ||
|
|
stringifiedCmd.search('aggregate') >= 0) &&
|
|
stringifiedCmd.search(source.getName()) >= 0;
|
|
}
|
|
|
|
return cmdBody.mapreduce && cmdBody.mapreduce == source.getName();
|
|
}
|
|
|
|
for (let i in inProg) {
|
|
const o = inProg[i];
|
|
// Identify a map/reduce operation by its collection, whether or not it is currently active.
|
|
if ((o.active || o.waitingForLock) && isMapReduce(o))
|
|
return o.opid;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
/**
|
|
* Run one mapReduce with the specified parameters in a parallel shell. Kill the map reduce op and
|
|
* wait for the map reduce op to terminate.
|
|
*/
|
|
function runTest(map, reduce, finalize, scope, wait) {
|
|
source.drop();
|
|
out.drop();
|
|
// Ensure we have 2 documents for the reduce to run.
|
|
assert.commandWorked(source.insert({a: 1}));
|
|
assert.commandWorked(source.insert({a: 1}));
|
|
|
|
const spec = {mapreduce: source.getName(), out: out.getName(), map: map, reduce: reduce};
|
|
if (finalize) {
|
|
spec["finalize"] = finalize;
|
|
}
|
|
if (scope) {
|
|
spec["scope"] = scope;
|
|
}
|
|
|
|
// Windows shell strips all double quotes from command line, so use single quotes.
|
|
const stringifiedSpec = tojson(spec).toString().replace(/\n/g, ' ').replace(/\"/g, "\'");
|
|
|
|
// The assert below won't be caught by this test script, but it will cause error messages to be
|
|
// printed.
|
|
const awaitShell =
|
|
startParallelShell("assert.commandWorked( db.runCommand( " + stringifiedSpec + " ) );");
|
|
|
|
if (wait) {
|
|
sleep(2000);
|
|
}
|
|
|
|
let opCode = null;
|
|
assert.soon(function() {
|
|
opCode = getOpCode();
|
|
return opCode != -1;
|
|
});
|
|
|
|
db.killOp(opCode);
|
|
|
|
// When the map reduce op is killed, the spawned shell will exit
|
|
const exitCode = awaitShell({checkExitSuccess: false});
|
|
assert.neq(0,
|
|
exitCode,
|
|
"expected shell to exit abnormally due to map-reduce execution being terminated");
|
|
assert.eq(-1, getOpCode());
|
|
}
|
|
|
|
/** Test using wait and non wait modes. */
|
|
function runTests(map, reduce, finalize, scope) {
|
|
runTest(map, reduce, finalize, scope, false);
|
|
runTest(map, reduce, finalize, scope, true);
|
|
}
|
|
|
|
/** Test looping in map function. */
|
|
function runMapTests(loop) {
|
|
// Without scope.
|
|
runTests(
|
|
loop, // map
|
|
function(k, v) {
|
|
return v[0];
|
|
}, // reduce
|
|
null, // finalize
|
|
null // scope
|
|
);
|
|
|
|
// With scope.
|
|
runTests(
|
|
function() {
|
|
loop();
|
|
}, // map
|
|
function(k, v) {
|
|
return v[0];
|
|
}, // reduce
|
|
null, // finalize
|
|
{loop: loop} // scope
|
|
);
|
|
}
|
|
|
|
/** Test looping in reduce function. */
|
|
function runReduceTests(loop) {
|
|
// Without scope.
|
|
runTests(
|
|
function() {
|
|
emit(this.a, 1);
|
|
}, // map
|
|
loop, // reduce
|
|
null, // finalize
|
|
null // scope
|
|
);
|
|
|
|
// With scope.
|
|
runTests(
|
|
function() {
|
|
emit(this.a, 1);
|
|
}, // map
|
|
function() {
|
|
loop();
|
|
}, // reduce
|
|
null, // finalize
|
|
{loop: loop} // scope
|
|
);
|
|
}
|
|
|
|
/** Test looping in finalize function. */
|
|
function runFinalizeTests(loop) {
|
|
// Without scope.
|
|
runTests(
|
|
function() {
|
|
emit(this.a, 1);
|
|
}, // map
|
|
function(k, v) {
|
|
return v[0];
|
|
}, // reduce
|
|
loop, // finalize
|
|
null // scope
|
|
);
|
|
|
|
// With scope.
|
|
runTests(
|
|
function() {
|
|
emit(this.a, 1);
|
|
}, // map
|
|
function(k, v) {
|
|
return v[0];
|
|
}, // reduce
|
|
function(a, b) {
|
|
loop();
|
|
}, // finalize
|
|
{loop: loop} // scope
|
|
);
|
|
}
|
|
|
|
const loop = function() {
|
|
while (1) {
|
|
sleep(1000);
|
|
}
|
|
};
|
|
runMapTests(loop, false);
|
|
runReduceTests(loop, false);
|
|
runFinalizeTests(loop, false);
|
|
db.adminCommand({configureFailPoint: "mr_killop_test_fp", mode: "off"});
|
|
}());
|