mirror of https://github.com/mongodb/mongo
SERVER-114830 Implement shard catalog viewless timeseries upgrade/downgrade (#44786)
GitOrigin-RevId: d3577059c77f5f8955d9df9f85f284c6c353112d
This commit is contained in:
parent
dbc6666d27
commit
c70665401f
|
|
@ -2922,6 +2922,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
# The following patterns are parsed from ./src/mongo/db/timeseries/OWNERS.yml
|
||||
/src/mongo/db/timeseries/**/* @10gen/server-collection-write-path @svc-auto-approve-bot
|
||||
/src/mongo/db/timeseries/**/*catalog_helper* @10gen/server-catalog-and-routing-shard-catalog @svc-auto-approve-bot
|
||||
/src/mongo/db/timeseries/**/upgrade_downgrade_viewless_timeseries* @10gen/server-catalog-and-routing-shard-catalog @svc-auto-approve-bot
|
||||
|
||||
# The following patterns are parsed from ./src/mongo/db/timeseries/bucket_catalog/OWNERS.yml
|
||||
/src/mongo/db/timeseries/bucket_catalog/**/* @10gen/server-timeseries-bucket-catalog @svc-auto-approve-bot
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@
|
|||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/storage/snapshot.h"
|
||||
#include "mongo/db/timeseries/timeseries_options.h"
|
||||
#include "mongo/db/timeseries/upgrade_downgrade_viewless_timeseries.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/s/query/exec/cluster_cursor_manager.h"
|
||||
|
|
@ -165,83 +166,17 @@ void _checkBucketCollectionInconsistencies(
|
|||
return;
|
||||
}
|
||||
|
||||
const std::string errMsgPrefix = str::stream()
|
||||
<< nss.toStringForErrorMsg() << " is a bucket collection but is missing";
|
||||
|
||||
// A bucket collection must always have timeseries options
|
||||
const bool hasTimeseriesOptions = localColl->isTimeseriesCollection();
|
||||
if (!hasTimeseriesOptions) {
|
||||
const std::string errMsg = str::stream() << errMsgPrefix << " the timeseries options";
|
||||
const BSONObj options = localColl->getCollectionOptions().toBSON();
|
||||
inconsistencies.emplace_back(
|
||||
makeInconsistency(MetadataInconsistencyTypeEnum::kMalformedTimeseriesBucketsCollection,
|
||||
for (auto& inconsistency : timeseries::checkBucketCollectionInconsistencies(
|
||||
opCtx,
|
||||
localColl,
|
||||
checkView,
|
||||
localCatalogSnapshot->lookupView(opCtx, nss.getTimeseriesViewNamespace()).get(),
|
||||
localCatalogSnapshot->lookupCollectionByNamespace(opCtx,
|
||||
nss.getTimeseriesViewNamespace()))) {
|
||||
inconsistencies.emplace_back(makeInconsistency(
|
||||
MetadataInconsistencyTypeEnum::kMalformedTimeseriesBucketsCollection,
|
||||
MalformedTimeseriesBucketsCollectionDetails{
|
||||
nss, std::move(errMsg), std::move(options)}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!checkView) {
|
||||
return;
|
||||
}
|
||||
|
||||
// A bucket collection on the primary shard must always be backed by a view in the proper
|
||||
// format. Check if there is a valid view, otherwise return current view/collection options (if
|
||||
// present).
|
||||
const auto [hasValidView, invalidOptions] = [&] {
|
||||
if (const auto& view =
|
||||
localCatalogSnapshot->lookupView(opCtx, nss.getTimeseriesViewNamespace())) {
|
||||
if (view->viewOn() == nss && view->pipeline().size() == 1) {
|
||||
const auto expectedViewPipeline = timeseries::generateViewPipeline(
|
||||
*localColl->getTimeseriesOptions(), false /* asArray */);
|
||||
const auto expectedInternalUnpackStage =
|
||||
expectedViewPipeline
|
||||
.getField(DocumentSourceInternalUnpackBucket::kStageNameInternal)
|
||||
.Obj();
|
||||
const auto actualPipeline = view->pipeline().front();
|
||||
if (actualPipeline.hasField(
|
||||
DocumentSourceInternalUnpackBucket::kStageNameInternal)) {
|
||||
const auto actualInternalUnpackStage =
|
||||
actualPipeline
|
||||
.getField(DocumentSourceInternalUnpackBucket::kStageNameInternal)
|
||||
.Obj()
|
||||
// Ignore `exclude` field introduced in v5.0 and removed in v5.1
|
||||
.removeField(DocumentSourceInternalUnpackBucket::kExclude);
|
||||
if (actualInternalUnpackStage.woCompare(expectedInternalUnpackStage) == 0) {
|
||||
// The view is in the expected format
|
||||
return std::make_pair(true, BSONObj());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The view is not in the expected format, return the current options for debugging
|
||||
BSONArrayBuilder pipelineArray;
|
||||
const auto& pipeline = view->pipeline();
|
||||
for (const auto& stage : pipeline) {
|
||||
pipelineArray.append(stage);
|
||||
}
|
||||
|
||||
const BSONObj currentViewOptions = BSON("viewOn" << toStringForLogging(view->viewOn())
|
||||
<< "pipeline" << pipelineArray.arr());
|
||||
|
||||
return std::make_pair(false, currentViewOptions);
|
||||
}
|
||||
|
||||
const auto& coll = localCatalogSnapshot->lookupCollectionByNamespace(
|
||||
opCtx, nss.getTimeseriesViewNamespace());
|
||||
if (coll) {
|
||||
// A collection is present rather than a view, return the current options for debugging
|
||||
return std::make_pair(false, coll->getCollectionOptions().toBSON());
|
||||
}
|
||||
|
||||
return std::make_pair(false, BSONObj());
|
||||
}();
|
||||
|
||||
if (!hasValidView) {
|
||||
const std::string errMsg = str::stream() << errMsgPrefix << " a valid view backing it";
|
||||
inconsistencies.emplace_back(
|
||||
makeInconsistency(MetadataInconsistencyTypeEnum::kMalformedTimeseriesBucketsCollection,
|
||||
MalformedTimeseriesBucketsCollectionDetails{
|
||||
nss, std::move(errMsg), std::move(invalidOptions)}));
|
||||
nss, std::move(inconsistency.issue), std::move(inconsistency.options)}));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -483,6 +483,7 @@ mongo_cc_library(
|
|||
"//src/mongo/db/sharding_environment:sharding_initialization",
|
||||
"//src/mongo/db/sharding_environment:sharding_logging",
|
||||
"//src/mongo/db/sharding_environment:sharding_runtime_d_params_idl",
|
||||
"//src/mongo/db/timeseries:upgrade_downgrade_viewless_timeseries",
|
||||
"//src/mongo/db/timeseries/bucket_catalog",
|
||||
"//src/mongo/db/topology/vector_clock:vector_clock_mongod",
|
||||
"//src/mongo/db/transaction",
|
||||
|
|
|
|||
|
|
@ -213,6 +213,21 @@ mongo_cc_library(
|
|||
],
|
||||
)
|
||||
|
||||
# TODO(SERVER-114573): Remove this target once 9.0 becomes lastLTS.
|
||||
mongo_cc_library(
|
||||
name = "upgrade_downgrade_viewless_timeseries",
|
||||
srcs = [
|
||||
"upgrade_downgrade_viewless_timeseries.cpp",
|
||||
],
|
||||
deps = [
|
||||
":timeseries_options",
|
||||
":viewless_timeseries_collection_creation_helpers",
|
||||
"//src/mongo/db:server_base",
|
||||
"//src/mongo/db/shard_role",
|
||||
"//src/mongo/db/shard_role/shard_catalog:collection_catalog",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "db_timeseries_test",
|
||||
srcs = [
|
||||
|
|
@ -226,6 +241,7 @@ mongo_cc_unit_test(
|
|||
"timeseries_update_delete_util_test.cpp",
|
||||
"timeseries_write_commands_test.cpp",
|
||||
"timeseries_write_util_test.cpp",
|
||||
"upgrade_downgrade_viewless_timeseries_test.cpp",
|
||||
],
|
||||
tags = [
|
||||
"mongo_unittest_sixth_group",
|
||||
|
|
@ -238,6 +254,7 @@ mongo_cc_unit_test(
|
|||
":timeseries_extended_range",
|
||||
":timeseries_options",
|
||||
":timeseries_write_util",
|
||||
":upgrade_downgrade_viewless_timeseries",
|
||||
"//src/mongo/db:record_id_helpers",
|
||||
"//src/mongo/db/collection_crud",
|
||||
"//src/mongo/db/shard_role",
|
||||
|
|
|
|||
|
|
@ -6,3 +6,6 @@ filters:
|
|||
- "*catalog_helper*":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-shard-catalog
|
||||
- "upgrade_downgrade_viewless_timeseries*":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-shard-catalog
|
||||
|
|
|
|||
|
|
@ -0,0 +1,345 @@
|
|||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/timeseries/upgrade_downgrade_viewless_timeseries.h"
|
||||
|
||||
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
|
||||
#include "mongo/db/server_feature_flags_gen.h"
|
||||
#include "mongo/db/shard_role/lock_manager/d_concurrency.h"
|
||||
#include "mongo/db/shard_role/lock_manager/exception_util.h"
|
||||
#include "mongo/db/shard_role/lock_manager/locker.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_raii.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_catalog.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_options.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/database.h"
|
||||
#include "mongo/db/storage/write_unit_of_work.h"
|
||||
#include "mongo/db/timeseries/timeseries_constants.h"
|
||||
#include "mongo/db/timeseries/timeseries_options.h"
|
||||
#include "mongo/db/timeseries/viewless_timeseries_collection_creation_helpers.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
|
||||
|
||||
namespace mongo {
|
||||
|
||||
namespace timeseries {
|
||||
|
||||
// Maximum time to acquire the locks over all affected entities. We do this since:
|
||||
// - Upgrade/downgrade is a low priority task, so guard against it holding locks for too long.
|
||||
// - Ensure that by locking all affected namespaces (buckets NS + main NS + system.views) we do not
|
||||
// cause a deadlock.
|
||||
static const auto LOCK_TIMEOUT = Seconds(30);
|
||||
|
||||
void upgradeToViewlessTimeseries(OperationContext* opCtx,
|
||||
const NamespaceString& mainNs,
|
||||
const boost::optional<UUID>& expectedUUID) {
|
||||
LOGV2(11483000, "Started upgrade to viewless timeseries", logAttrs(mainNs));
|
||||
|
||||
tassert(11483001,
|
||||
"Expected 'mainNs' for timeseries upgrade to not be a system.buckets namespace",
|
||||
!mainNs.isTimeseriesBucketsCollection());
|
||||
auto bucketsNs = mainNs.makeTimeseriesBucketsNamespace();
|
||||
|
||||
VersionContext::FixedOperationFCVRegion fixedOfcvRegion(opCtx);
|
||||
tassert(11483002,
|
||||
"Tried to upgrade to viewless timeseries without the feature flag enabled",
|
||||
gFeatureFlagCreateViewlessTimeseriesCollections.isEnabled(
|
||||
VersionContext::getDecoration(opCtx)));
|
||||
|
||||
writeConflictRetry(opCtx, "viewlessTimeseriesUpgrade", mainNs, [&] {
|
||||
// Acquire locks over the affected namespaces. The buckets collection must be locked first.
|
||||
auto lockDeadline = Date_t::now() + LOCK_TIMEOUT;
|
||||
AutoGetDb autoDb(opCtx, mainNs.dbName(), MODE_IX, lockDeadline);
|
||||
AutoGetCollection bucketsColl(
|
||||
opCtx, bucketsNs, MODE_X, auto_get_collection::Options{}.deadline(lockDeadline));
|
||||
AutoGetCollection mainColl(opCtx,
|
||||
mainNs,
|
||||
MODE_X,
|
||||
auto_get_collection::Options{}
|
||||
.viewMode(auto_get_collection::ViewMode::kViewsPermitted)
|
||||
.deadline(lockDeadline));
|
||||
|
||||
// Idempotency check
|
||||
if (mainColl && mainColl->isTimeseriesCollection() &&
|
||||
mainColl->isNewTimeseriesWithoutView() && !bucketsColl) {
|
||||
// TODO(SERVER-114517): Investigate if we should relax this check.
|
||||
tassert(11483003,
|
||||
"Found an already upgraded timeseries collection but with an unexpected UUID",
|
||||
!expectedUUID || mainColl->uuid() == *expectedUUID);
|
||||
return;
|
||||
}
|
||||
|
||||
// Sanity checks
|
||||
tassert(11483004,
|
||||
"Did not find the buckets collection to upgrade to viewless timeseries",
|
||||
bucketsColl);
|
||||
// TODO(SERVER-114517): Investigate if we should relax this check.
|
||||
tassert(11483005,
|
||||
"The buckets collection to upgrade does not have the expected UUID",
|
||||
!expectedUUID || bucketsColl->uuid() == *expectedUUID);
|
||||
tassert(11483006,
|
||||
"The buckets collection to upgrade does not have valid timeseries options",
|
||||
bucketsColl->isTimeseriesCollection() &&
|
||||
!bucketsColl->isNewTimeseriesWithoutView());
|
||||
|
||||
auto inconsistencies = checkBucketCollectionInconsistencies(opCtx,
|
||||
*bucketsColl,
|
||||
false /* ensureViewExists */,
|
||||
mainColl.getView(),
|
||||
(*mainColl).get());
|
||||
if (!inconsistencies.empty()) {
|
||||
for (const auto& inconsistency : inconsistencies) {
|
||||
LOGV2(
|
||||
11483007,
|
||||
"Skipping timeseries upgrade because we found a buckets metadata inconsistency",
|
||||
logAttrs(mainNs),
|
||||
"issue"_attr = inconsistency.issue,
|
||||
"options"_attr = inconsistency.options);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Operations all lock system.views in the end to prevent deadlock.
|
||||
Lock::CollectionLock systemViewsLock(
|
||||
opCtx, autoDb.getDb()->getSystemViewsName(), MODE_X, lockDeadline);
|
||||
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
|
||||
// Run the timeseries upgrade steps without generating oplog entries.
|
||||
{
|
||||
repl::UnreplicatedWritesBlock uwb(opCtx);
|
||||
|
||||
// Drop view and rename the buckets NSS over it.
|
||||
uassertStatusOK(autoDb.getDb()->dropView(opCtx, mainNs));
|
||||
uassertStatusOK(
|
||||
autoDb.getDb()->renameCollection(opCtx, bucketsNs, mainNs, true /* stayTemp */));
|
||||
|
||||
// Clean up the buckets NSS metadata.
|
||||
CollectionWriter collWriter{opCtx, mainNs};
|
||||
Collection* writableColl = collWriter.getWritableCollection(opCtx);
|
||||
uassertStatusOK(writableColl->updateValidator(opCtx,
|
||||
BSONObj() /* newValidator */,
|
||||
boost::none /* validationLevel */,
|
||||
boost::none /* validatorAction */));
|
||||
}
|
||||
|
||||
// Log a oplog entry giving a single, atomic timestamp to all operations done above.
|
||||
// TODO(SERVER-114505): Call the OpObserver to emit an oplog entry
|
||||
|
||||
wuow.commit();
|
||||
});
|
||||
|
||||
LOGV2(11483008, "Finished upgrade to viewless timeseries", logAttrs(mainNs));
|
||||
}
|
||||
|
||||
void downgradeFromViewlessTimeseries(OperationContext* opCtx,
|
||||
const NamespaceString& mainNs,
|
||||
const boost::optional<UUID>& expectedUUID) {
|
||||
LOGV2(11483009, "Started downgrade of timeseries collection format", logAttrs(mainNs));
|
||||
|
||||
tassert(11483010,
|
||||
"Expected 'mainNs' for timeseries downgrade to not be a system.buckets namespace",
|
||||
!mainNs.isTimeseriesBucketsCollection());
|
||||
auto bucketsNs = mainNs.makeTimeseriesBucketsNamespace();
|
||||
|
||||
VersionContext::FixedOperationFCVRegion fixedOfcvRegion(opCtx);
|
||||
tassert(11483011,
|
||||
"Tried to downgrade from viewless timeseries with the feature flag enabled",
|
||||
!gFeatureFlagCreateViewlessTimeseriesCollections.isEnabled(
|
||||
VersionContext::getDecoration(opCtx)));
|
||||
|
||||
writeConflictRetry(opCtx, "viewlessTimeseriesDowngrade", mainNs, [&] {
|
||||
// Acquire locks over the affected namespaces. The buckets collection must be locked first.
|
||||
auto lockDeadline = Date_t::now() + LOCK_TIMEOUT;
|
||||
AutoGetDb autoDb(opCtx, mainNs.dbName(), MODE_IX, lockDeadline);
|
||||
AutoGetCollection bucketsColl(
|
||||
opCtx, bucketsNs, MODE_X, auto_get_collection::Options{}.deadline(lockDeadline));
|
||||
AutoGetCollection mainColl(opCtx,
|
||||
mainNs,
|
||||
MODE_X,
|
||||
auto_get_collection::Options{}
|
||||
.viewMode(auto_get_collection::ViewMode::kViewsPermitted)
|
||||
.deadline(lockDeadline));
|
||||
|
||||
// Idempotency check
|
||||
if (bucketsColl && bucketsColl->isTimeseriesCollection() &&
|
||||
!bucketsColl->isNewTimeseriesWithoutView() && !mainColl && mainColl.getView() &&
|
||||
mainColl.getView()->timeseries()) {
|
||||
// TODO(SERVER-114517): Investigate if we should relax this check.
|
||||
tassert(11483012,
|
||||
"Found an already downgraded timeseries collection but with an unexpected UUID",
|
||||
!expectedUUID || bucketsColl->uuid() == *expectedUUID);
|
||||
return;
|
||||
}
|
||||
|
||||
// Sanity checks
|
||||
tassert(11483013, "Did not find the viewless timeseries collection to downgrade", mainColl);
|
||||
// TODO(SERVER-114517): Investigate if we should relax this check.
|
||||
tassert(11483014,
|
||||
"The viewless collection to downgrade does not have the expected UUID",
|
||||
!expectedUUID || mainColl->uuid() == *expectedUUID);
|
||||
tassert(11483015,
|
||||
"The viewless collection to downgrade does not have timeseries options",
|
||||
mainColl->isTimeseriesCollection() && mainColl->isNewTimeseriesWithoutView());
|
||||
tassert(11483016,
|
||||
"While downgrading viewless timeseries, we found a conflicting buckets collection",
|
||||
!bucketsColl);
|
||||
|
||||
// Operations all lock system.views in the end to prevent deadlock.
|
||||
Lock::CollectionLock systemViewsLock(
|
||||
opCtx, autoDb.getDb()->getSystemViewsName(), MODE_X, lockDeadline);
|
||||
// Create system.views if it does not exist. This is done in a separate WUOW.
|
||||
autoDb.getDb()->createSystemDotViewsIfNecessary(opCtx);
|
||||
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
|
||||
// Run the timeseries downgrade steps without generating oplog entries.
|
||||
{
|
||||
repl::UnreplicatedWritesBlock uwb(opCtx);
|
||||
|
||||
// Rename the collection to the buckets NSS and create the view on the main NSS.
|
||||
uassertStatusOK(
|
||||
autoDb.getDb()->renameCollection(opCtx, mainNs, bucketsNs, true /* stayTemp */));
|
||||
|
||||
CollectionOptions viewOptions;
|
||||
viewOptions.viewOn = std::string{bucketsNs.coll()};
|
||||
viewOptions.collation = mainColl->getCollectionOptions().collation;
|
||||
constexpr bool asArray = true;
|
||||
viewOptions.pipeline =
|
||||
timeseries::generateViewPipeline(*mainColl->getTimeseriesOptions(), asArray);
|
||||
uassertStatusOK(
|
||||
autoDb.getDb()->userCreateNS(opCtx, mainNs, viewOptions, /*createIdIndex=*/false));
|
||||
|
||||
// Add validator to the buckets collection.
|
||||
CollectionWriter collWriter{opCtx, bucketsNs};
|
||||
Collection* writableColl = collWriter.getWritableCollection(opCtx);
|
||||
auto timeField = mainColl->getTimeseriesOptions()->getTimeField();
|
||||
int bucketVersion = timeseries::kTimeseriesControlLatestVersion;
|
||||
uassertStatusOK(writableColl->updateValidator(
|
||||
opCtx,
|
||||
timeseries::generateTimeseriesValidator(bucketVersion, timeField),
|
||||
boost::none /* validationLevel */,
|
||||
boost::none /* validatorAction */));
|
||||
}
|
||||
|
||||
// Log a oplog entry giving a single, atomic timestamp to all operations done above.
|
||||
// TODO(SERVER-114505): Call the OpObserver to emit an oplog entry
|
||||
|
||||
wuow.commit();
|
||||
});
|
||||
|
||||
LOGV2(11483017, "Finished downgrade of timeseries collection format", logAttrs(mainNs));
|
||||
}
|
||||
|
||||
std::vector<BucketsCollectionInconsistency> checkBucketCollectionInconsistencies(
|
||||
OperationContext* opCtx,
|
||||
const CollectionPtr& bucketsColl,
|
||||
bool ensureViewExists,
|
||||
const ViewDefinition* view,
|
||||
const Collection* mainColl) {
|
||||
std::vector<BucketsCollectionInconsistency> inconsistencies;
|
||||
|
||||
tassert(11483018,
|
||||
"Expected 'bucketsColl' to exist and be a timeseries buckets namespace",
|
||||
bucketsColl && bucketsColl->ns().isTimeseriesBucketsCollection());
|
||||
const auto& nss = bucketsColl->ns();
|
||||
|
||||
const std::string errMsgPrefix = str::stream()
|
||||
<< nss.toStringForErrorMsg() << " is a bucket collection but is missing";
|
||||
|
||||
// A bucket collection must always have timeseries options
|
||||
const bool hasTimeseriesOptions = bucketsColl->isTimeseriesCollection();
|
||||
if (!hasTimeseriesOptions) {
|
||||
const std::string errMsg = str::stream() << errMsgPrefix << " the timeseries options";
|
||||
const BSONObj options = bucketsColl->getCollectionOptions().toBSON();
|
||||
inconsistencies.emplace_back(
|
||||
BucketsCollectionInconsistency{std::move(errMsg), std::move(options)});
|
||||
return inconsistencies;
|
||||
}
|
||||
|
||||
// A bucket collection on the primary shard must always be backed by a view in the proper
|
||||
// format. Check if there is a valid view, otherwise return current view/collection options (if
|
||||
// present).
|
||||
const auto [hasValidView, invalidOptions] = [&] {
|
||||
if (view) {
|
||||
if (view->viewOn() == nss && view->pipeline().size() == 1) {
|
||||
const auto expectedViewPipeline = timeseries::generateViewPipeline(
|
||||
*bucketsColl->getTimeseriesOptions(), false /* asArray */);
|
||||
const auto expectedInternalUnpackStage =
|
||||
expectedViewPipeline
|
||||
.getField(DocumentSourceInternalUnpackBucket::kStageNameInternal)
|
||||
.Obj();
|
||||
const auto actualPipeline = view->pipeline().front();
|
||||
if (actualPipeline.hasField(
|
||||
DocumentSourceInternalUnpackBucket::kStageNameInternal)) {
|
||||
const auto actualInternalUnpackStage =
|
||||
actualPipeline
|
||||
.getField(DocumentSourceInternalUnpackBucket::kStageNameInternal)
|
||||
.Obj()
|
||||
// Ignore `exclude` field introduced in v5.0 and removed in v5.1
|
||||
.removeField(DocumentSourceInternalUnpackBucket::kExclude);
|
||||
if (actualInternalUnpackStage.woCompare(expectedInternalUnpackStage) == 0) {
|
||||
// The view is in the expected format
|
||||
return std::make_pair(true, BSONObj());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The view is not in the expected format, return the current options for debugging
|
||||
BSONArrayBuilder pipelineArray;
|
||||
const auto& pipeline = view->pipeline();
|
||||
for (const auto& stage : pipeline) {
|
||||
pipelineArray.append(stage);
|
||||
}
|
||||
|
||||
const BSONObj currentViewOptions = BSON("viewOn" << toStringForLogging(view->viewOn())
|
||||
<< "pipeline" << pipelineArray.arr());
|
||||
|
||||
return std::make_pair(false, currentViewOptions);
|
||||
}
|
||||
|
||||
if (mainColl) {
|
||||
// A collection is present rather than a view, return the current options for debugging
|
||||
return std::make_pair(false, mainColl->getCollectionOptions().toBSON());
|
||||
}
|
||||
|
||||
return std::make_pair(view || !ensureViewExists, BSONObj());
|
||||
}();
|
||||
|
||||
if (!hasValidView) {
|
||||
const std::string errMsg = str::stream() << errMsgPrefix << " a valid view backing it";
|
||||
inconsistencies.emplace_back(
|
||||
BucketsCollectionInconsistency{std::move(errMsg), std::move(invalidOptions)});
|
||||
}
|
||||
|
||||
return inconsistencies;
|
||||
}
|
||||
|
||||
} // namespace timeseries
|
||||
} // namespace mongo
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection.h"
|
||||
#include "mongo/db/views/view.h"
|
||||
#include "mongo/util/modules.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
MONGO_MOD_PUBLIC;
|
||||
|
||||
namespace mongo::timeseries {
|
||||
|
||||
/**
|
||||
* Shard catalog upgrade/downgrade between viewful and viewless timeseries.
|
||||
*
|
||||
* For both upgrade and downgrade, `mainNs` is the main namespace of the timeseries collection
|
||||
* (i.e. without the 'system.buckets' prefix).
|
||||
*
|
||||
* Do not call this function while holding collection-level locks, since it acquires its own locks
|
||||
* internally, so the previously acquired locks may lead to a lock ordering violation.
|
||||
*
|
||||
* The caller must ensure the backing collection (when upgrading, the 'system.buckets' collection)
|
||||
* exists, has timeseries options, and that neither the view nor buckets namespaces are concurrently
|
||||
* modified or dropped. This must be done via higher-level means than collection locks
|
||||
* (for example, via the DDL lock, or since oplog command entries are applied individually).
|
||||
*
|
||||
* The caller must only call the upgrade function with the viewless timeseries feature flag enabled.
|
||||
* This ensures no new viewful collections can be created while upgrading, so after setFCV finishes
|
||||
* all collections are viewless. Similarly, the feature flag must be disabled for downgrading.
|
||||
*
|
||||
* It is possible that an upgrade is not possible because the collection or view is malformed
|
||||
* or there is a conflicting namespace; in this case the upgrade is skipped (i.e. a no-op).
|
||||
* Downgrading is always possible.
|
||||
*
|
||||
* Considerations for oplog application:
|
||||
* - Both upgrade and downgrade are idempotent.
|
||||
* - The `expectedUUID` parameter is used to check that the targeted namespace has the expected
|
||||
* incarnation of the collection, which may not be the case during initial sync oplog application.
|
||||
*/
|
||||
void upgradeToViewlessTimeseries(OperationContext* opCtx,
|
||||
const NamespaceString& mainNs,
|
||||
const boost::optional<UUID>& expectedUUID = boost::none);
|
||||
void downgradeFromViewlessTimeseries(OperationContext* opCtx,
|
||||
const NamespaceString& mainNs,
|
||||
const boost::optional<UUID>& expectedUUID = boost::none);
|
||||
|
||||
/**
|
||||
* Validate if a viewful timeseries collection is well-formed.
|
||||
*
|
||||
* `bucketsColl` is the system.buckets namespace of the collection (which must exist),
|
||||
* while `view` is the view on the main namespace (or `nullptr` if it does not exist).
|
||||
*
|
||||
* To detect the inconsistency where there is a conflicting collection on the main namespace,
|
||||
* `mainColl` is the corresponding collection on the main namespace if one exists.
|
||||
*
|
||||
* If `ensureViewExists` is false, we tolerate that a timeseries view does not exist on the main NS.
|
||||
* This is used in two cases:
|
||||
* - When validating non-DB primary shards, since only the DB primary shard has the view.
|
||||
* - We allow upgrading to viewless timeseries if just the view is missing.
|
||||
*/
|
||||
struct BucketsCollectionInconsistency {
|
||||
std::string issue;
|
||||
BSONObj options;
|
||||
};
|
||||
|
||||
std::vector<BucketsCollectionInconsistency> checkBucketCollectionInconsistencies(
|
||||
OperationContext* opCtx,
|
||||
const CollectionPtr& bucketsColl,
|
||||
bool ensureViewExists,
|
||||
const ViewDefinition* view,
|
||||
const Collection* mainColl);
|
||||
|
||||
} // namespace mongo::timeseries
|
||||
|
|
@ -0,0 +1,260 @@
|
|||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/timeseries/upgrade_downgrade_viewless_timeseries.h"
|
||||
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_test_fixture.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_catalog.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/create_collection.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
|
||||
|
||||
namespace mongo::timeseries {
|
||||
namespace {
|
||||
|
||||
class UpgradeDowngradeViewlessTimeseriesTest : public CatalogTestFixture {
|
||||
protected:
|
||||
void setUp() override {
|
||||
CatalogTestFixture::setUp();
|
||||
}
|
||||
|
||||
void createTimeseriesCollection(const NamespaceString& nss) {
|
||||
CreateCommand cmd = CreateCommand(nss);
|
||||
cmd.getCreateCollectionRequest().setTimeseries(TimeseriesOptions("timestamp"));
|
||||
ASSERT_OK(createCollection(operationContext(), cmd));
|
||||
}
|
||||
|
||||
UUID createViewfulTimeseriesCollection(const NamespaceString& nss) {
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", false);
|
||||
createTimeseriesCollection(nss);
|
||||
return CollectionCatalog::get(operationContext())
|
||||
->lookupCollectionByNamespace(
|
||||
operationContext(),
|
||||
nss.isTimeseriesBucketsCollection() ? nss : nss.makeTimeseriesBucketsNamespace())
|
||||
->uuid();
|
||||
}
|
||||
|
||||
UUID createViewlessTimeseriesCollection(const NamespaceString& nss) {
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
createTimeseriesCollection(nss);
|
||||
return CollectionCatalog::get(operationContext())
|
||||
->lookupCollectionByNamespace(operationContext(), nss)
|
||||
->uuid();
|
||||
}
|
||||
|
||||
void assertIsTimeseriesCollection(const NamespaceString& nss, const UUID& uuid) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
auto coll = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
|
||||
ASSERT(coll && coll->isTimeseriesCollection() && coll->uuid() == uuid);
|
||||
}
|
||||
|
||||
void assertIsViewfulTimeseries(const NamespaceString& nss, const UUID& uuid) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
ASSERT(!CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss));
|
||||
auto view = CollectionCatalog::get(opCtx)->lookupView(opCtx, nss);
|
||||
ASSERT(view && view->timeseries() &&
|
||||
view->viewOn() == nss.makeTimeseriesBucketsNamespace());
|
||||
|
||||
assertIsTimeseriesCollection(nss.makeTimeseriesBucketsNamespace(), uuid);
|
||||
}
|
||||
|
||||
void assertIsViewlessTimeseries(const NamespaceString& nss, const UUID& uuid) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
assertIsTimeseriesCollection(nss, uuid);
|
||||
|
||||
ASSERT(!CollectionCatalog::get(opCtx)->lookupView(opCtx, nss));
|
||||
ASSERT(!CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(
|
||||
opCtx, nss.makeTimeseriesBucketsNamespace()));
|
||||
}
|
||||
|
||||
const NamespaceString nss1 = NamespaceString::createNamespaceString_forTest("test", "foo");
|
||||
const NamespaceString nss2 = NamespaceString::createNamespaceString_forTest("test", "bar");
|
||||
};
|
||||
|
||||
/**
|
||||
* Basic upgrade/downgrade.
|
||||
*/
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeOne) {
|
||||
auto uuid1 = createViewfulTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsViewlessTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, DowngradeOne) {
|
||||
auto uuid1 = createViewlessTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", false);
|
||||
downgradeFromViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsViewfulTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Idempotency.
|
||||
*/
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeOneWithExpectedUUID) {
|
||||
auto uuid1 = createViewfulTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1, uuid1 /* expectedUUID */);
|
||||
assertIsViewlessTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeIdempotency) {
|
||||
auto uuid1 = createViewlessTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsViewlessTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, DowngradeOneWithExpectedUUID) {
|
||||
auto uuid1 = createViewlessTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", false);
|
||||
downgradeFromViewlessTimeseries(operationContext(), nss1, uuid1 /* expectedUUID */);
|
||||
assertIsViewfulTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, DowngradeIdempotency) {
|
||||
auto uuid = createViewfulTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", false);
|
||||
downgradeFromViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsViewfulTimeseries(nss1, uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the collection options resulting from an upgrade/downgrade are consistent with those
|
||||
* of a brand new collection.
|
||||
*
|
||||
* This exercises the metadata changes, e.g. adding/removing the buckets validator.
|
||||
*/
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradedOptionsConsistentWithNewViewless) {
|
||||
createViewfulTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
|
||||
createViewlessTimeseriesCollection(nss2);
|
||||
|
||||
auto catalog = CollectionCatalog::get(operationContext());
|
||||
auto options1 =
|
||||
catalog->lookupCollectionByNamespace(operationContext(), nss1)->getCollectionOptions();
|
||||
auto options2 =
|
||||
catalog->lookupCollectionByNamespace(operationContext(), nss2)->getCollectionOptions();
|
||||
ASSERT_BSONOBJ_EQ(options1.toBSON(false /* includeUUID */),
|
||||
options2.toBSON(false /* includeUUID */));
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, DowngradedOptionsConsistentWithNewViewful) {
|
||||
createViewlessTimeseriesCollection(nss1);
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", false);
|
||||
downgradeFromViewlessTimeseries(operationContext(), nss1);
|
||||
|
||||
createViewfulTimeseriesCollection(nss2);
|
||||
|
||||
auto catalog = CollectionCatalog::get(operationContext());
|
||||
auto options1 =
|
||||
catalog
|
||||
->lookupCollectionByNamespace(operationContext(), nss1.makeTimeseriesBucketsNamespace())
|
||||
->getCollectionOptions();
|
||||
auto options2 =
|
||||
catalog
|
||||
->lookupCollectionByNamespace(operationContext(), nss2.makeTimeseriesBucketsNamespace())
|
||||
->getCollectionOptions();
|
||||
ASSERT_BSONOBJ_EQ(options1.toBSON(false /* includeUUID */),
|
||||
options2.toBSON(false /* includeUUID */));
|
||||
|
||||
auto viewPipeline1 = catalog->lookupView(operationContext(), nss1)->pipeline();
|
||||
auto viewPipeline2 = catalog->lookupView(operationContext(), nss2)->pipeline();
|
||||
ASSERT_EQ(viewPipeline1.size(), viewPipeline2.size());
|
||||
for (size_t i = 0; i < viewPipeline1.size(); i++) {
|
||||
ASSERT_BSONOBJ_EQ(viewPipeline1[i], viewPipeline2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handling of inconsistent collections on upgrade.
|
||||
*/
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeWithoutView) {
|
||||
auto uuid1 = createViewfulTimeseriesCollection(nss1.makeTimeseriesBucketsNamespace());
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsViewlessTimeseries(nss1, uuid1);
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeSkippedOnConflictingCollection) {
|
||||
auto uuid1 = createViewfulTimeseriesCollection(nss1.makeTimeseriesBucketsNamespace());
|
||||
ASSERT_OK(createCollection(operationContext(), CreateCommand(nss1)));
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsTimeseriesCollection(nss1.makeTimeseriesBucketsNamespace(), uuid1);
|
||||
ASSERT(CollectionCatalog::get(operationContext())
|
||||
->lookupCollectionByNamespace(operationContext(), nss1));
|
||||
}
|
||||
|
||||
TEST_F(UpgradeDowngradeViewlessTimeseriesTest, UpgradeSkippedOnConflictingView) {
|
||||
auto uuid1 = createViewfulTimeseriesCollection(nss1.makeTimeseriesBucketsNamespace());
|
||||
|
||||
CreateCommand createViewCmd(nss1);
|
||||
createViewCmd.setViewOn(nss2.coll());
|
||||
createViewCmd.setPipeline({{}});
|
||||
ASSERT_OK(createCollection(operationContext(), createViewCmd));
|
||||
|
||||
RAIIServerParameterControllerForTest featureFlagController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
upgradeToViewlessTimeseries(operationContext(), nss1);
|
||||
assertIsTimeseriesCollection(nss1.makeTimeseriesBucketsNamespace(), uuid1);
|
||||
ASSERT(CollectionCatalog::get(operationContext())->lookupView(operationContext(), nss1));
|
||||
}
|
||||
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo::timeseries
|
||||
Loading…
Reference in New Issue