mongo/jstests/sharding/query/log_remote_op_wait.js

/**
 * Test that aggregation log lines include remoteOpWaitMillis: the amount of time the merger spent
 * waiting for results from shards.
 *
 * @tags: [
 *   # $mergeCursors was added to explain output in 5.3.
 *   requires_fcv_53,
 * ]
 *
 */
import {findMatchingLogLine, iterateMatchingLogLines} from "jstests/libs/log.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
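// Stop the balancer so that chunk placement stays fixed for the duration of the test.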
st.stopBalancer();
const dbName = st.s.defaultDB;
const buildInfo = assert.commandWorked(st.s.getDB(dbName).adminCommand({"buildInfo": 1}));
const coll = st.s.getDB(dbName).getCollection("profile_remote_op_wait");
coll.drop();
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
// documents and one that contains all {shard: 2} documents.
st.shardColl(
    coll.getName(),
    {shard: 1} /* shard key */,
    {shard: 2} /* split at */,
    {shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
    dbName,
    true,
);
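// Insert 100 documents whose shard-key values alternate between 1 and 2, so both chunks (and hence
// both shards) contain data for the merger to wait on.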
assert.commandWorked(
    coll.insert(Array.from({length: 100}, (_, i) => ({_id: i, shard: (i % 2) + 1, x: i}))));
// We want a pipeline that:
// 1. Requires a mergerPart. Otherwise, the entire query might get passed through to one shard, and
// we wouldn't spend time waiting for other nodes.
// 2. Is streaming. Otherwise, the merger would have to consume its entire input before returning
// the first batch, meaning subsequent getMores wouldn't do any waiting.
// A merge-sort stage should satisfy both of these requirements.
const pipeline = [{$sort: {x: 1}}];
const pipelineComment = "example_pipeline_should_have_remote_op_wait";
{
    const explain = coll.explain().aggregate(pipeline);
    assert(explain.shards, explain);
    assert.eq(2, Object.keys(explain.shards).length, explain);
    assert.eq(explain.splitPipeline.shardsPart, [{$sort: {sortKey: {x: 1}}}], explain);
    // The mergerPart will only have a $mergeCursors stage that merge-sorts the results from each
    // shard.
    assert.eq(1, explain.splitPipeline.mergerPart.length, tojson(explain));
    assert(explain.splitPipeline.mergerPart[0].hasOwnProperty("$mergeCursors"), tojson(explain));
    assert.eq({x: 1}, explain.splitPipeline.mergerPart[0]["$mergeCursors"]["sort"]);
}
// Set the slow query logging threshold ('slowms') to -1 to ensure every query gets logged.
st.s.getDB("admin").setProfilingLevel(0, -1);
function getRemoteOpWait(logLine) {
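    // The field name may or may not be quoted depending on the log format, so the quote is
    // optional in the pattern.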
    const pattern = /remoteOpWaitMillis"?:([0-9]+)/;
    const match = logLine.match(pattern);
    assert(match, `pattern ${pattern} did not match line: ${logLine}`);
    const millis = parseInt(match[1]);
    assert.gte(millis, 0, match);
    return millis;
}
function getWorkingMillis(logLine) {
    const pattern = /workingMillis"?:([0-9]+)/;
    const match = logLine.match(pattern);
    assert(match, `pattern ${pattern} did not match line: ${logLine}`);
    const millis = parseInt(match[1]);
    assert.gte(millis, 0, match);
    return millis;
}
// Run the pipeline and check mongos for the log line.
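// A batchSize of 1 means the initial response contains a single document, so the cursor stays open
// and the remaining results must be fetched with getMores (exercised below).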
const cursor = coll.aggregate(pipeline, {comment: pipelineComment, batchSize: 1});
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(mongosLog.log, {msg: "Slow query", comment: pipelineComment});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}
// Run a getMore and check again for the log line: .next() empties the current 1-document batch, and
// .hasNext() fetches a new batch.
cursor.next();
cursor.hasNext();
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const lines =
        [...iterateMatchingLogLines(mongosLog.log, {msg: "Slow query", comment: pipelineComment})];
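    // Several lines can match the comment (the original aggregate plus its getMores); pick out the
    // getMore line specifically.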
    const line = lines.find((line) => line.match(/command.{1,4}getMore/));
    assert(line, "Failed to find a getMore log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}
// A changestream is a type of aggregation, so it reports remoteOpWait. The initial $changeStream
// 'aggregate' command never pauses execution while awaiting data, and so we expect the remoteOpWait
// time to be less than or equal to the total execution duration.
const watchComment = "example_watch_should_have_remote_op_wait";
const csCursor = coll.watch([], {comment: watchComment});
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(mongosLog.log, {msg: "Slow query", comment: watchComment});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}
// A $changeStream getMore may pause execution while awaiting data if no results are currently
// available. In this case, it is possible for the total execution time to be less than the
// remoteOpWait time.
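// With no new events to return, this getMore blocks on the shards until the awaitData timeout
// (roughly one second by default) expires, which is why we expect at least ~900ms of remote-op
// wait below.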
assert(!csCursor.hasNext());
{
    const mongosLog = assert.commandWorked(st.s.adminCommand({getLog: "global"}));
    const line = findMatchingLogLine(
        mongosLog.log, {msg: "Slow query", comment: watchComment, command: "getMore"});
    assert(line, "Failed to find a log line matching the comment");
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.gte(remoteOpWait, 900);
    // An upper bound for the execution time (workingMillis) is hard to determine, because it
    // depends on the build configuration (debug vs. non-debug, sanitizers) and the overall CPU
    // utilization of the system. Set the limit high enough that we don't get many false positives
    // during testing, and give sanitizer builds extra slack since they run considerably slower.
    const isSanitizerEnabled = buildInfo.buildEnvironment.ccflags.includes("-fsanitize");
    assert.lte(workingMillis, isSanitizerEnabled ? 30000 : 1000);
}
// A query that merges on a shard logs remoteOpWaitMillis on the shard.
const pipeline2 = [{$sort: {x: 1}}, {$group: {_id: "$y"}}];
const pipelineComment2 = "example_pipeline2_should_have_remote_op_wait";
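// With {allowDiskUse: true} the merging half of the pipeline (which includes the $group) is
// expected to run on a shard rather than on mongos; the explain below confirms this via mergeType.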
{
    const explain2 = coll.explain().aggregate(pipeline2, {allowDiskUse: true});
    assert.eq(explain2.mergeType, "anyShard", explain2);
}
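// Set slowms to -1 on both shards so that every query gets logged there as well.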
st.shard0.getDB("admin").setProfilingLevel(0, -1);
st.shard1.getDB("admin").setProfilingLevel(0, -1);
coll.aggregate(pipeline2, {allowDiskUse: true, comment: pipelineComment2}).next();
{
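    // We don't know in advance which shard will be chosen as the merger, so gather the slow-query
    // logs from both shards.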
    const shard0Log = assert.commandWorked(st.shard0.adminCommand({getLog: "global"}));
    const shard1Log = assert.commandWorked(st.shard1.adminCommand({getLog: "global"}));
    const bothShardsLogLines = shard0Log.log.concat(shard1Log.log);
    const lines = [...iterateMatchingLogLines(bothShardsLogLines,
                                              {msg: "Slow query", comment: pipelineComment2})];
    // The line we want is whichever had a $mergeCursors stage.
    const line = lines.find((line) => line.match(/mergeCursors/));
    assert(line, `Failed to find a log line mentioning 'mergeCursors': ${lines}`);
    const remoteOpWait = getRemoteOpWait(line);
    const workingMillis = getWorkingMillis(line);
    assert.lte(remoteOpWait, workingMillis);
}
st.stop();