mirror of https://github.com/mongodb/mongo
SERVER-105541: Ensure diagnostic operations are correct for viewless timeseries (#42700)
GitOrigin-RevId: b2cd224f63df7002e2dc4f74c601aa6af295abce
This commit is contained in:
parent 758e21f673
commit 0f55732549
@@ -0,0 +1,76 @@
/**
 * Validates the profiler entry for a timeseries aggregation.
 *
 * @tags: [
 *   requires_timeseries,
 *   # The test runs commands that are not allowed with security token: setProfilingLevel.
 *   not_allowed_with_signed_security_token,
 *   does_not_support_stepdowns,
 *   requires_fcv_83,
 *   requires_profiling,
 *   # The test runs getLatestProfilerEntry(). The downstream syncing node affects the profiler.
 *   run_getLatestProfilerEntry,
 * ]
 */

import {getLatestProfilerEntry} from "jstests/libs/profiler.js";
import {
    areViewlessTimeseriesEnabled,
    getTimeseriesBucketsColl,
} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";

const dbName = "test_db";
const tsCollName = "test_ts_coll";
const testDB = db.getSiblingDB(dbName);
const tsColl = testDB.getCollection(tsCollName);

assert.commandWorked(
    testDB.setProfilingLevel(1, {filter: {"command.setFeatureCompatibilityVersion": {"$exists": false}}}),
);

tsColl.drop();

assert.commandWorked(
    testDB.createCollection(tsCollName, {
        timeseries: {
            timeField: "t",
            metaField: "m",
        },
    }),
);

// Insert mock time-series data into the collection.
const now = new Date();
assert.commandWorked(
    tsColl.insertMany([
        {t: new Date(now - 1000), m: "a", val: 1},
        {t: new Date(now), m: "a", val: 2},
        {t: new Date(now + 1000), m: "a", val: 3},
    ]),
);

// Comment on the timeseries aggregation, with a uuid to uniquely identify the command.
const commentObj = {
    uuid: UUID().hex(),
};

const results = tsColl.aggregate([{$match: {val: {$gt: 0}}}], {"comment": commentObj}).toArray();
assert.eq(results.length, 3);

let profileObj = getLatestProfilerEntry(testDB);

// Validate the profiler entry structure for the timeseries aggregation.
assert.eq(profileObj.op, "command");
assert.eq(profileObj.ns, tsColl.getFullName());
assert.eq(profileObj.command.aggregate, tsCollName);
assert.eq(profileObj.command.comment, commentObj);
assert.eq(profileObj.docsExamined, 2);

if (!areViewlessTimeseriesEnabled(db)) {
    // For view-ful timeseries, there is extra info about the resolved view.
    assert.eq(profileObj.resolvedViews.length, 1);
    let tsResolvedViewObj = profileObj.resolvedViews[0];

    assert.eq(tsResolvedViewObj.viewNamespace, tsColl.getFullName());
    assert.eq(tsResolvedViewObj.dependencyChain, [tsCollName, getTimeseriesBucketsColl(tsCollName)]);
}
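Note on the lookup above: getLatestProfilerEntry() returns the most recent system.profile entry, which is why the test carries the run_getLatestProfilerEntry tag (a downstream syncing node can add entries of its own). A minimal alternative sketch, not part of this commit, would pin the lookup to the aggregation's unique comment instead:

// Hypothetical alternative, assuming the default system.profile collection:
// fetch the profiler entry for our aggregation by its comment uuid rather than
// taking whatever entry happens to be latest.
const profileEntries = testDB.system.profile
    .find({"command.aggregate": tsCollName, "command.comment.uuid": commentObj.uuid})
    .toArray();
assert.eq(profileEntries.length, 1, tojson(profileEntries));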
@@ -1,14 +1,13 @@
/**
 * This test verifies the correctness of the "collectionType" value in the slow query logs.
 * @tags: [
 *   # TODO SERVER-107538 re-enable this test in viewless timeseries suites
 *   featureFlagCreateViewlessTimeseriesCollections_incompatible,
 * ]
 */

import {
    areViewlessTimeseriesEnabled,
    getTimeseriesBucketsColl,
    getTimeseriesCollForDDLOps,
} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
import {findMatchingLogLine} from "jstests/libs/log.js";
import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
@@ -74,12 +73,106 @@ import {getRawOperationSpec, getTimeseriesCollForRawOps} from "jstests/libs/raw_operation_utils.js";
        "timestamp": ISODate("2021-05-18T00:00:00.000Z"),
    }),
);

getTimeseriesCollForRawOps(db, db.test.timeseries_coll_rawops).aggregate(pipeline, getRawOperationSpec(db));
if (areViewlessTimeseriesEnabled(db)) {
    let tsCollNs = "test." + getTimeseriesCollForDDLOps(db, "timeseries_coll_rawops");
    checkLogForCollectionType(tsCollNs, "timeseriesBuckets");

    // Check for view defined on a timeseries collection.
    assert.commandWorked(db.createView("viewOnTsColl", "timeseries_coll", [{$match: {a: 1}}]));
    ns = "test.viewOnTsColl";
    db.viewOnTsColl.aggregate(pipeline);
    checkLogForCollectionType(ns, "view");

    // Check that queries with sub-pipelines (additional namespaces) don't interfere with returning
    // the correct main collection type.

    // main namespace is "normal"
    db.coll.aggregate(
        pipeline.concat([
            {
                $unionWith: {
                    coll: "viewOnColl",
                    pipeline: pipeline,
                },
            },
        ]),
    );
    checkLogForCollectionType("test.coll", "normal");

    db.coll.aggregate(
        pipeline.concat([
            {
                $unionWith: {
                    coll: "timeseries_coll",
                    pipeline: pipeline,
                },
            },
        ]),
    );
    checkLogForCollectionType("test.coll", "normal");

    // main namespace is "timeseries"
    db.timeseries_coll.aggregate(
        pipeline.concat([
            {
                $unionWith: {
                    coll: "coll",
                    pipeline: pipeline,
                },
            },
        ]),
    );
    checkLogForCollectionType("test.timeseries_coll", "timeseries");
} else {
    checkLogForCollectionType("test." + getTimeseriesBucketsColl("timeseries_coll"), "timeseriesBuckets");
}

db.timeseries_coll.aggregate(
    pipeline.concat([
        {
            $unionWith: {
                coll: "viewOnColl",
                pipeline: pipeline,
            },
        },
    ]),
);
checkLogForCollectionType("test.timeseries_coll", "timeseries");

// main namespace is "view"
db.viewOnColl.aggregate(
    pipeline.concat([
        {
            $unionWith: {
                coll: "coll",
                pipeline: pipeline,
            },
        },
    ]),
);
checkLogForCollectionType("test.viewOnColl", "view");

db.viewOnColl.aggregate(
    pipeline.concat([
        {
            $unionWith: {
                coll: "viewOnColl",
                pipeline: pipeline,
            },
        },
    ]),
);
checkLogForCollectionType("test.viewOnColl", "view");

db.viewOnColl.aggregate(
    pipeline.concat([
        {
            $unionWith: {
                coll: "timeseries_coll",
                pipeline: pipeline,
            },
        },
    ]),
);
checkLogForCollectionType("test.viewOnColl", "view");

// Check for system collectionType.
db.system.profile.aggregate(pipeline);
@@ -0,0 +1,115 @@
/**
 * Test the output structure of $currentOp on a parallel aggregation query on a timeseries coll.
 *
 * @tags: [
 *   requires_timeseries,
 *   uses_parallel_shell,
 *   requires_fcv_83,
 * ]
 */

import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
import {getTimeseriesCollForDDLOps} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";

const dbName = "test_db";
const tsCollName = "test_ts_coll";

const replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();

const primary = replTest.getPrimary();
const testDB = primary.getDB(dbName);
const adminDB = primary.getDB("admin");
const tsColl = testDB.getCollection(tsCollName);

tsColl.drop();

assert.commandWorked(
    testDB.createCollection(tsCollName, {
        timeseries: {
            timeField: "t",
            metaField: "m",
        },
    }),
);

// Insert mock time-series data into the collection.
const now = new Date();
assert.commandWorked(
    tsColl.insertMany([
        {t: new Date(now - 1000), m: "a", val: 1},
        {t: new Date(now), m: "a", val: 2},
        {t: new Date(now + 1000), m: "a", val: 3},
    ]),
);

// Enable a fail point near the end of aggregation execution,
// so the timeseries aggregation below will not be fully complete while running $currentOp.
const kFailPointName = "hangBeforeDocumentSourceCursorLoadBatch";

// It is not stable to scope the fail point by namespace here because view-ful and viewless
// timeseries aggregations will execute on different namespaces.
// The $currentOp aggregation will not hang on the same fail point because it runs against the
// admin db connection.
let fp = configureFailPoint(testDB, kFailPointName);

// Comment on the timeseries aggregation, with a uuid to uniquely identify the command.
const commentObj = {
    uuid: UUID().hex(),
};

// Run the time-series aggregation command in parallel to the $currentOp command.
// Expect it to hang at the configured fail point before completion.
const tsAggThread = startParallelShell(
    funWithArgs(
        function (dbName, tsCollName, commentObj) {
            const testDB = db.getSiblingDB(dbName);
            const tsColl = testDB[tsCollName];
            const results = tsColl.aggregate([{$match: {val: {$gt: 0}}}], {"comment": commentObj}).toArray();
            assert.eq(results.length, 3);
        },
        dbName,
        tsCollName,
        commentObj,
    ),
    testDB.getMongo().port,
);

fp.wait();

// Get the $currentOp entry for the timeseries aggregation command.
let results = [];
assert.soon(() => {
    results = adminDB
        .aggregate([
            {$currentOp: {}},
            // Matching on the aggregation comment will uniquely identify the operation.
            // The command namespace is not reliable in this case as it changes between
            // view-ful and viewless timeseries aggregations.
            {$match: {"command.comment": commentObj}},
        ])
        .toArray();

    return results.length > 0;
});

// Timeseries aggregation result found; validate its structure.
(function validateCurOpResults() {
    // There should only be a single outstanding operation (the timeseries aggregation).
    assert.eq(results.length, 1);
    let tsAggCurOpResult = results[0];

    // Now, validate its structure.
    assert.eq(tsAggCurOpResult.type, "op");
    assert.eq(tsAggCurOpResult.active, true);
    assert.eq(tsAggCurOpResult.op, "command");
    assert.eq(tsAggCurOpResult.ns, getTimeseriesCollForDDLOps(testDB, tsColl));
    assert.eq(tsAggCurOpResult.command.aggregate, tsCollName);
})();

fp.off();
tsAggThread();
replTest.stopSet();
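The test above identifies the hanging aggregation purely by its comment, since the reported namespace differs between view-ful and viewless timeseries. As a hedged sketch only (not part of this commit), an equivalent lookup could go through the currentOp admin command, which accepts filter conditions alongside currentOp: true:

// Hypothetical equivalent of the $currentOp aggregation above, filtering the
// currentOp command output on the aggregation's comment uuid.
const curOpReply = assert.commandWorked(
    adminDB.runCommand({currentOp: true, "command.comment.uuid": commentObj.uuid}),
);
assert.eq(curOpReply.inprog.length, 1, tojson(curOpReply.inprog));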
@@ -39,6 +39,7 @@
#include "mongo/db/profile_filter.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/raw_data_operation.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
@@ -196,7 +197,7 @@ void OpDebug::report(OperationContext* opCtx,

    pAttrs->add("isFromUserConnection", client && client->isFromUserConnection());
    pAttrs->addDeepCopy("ns", toStringForLogging(curop.getNSS()));
    pAttrs->addDeepCopy("collectionType", getCollectionType(curop.getNSS()));
    pAttrs->addDeepCopy("collectionType", getCollectionType(opCtx, curop.getNSS()));

    if (client) {
        if (auto clientMetadata = ClientMetadata::get(client)) {
@@ -1361,33 +1362,48 @@ void OpDebug::appendResolvedViewsInfo(BSONObjBuilder& builder) const {
    resolvedViewsArr.doneFast();
}

std::string OpDebug::getCollectionType(const NamespaceString& nss) const {
std::string OpDebug::getCollectionType(OperationContext* opCtx, const NamespaceString& nss) const {
    if (nss.isEmpty()) {
        return "none";
    } else if (!resolvedViews.empty()) {
    }

    if (!resolvedViews.empty()) {
        auto dependencyItr = resolvedViews.find(nss);
        // 'resolvedViews' might be populated if any other collection that is part of the query is
        // a view. However, it will not have associated dependencies.
        if (dependencyItr == resolvedViews.end()) {
            return "normal";
        }
        if (dependencyItr != resolvedViews.end()) {
            const std::vector<NamespaceString>& dependencies = dependencyItr->second.first;

            auto nssIterInDeps = std::find(dependencies.begin(), dependencies.end(), nss);
            tassert(7589000,
                    str::stream() << "The view with ns: " << nss.toStringForErrorMsg()
                                  << ", should have a valid dependency.",
                    nssIterInDeps != (dependencies.end() - 1) && nssIterInDeps != dependencies.end());
                    nssIterInDeps != (dependencies.end() - 1) &&
                        nssIterInDeps != dependencies.end());

            // The underlying namespace for the view/timeseries collection is the next namespace in the
            // dependency chain. If the view depends on a timeseries buckets collection, then it is a
            // timeseries collection, otherwise it is a regular view.
            // The underlying namespace for the view/timeseries collection is the next namespace in
            // the dependency chain. If the view depends on a timeseries buckets collection, then it
            // is a timeseries collection, otherwise it is a regular view.
            const NamespaceString& underlyingNss = *std::next(nssIterInDeps);
            if (underlyingNss.isTimeseriesBucketsCollection()) {
                return "timeseries";
            }
            return "view";
    } else if (nss.isTimeseriesBucketsCollection()) {
        }
    }

    if (!knownTimeseriesNamespaces.empty()) {
        auto itr = knownTimeseriesNamespaces.find(nss);
        if (itr != knownTimeseriesNamespaces.end()) {
            if (isRawDataOperation(opCtx)) {
                return "timeseriesBuckets";
            } else {
                return "timeseries";
            }
        }
    }

    if (nss.isTimeseriesBucketsCollection()) {
        return "timeseriesBuckets";
    } else if (nss.isSystem()) {
        return "system";
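In summary, the reworked getCollectionType() classifies in this order: an empty namespace is "none"; a namespace with a resolvedViews dependency chain is "timeseries" when its underlying namespace is a buckets collection, otherwise "view"; a namespace recorded in knownTimeseriesNamespaces is "timeseriesBuckets" for raw-data operations and "timeseries" otherwise; after that, buckets namespaces map to "timeseriesBuckets" and system namespaces to "system". A rough sketch of the observable effect for a viewless timeseries collection, reusing the helpers imported by the slow-query-log test above (illustration only; it assumes a collection named timeseries_coll and the test's checkLogForCollectionType() helper, and is not code from this commit):

// A plain aggregation against the viewless timeseries collection is logged with
// collectionType "timeseries"...
db.timeseries_coll.aggregate(pipeline);
checkLogForCollectionType("test.timeseries_coll", "timeseries");

// ...while a rawData aggregation over the same collection is logged as "timeseriesBuckets".
getTimeseriesCollForRawOps(db, db.timeseries_coll).aggregate(pipeline, getRawOperationSpec(db));
checkLogForCollectionType("test." + getTimeseriesCollForDDLOps(db, "timeseries_coll"), "timeseriesBuckets");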
@@ -297,7 +297,8 @@ public:
    /**
     * Gets the type of the namespace on which the current operation operates.
     */
    MONGO_MOD_PRIVATE std::string getCollectionType(const NamespaceString& nss) const;
    MONGO_MOD_PRIVATE std::string getCollectionType(OperationContext* opCtx,
                                                    const NamespaceString& nss) const;

    /**
     * Accumulate resolved views.
@@ -561,6 +562,11 @@ public:
    std::map<NamespaceString, std::pair<std::vector<NamespaceString>, std::vector<BSONObj>>>
        resolvedViews;

    // Records the namespaces seen during this operation that are known to be timeseries.
    // TODO SERVER-113634: Add the timeseries namespace to all operations that apply,
    // not just read commands.
    std::set<NamespaceString> knownTimeseriesNamespaces;

    // Stores metrics handles for extensions to properly manage their lifetimes. The contents of
    // these stats are opaque to the MongoDB host - this object allows extensions to implement their
    // own custom aggregation and serialization logic.
@@ -29,6 +29,7 @@

#include "mongo/db/query/timeseries/timeseries_translation.h"

#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source_index_stats.h"
#include "mongo/db/pipeline/document_source_internal_convert_bucket_index_stats.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -157,8 +158,22 @@ void translatePipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
}

boost::optional<TimeseriesTranslationParams> getTimeseriesTranslationParamsIfRequired(
    OperationContext* opCtx, const CollectionRoutingInfo& cri) {
    if (!requiresViewlessTimeseriesTranslationInRouter(opCtx, cri)) {
    const boost::intrusive_ptr<ExpressionContext>& expCtx, const CollectionRoutingInfo& cri) {

    const bool isViewlessTimeseriesColl =
        cri.hasRoutingTable() && isViewlessTimeseriesCollection(cri.getChunkManager());

    // Regardless of whether we perform the timeseries translation, if the namespace is a viewless
    // timeseries, we need to add it to the CurOp::knownTimeseriesNamespaces set so $currentOp can
    // return the correct collection type.
    if (isViewlessTimeseriesColl) {
        CurOp::get(expCtx->getOperationContext())
            ->debug()
            .knownTimeseriesNamespaces.insert(expCtx->getUserNss());
    }

    if (isRawDataOperation(expCtx->getOperationContext()) || !isViewlessTimeseriesColl) {
        // No translation necessary in these cases.
        return boost::none;
    }
@@ -178,13 +193,27 @@ boost::optional<TimeseriesTranslationParams> getTimeseriesTranslationParamsIfRequired(
}

boost::optional<TimeseriesTranslationParams> getTimeseriesTranslationParamsIfRequired(
    OperationContext* opCtx, const CollectionOrViewAcquisition& collOrView) {
    const boost::intrusive_ptr<ExpressionContext>& expCtx,
    const CollectionOrViewAcquisition& collOrView) {
    if (!collOrView.isCollection()) {
        return boost::none;
    }

    const CollectionPtr& collPtr = collOrView.getCollectionPtr();
    if (!requiresViewlessTimeseriesTranslation(opCtx, collPtr)) {

    const bool isViewlessTimeseriesColl = collPtr && isViewlessTimeseriesCollection(*collPtr.get());

    // Regardless of whether we perform the timeseries translation, if the namespace is a viewless
    // timeseries, we need to add it to the CurOp::knownTimeseriesNamespaces set so $currentOp can
    // return the correct collection type.
    if (isViewlessTimeseriesColl) {
        CurOp::get(expCtx->getOperationContext())
            ->debug()
            .knownTimeseriesNamespaces.insert(expCtx->getUserNss());
    }

    if (isRawDataOperation(expCtx->getOperationContext()) || !isViewlessTimeseriesColl) {
        // No translation necessary in these cases.
        return boost::none;
    }
@@ -207,7 +236,7 @@ void translateStagesIfRequiredImpl(const boost::intrusive_ptr<ExpressionContext>
    }

    const boost::optional<TimeseriesTranslationParams> params =
        getTimeseriesTranslationParamsIfRequired(expCtx->getOperationContext(), catalogData);
        getTimeseriesTranslationParamsIfRequired(expCtx, catalogData);
    if (!params) {
        return;
    }
@@ -229,7 +258,7 @@ void translateIndexHintIfRequiredImpl(const boost::intrusive_ptr<ExpressionContext>
    }

    const boost::optional<TimeseriesTranslationParams> params =
        getTimeseriesTranslationParamsIfRequired(expCtx->getOperationContext(), catalogData);
        getTimeseriesTranslationParamsIfRequired(expCtx, catalogData);
    if (!params) {
        return;
    }