/**
 * Test that aggregation log lines include remoteOpWaitMillis: the amount of time the merger spent
 * waiting for results from shards.
 *
 * @tags: [
 *   # $mergeCursors was added to explain output in 5.3.
 *   requires_fcv_53,
 * ]
 */
import {findMatchingLogLine, iterateMatchingLogLines} from "jstests/libs/log.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";

const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
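// Stop the balancer so that automatic chunk migrations don't interfere with the manual chunk
// placement done below.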
st.stopBalancer();

const dbName = st.s.defaultDB;
const buildInfo = assert.commandWorked(st.s.getDB(dbName).adminCommand({"buildInfo": 1}));
const coll = st.s.getDB(dbName).getCollection("profile_remote_op_wait");

coll.drop();
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));

// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
// documents and one that contains all {shard: 2} documents.
st.shardColl(
    coll.getName(),
    {shard: 1} /* shard key */,
    {shard: 2} /* split at */,
    {shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
    dbName,
    true,
);

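// Insert 100 documents, alternating the shard key between 1 and 2 so that both shards contribute
// results to the merge.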
assert.commandWorked(coll.insert(Array.from({length: 100}, (_, i) => ({_id: i, shard: (i % 2) + 1, x: i}))));

// We want a pipeline that:
// 1. Requires a mergerPart. Otherwise, the entire query might get passed through to one shard, and
//    we wouldn't spend time waiting for other nodes.
// 2. Is streaming. Otherwise, the merger would have to consume its entire input before returning
//    the first batch, meaning subsequent getMores wouldn't do any waiting.
// A merge-sort stage should satisfy both of these requirements.
const pipeline = [{$sort: {x: 1}}];
const pipelineComment = "example_pipeline_should_have_remote_op_wait";

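// Sanity-check via explain that the pipeline really does split into a shards part and a merger
// part before relying on it for the log-line assertions below.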
{
    const explain = coll.explain().aggregate(pipeline);
    assert(explain.shards, explain);
    assert.eq(2, Object.keys(explain.shards).length, explain);
    assert.eq(explain.splitPipeline.shardsPart, [{$sort: {sortKey: {x: 1}}}], explain);
    // The mergerPart will only have a $mergeCursors stage that merge-sorts the results from each
    // shard.
    assert.eq(1, explain.splitPipeline.mergerPart.length, tojson(explain));
    assert(explain.splitPipeline.mergerPart[0].hasOwnProperty("$mergeCursors"), tojson(explain));
    assert.eq({x: 1}, explain.splitPipeline.mergerPart[0]["$mergeCursors"]["sort"]);
}

// Set the slow query logging threshold ('slowms') to -1 to ensure every query gets logged.
st.s.getDB("admin").setProfilingLevel(0, -1);

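// Extracts the remoteOpWaitMillis value (an integer number of milliseconds) from a slow-query log
// line, asserting that the field is present and non-negative.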
function getRemoteOpWait(logLine) {
    const pattern = /remoteOpWaitMillis"?:([0-9]+)/;
    const match = logLine.match(pattern);
    assert(match, `pattern ${pattern} did not match line: ${logLine}`);
    const millis = parseInt(match[1]);
    assert.gte(millis, 0, match);
    return millis;
}
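// Extracts the workingMillis value (an integer number of milliseconds) from a slow-query log line,
// asserting that the field is present and non-negative.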
function getWorkingMillis(logLine) {
    const pattern = /workingMillis"?:([0-9]+)/;
    const match = logLine.match(pattern);
    assert(match, `pattern ${pattern} did not match line: ${logLine}`);
    const millis = parseInt(match[1]);
    assert.gte(millis, 0, match);
    return millis;
}

// Run the pipeline and check mongos for the log line.
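// batchSize: 1 ensures that only the first result is returned now, so the remaining results have
// to be fetched with getMore commands below.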
const cursor = coll.aggregate(pipeline, {comment: pipelineComment, batchSize: 1});
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(mongosLog.log, {msg: "Slow query", comment: pipelineComment});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}

// Run a getMore and check again for the log line: .next() empties the current 1-document batch,
// and .hasNext() fetches a new batch.
cursor.next();
cursor.hasNext();
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const lines =
        [...iterateMatchingLogLines(mongosLog.log, {msg: "Slow query", comment: pipelineComment})];
    const line = lines.find((line) => line.match(/command.{1,4}getMore/));
    assert(line, "Failed to find a getMore log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}

// A change stream is a type of aggregation, so it reports remoteOpWait. The initial $changeStream
// 'aggregate' command never pauses execution while awaiting data, so we expect the remoteOpWait
// time to be less than or equal to the total execution duration.
const watchComment = "example_watch_should_have_remote_op_wait";
const csCursor = coll.watch([], {comment: watchComment});
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(mongosLog.log, {msg: "Slow query", comment: watchComment});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}

// A $changeStream getMore may pause execution while awaiting data if no results are currently
// available. In this case, it is possible for the total execution time to be less than the
// remoteOpWait time.
assert(!csCursor.hasNext());
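// With no writes arriving, hasNext() only returns after the getMore's awaitData period expires
// (roughly one second by default), which is why remoteOpWait is expected to be at least 900ms.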
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(
        mongosLog.log, {msg: "Slow query", comment: watchComment, command: "getMore"});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.gte(remoteOpWait, 900);
    // An upper bound for the execution time (workingMillis) is hard to determine, because it
    // depends on the build configuration (debug vs. non-debug, sanitizers) and the overall CPU
    // utilization of the system. Set the limit high enough that we don't get many false positives
    // during testing: sanitizer builds run slower, so they get the larger allowance.
    const isSanitizerEnabled = buildInfo.buildEnvironment.ccflags.includes("-fsanitize");
    assert.lte(workingMillis, isSanitizerEnabled ? 30000 : 1000);
}

// A query that merges on a shard logs remoteOpWaitMillis on the shard.
const pipeline2 = [{$sort: {x: 1}}, {$group: {_id: "$y"}}];
const pipelineComment2 = "example_pipeline2_should_have_remote_op_wait";
{
    const explain2 = coll.explain().aggregate(pipeline2, {allowDiskUse: true});
    assert.eq(explain2.mergeType, "anyShard", explain2);
}
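// Lower the slow query threshold on both shards as well: this time the $mergeCursors stage, and
// therefore the remoteOpWaitMillis logging, happens on a shard rather than on mongos.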
st.shard0.getDB("admin").setProfilingLevel(0, -1);
st.shard1.getDB("admin").setProfilingLevel(0, -1);
coll.aggregate(pipeline2, {allowDiskUse: true, comment: pipelineComment2}).next();
{
    const shard0Log = assert.commandWorked(st.shard0.adminCommand({getLog: "global"}));
    const shard1Log = assert.commandWorked(st.shard1.adminCommand({getLog: "global"}));
    const bothShardsLogLines = shard0Log.log.concat(shard1Log.log);
    const lines = [...iterateMatchingLogLines(
        bothShardsLogLines, {msg: "Slow query", comment: pipelineComment2})];
    // The line we want is whichever had a $mergeCursors stage.
    const line = lines.find((line) => line.match(/mergeCursors/));
    assert(line, `Failed to find a log line mentioning 'mergeCursors': ${lines}`);
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}

st.stop();