SERVER-103653 Remove uses of CollectionOptions from the KVEngine RecordStore functions (#35661)

GitOrigin-RevId: 16871548654b04c5c5a2e0961444d1dbd33f255d
This commit is contained in:
Haley Connelly 2025-05-06 15:04:40 -05:00 committed by MongoDB Bot
parent 1f11911cbd
commit 14cc97aaf6
31 changed files with 788 additions and 644 deletions

1
.github/CODEOWNERS vendored
View File

@ -1524,6 +1524,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_mock.* @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_mock.* @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_operation_source* @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_operation_source* @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_options* @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_options* @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_record_store_options* @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_test.cpp @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_test.cpp @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_uuid_mismatch* @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_uuid_mismatch* @10gen/server-catalog-and-routing @svc-auto-approve-bot
/src/mongo/db/catalog/**/collection_writer_test.cpp @10gen/server-catalog-and-routing @svc-auto-approve-bot /src/mongo/db/catalog/**/collection_writer_test.cpp @10gen/server-catalog-and-routing @svc-auto-approve-bot

View File

@ -516,6 +516,7 @@ mongo_cc_library(
deps = [ deps = [
":catalog_stats", ":catalog_stats",
":collection_catalog", ":collection_catalog",
":collection_record_store_options",
":database_holder", ":database_holder",
"//src/mongo/db:multitenancy", "//src/mongo/db:multitenancy",
"//src/mongo/db:server_base", "//src/mongo/db:server_base",
@ -547,6 +548,21 @@ mongo_cc_library(
], ],
) )
mongo_cc_library(
name = "collection_record_store_options",
srcs = [
"collection_record_store_options.cpp",
],
hdrs = [
"collection_record_store_options.h",
],
deps = [
":collection_options",
"//src/mongo/db:server_base",
"//src/mongo/db/storage:record_store_base",
],
)
mongo_cc_library( mongo_cc_library(
name = "catalog_impl", name = "catalog_impl",
srcs = [ srcs = [
@ -817,3 +833,17 @@ mongo_cc_unit_test(
"//src/mongo/util:pcre_wrapper", "//src/mongo/util:pcre_wrapper",
], ],
) )
mongo_cc_unit_test(
name = "collection_record_store_options_test",
srcs = [
"collection_record_store_options_test.cpp",
],
tags = ["mongo_unittest_eighth_group"],
deps = [
":collection_record_store_options",
"//src/mongo/db:server_base",
"//src/mongo/db/storage:record_store_base",
"//src/mongo/unittest",
],
)

View File

@ -67,6 +67,9 @@ filters:
- "collection_options*": - "collection_options*":
approvers: approvers:
- 10gen/server-catalog-and-routing - 10gen/server-catalog-and-routing
- "collection_record_store_options*":
approvers:
- 10gen/server-catalog-and-routing
- "collection_test.cpp": - "collection_test.cpp":
approvers: approvers:
- 10gen/server-catalog-and-routing - 10gen/server-catalog-and-routing

View File

@ -55,6 +55,7 @@
#include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/collection_record_store_options.h"
#include "mongo/db/catalog/uncommitted_catalog_updates.h" #include "mongo/db/catalog/uncommitted_catalog_updates.h"
#include "mongo/db/client.h" #include "mongo/db/client.h"
#include "mongo/db/commands/server_status.h" #include "mongo/db/commands/server_status.h"
@ -201,7 +202,9 @@ void initCollectionObject(OperationContext* opCtx,
// repaired. This also ensures that if we try to use it, it will blow up. // repaired. This also ensures that if we try to use it, it will blow up.
rs = nullptr; rs = nullptr;
} else { } else {
rs = engine->getEngine()->getRecordStore(opCtx, nss, ident, md->options); const auto uuid = md->options.uuid;
const auto recordStoreOptions = getRecordStoreOptions(nss, md->options);
rs = engine->getEngine()->getRecordStore(opCtx, nss, ident, recordStoreOptions, uuid);
invariant(rs); invariant(rs);
} }
@ -1558,28 +1561,30 @@ std::shared_ptr<Collection> CollectionCatalog::_createNewPITCollection(
} }
// Instantiate a new collection without any shared state. // Instantiate a new collection without any shared state.
const auto nss = catalogEntry.metadata->nss;
LOGV2_DEBUG(6825401, LOGV2_DEBUG(6825401,
1, 1,
"Instantiating a new collection", "Instantiating a new collection",
logAttrs(catalogEntry.metadata->nss), logAttrs(nss),
"ident"_attr = catalogEntry.ident, "ident"_attr = catalogEntry.ident,
"md"_attr = catalogEntry.metadata->toBSON(), "md"_attr = catalogEntry.metadata->toBSON(),
"timestamp"_attr = readTimestamp); "timestamp"_attr = readTimestamp);
const auto collectionOptions = catalogEntry.metadata->options;
std::unique_ptr<RecordStore> rs = std::unique_ptr<RecordStore> rs =
opCtx->getServiceContext()->getStorageEngine()->getEngine()->getRecordStore( opCtx->getServiceContext()->getStorageEngine()->getEngine()->getRecordStore(
opCtx, catalogEntry.metadata->nss, catalogEntry.ident, catalogEntry.metadata->options); opCtx,
nss,
catalogEntry.ident,
getRecordStoreOptions(nss, collectionOptions),
collectionOptions.uuid);
// Set the ident to the one returned by the ident reaper. This is to prevent the ident from // Set the ident to the one returned by the ident reaper. This is to prevent the ident from
// being dropping prematurely. // being dropping prematurely.
rs->setIdent(std::move(newIdent)); rs->setIdent(std::move(newIdent));
std::shared_ptr<Collection> collToReturn = std::shared_ptr<Collection> collToReturn = Collection::Factory::get(opCtx)->make(
Collection::Factory::get(opCtx)->make(opCtx, opCtx, nss, catalogEntry.catalogId, catalogEntry.metadata, std::move(rs));
catalogEntry.metadata->nss,
catalogEntry.catalogId,
catalogEntry.metadata,
std::move(rs));
Status status = Status status =
collToReturn->initFromExisting(opCtx, /*collection=*/nullptr, catalogEntry, readTimestamp); collToReturn->initFromExisting(opCtx, /*collection=*/nullptr, catalogEntry, readTimestamp);
if (!status.isOK()) { if (!status.isOK()) {

View File

@ -0,0 +1,65 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/catalog/collection_record_store_options.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/storage/record_store.h"
namespace mongo {
RecordStore::Options getRecordStoreOptions(const NamespaceString& nss,
const CollectionOptions& collectionOptions) {
RecordStore::Options recordStoreOptions;
bool isClustered = collectionOptions.clusteredIndex.has_value();
recordStoreOptions.keyFormat = isClustered ? KeyFormat::String : KeyFormat::Long;
recordStoreOptions.allowOverwrite = isClustered ? false : true;
recordStoreOptions.isCapped = collectionOptions.capped;
recordStoreOptions.isOplog = nss.isOplog();
if (recordStoreOptions.isOplog) {
// Only relevant for specialized oplog handling.
recordStoreOptions.oplogMaxSize = collectionOptions.cappedSize;
}
recordStoreOptions.isChangeCollection = nss.isChangeCollection();
bool isTimeseries = collectionOptions.timeseries.has_value();
if (isTimeseries) {
recordStoreOptions.customBlockCompressor =
kDefaultTimeseriesCollectionCompressor.toString();
recordStoreOptions.forceUpdateWithFullDocument = isTimeseries;
}
recordStoreOptions.storageEngineCollectionOptions = collectionOptions.storageEngine;
return recordStoreOptions;
}
} // namespace mongo

View File

@ -0,0 +1,44 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/storage/record_store.h"
namespace mongo {
static constexpr auto kDefaultTimeseriesCollectionCompressor = "zstd"_sd;
/**
* Each Collection is backed by a RecordStore in the storage layer. Translates 'CollectionOptions'
* into 'RecordStore::Options' for a Collection's RecordStore.
*/
RecordStore::Options getRecordStoreOptions(const NamespaceString& nss,
const CollectionOptions& collectionOptions);
} // namespace mongo

View File

@ -0,0 +1,197 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/catalog/clustered_collection_util.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/collection_record_store_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/storage/key_format.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/str.h"
namespace mongo {
namespace {
BSONObj convertToBSON(const RecordStore::Options& opts) {
BSONObjBuilder bob;
bob.append("keyFormat", opts.keyFormat);
bob.append("isCapped", opts.isCapped);
bob.append("isOplog", opts.isOplog);
bob.append("oplogMaxSize", opts.oplogMaxSize);
bob.append("isChangeCollection", opts.isChangeCollection);
bob.append("allowOverwrite", opts.allowOverwrite);
bob.append("forceUpdateWithFullDocument", opts.forceUpdateWithFullDocument);
bob.append("customBlockCompressor", opts.customBlockCompressor.value_or("none"));
bob.append("storageEngineCollectionOptions", opts.storageEngineCollectionOptions);
return bob.obj();
}
std::string errMsgDetails(const RecordStore::Options& expectedOpts,
const RecordStore::Options& actualOpts) {
return str::stream() << "Expected 'RecordStore::Options': " << convertToBSON(expectedOpts)
<< ", actual 'RecordStore::Options': " << convertToBSON(actualOpts);
}
class CollectionRecordStoreOptionsTest : public unittest::Test {
protected:
const NamespaceString kBasicNss =
NamespaceString::createNamespaceString_forTest("testdb", "testcol");
// Asserts each field in 'actualOpts' matches that of 'expectedOpts'.
void assertEQ(const RecordStore::Options& expectedOpts,
const RecordStore::Options& actualOpts) {
ASSERT_EQ(expectedOpts.keyFormat, actualOpts.keyFormat)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.isCapped, actualOpts.isCapped)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.isOplog, actualOpts.isOplog)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.oplogMaxSize, actualOpts.oplogMaxSize)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.isChangeCollection, actualOpts.isChangeCollection)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.allowOverwrite, actualOpts.allowOverwrite)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.forceUpdateWithFullDocument, actualOpts.forceUpdateWithFullDocument)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(expectedOpts.customBlockCompressor, actualOpts.customBlockCompressor)
<< errMsgDetails(expectedOpts, actualOpts);
ASSERT_EQ(0,
expectedOpts.storageEngineCollectionOptions.woCompare(
actualOpts.storageEngineCollectionOptions))
<< errMsgDetails(expectedOpts, actualOpts);
}
};
TEST_F(CollectionRecordStoreOptionsTest, DefaultRecordStoreOptions) {
CollectionOptions collOptions;
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
// Default 'RecordStore::Options' should match those generated for the default
// 'CollectionOptions'.
const RecordStore::Options kDefaultRSOptions{};
assertEQ(kDefaultRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, ClusteredRecordStoreOptionsBasic) {
CollectionOptions collOptions;
collOptions.clusteredIndex = clustered_util::makeDefaultClusteredIdIndex();
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{.keyFormat = KeyFormat::String, .allowOverwrite = false};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, CappedRecordStoreOptionsBasic) {
CollectionOptions collOptions;
collOptions.capped = true;
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{.isCapped = true};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, CappedRecordStoreOptions) {
CollectionOptions collOptions;
// Aside from 'capped', other capped related 'CollectionOptions' fields aren't relevant when
// generating a RecordStore for a non-oplog collection. For example, 'cappedSize' and
// 'cappedMaxDocs' don't impact the 'RecordStore::Options' generated for 'kBasicNss'.
collOptions.capped = true;
collOptions.cappedMaxDocs = 100 /* arbitrary */;
collOptions.cappedSize = 100 /* arbitrary */;
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{.isCapped = true};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, CappedClusteredRecordStoreOptions) {
CollectionOptions collOptions;
collOptions.clusteredIndex = clustered_util::makeDefaultClusteredIdIndex();
collOptions.capped = true;
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{
.keyFormat = KeyFormat::String, .isCapped = true, .allowOverwrite = false};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, OplogRecordStoreOptions) {
CollectionOptions collOptions;
collOptions.capped = true;
collOptions.cappedSize = 100;
const auto actualRSOptions =
getRecordStoreOptions(NamespaceString::kRsOplogNamespace, collOptions);
RecordStore::Options expectedRSOptions{
.isCapped = true, .isOplog = true, .oplogMaxSize = collOptions.cappedSize};
assertEQ(expectedRSOptions, actualRSOptions);
}
/*
* Tests fields generated exclusively when 'CollectionOptions::timeseries' is set.
*/
TEST_F(CollectionRecordStoreOptionsTest, TimeseriesRecordStoreOptionsNotClustered) {
CollectionOptions collOptions;
// In practice, a valid set of CollectionOptions must have both 'CollectionOptions::timeseries'
// and 'CollectionOptions::clusteredIndex' set. 'getRecordStoreOptions()' does not perform
// validation on the provided CollectionOptions.
collOptions.timeseries = TimeseriesOptions(/*timeField=*/"t");
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{
.forceUpdateWithFullDocument = true /* exclusive for timeseries */,
.customBlockCompressor =
kDefaultTimeseriesCollectionCompressor.toString() /* exclusive for timeseries */};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, TimeseriesRecordStoreOptions) {
CollectionOptions collOptions;
collOptions.timeseries = TimeseriesOptions(/*timeField=*/"t");
collOptions.clusteredIndex = clustered_util::makeCanonicalClusteredInfoForLegacyFormat();
NamespaceString timeseriesNss =
NamespaceString::createNamespaceString_forTest("test.system.buckets.ts");
const auto actualRSOptions = getRecordStoreOptions(timeseriesNss, collOptions);
RecordStore::Options expectedRSOptions{
.keyFormat = KeyFormat::String,
.allowOverwrite = false,
.forceUpdateWithFullDocument = true /* exclusive for timeseries */,
.customBlockCompressor =
kDefaultTimeseriesCollectionCompressor.toString() /* exclusive for timeseries */};
assertEQ(expectedRSOptions, actualRSOptions);
}
TEST_F(CollectionRecordStoreOptionsTest, RecordStoreOptionsWithStorageEngineCollectionOptions) {
CollectionOptions collOptions;
collOptions.storageEngine =
BSON("create" << kBasicNss.coll() << "storageEngine"
<< BSON("wiredTiger" << BSON("configString" << "prefix_compression=true")));
const auto actualRSOptions = getRecordStoreOptions(kBasicNss, collOptions);
RecordStore::Options expectedRSOptions{.storageEngineCollectionOptions =
collOptions.storageEngine};
assertEQ(expectedRSOptions, actualRSOptions);
}
} // namespace
} // namespace mongo

View File

@ -49,7 +49,6 @@
#include "mongo/bson/bsontypes.h" #include "mongo/bson/bsontypes.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/crypto/encryption_fields_gen.h" #include "mongo/crypto/encryption_fields_gen.h"
#include "mongo/db/catalog/clustered_collection_options_gen.h"
#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_mock.h" #include "mongo/db/catalog/collection_mock.h"
#include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/collection_scan.h"
@ -163,7 +162,7 @@ class ChangeStreamOplogCollectionMock : public CollectionMock {
public: public:
ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) { ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) {
_recordStore = _devNullEngine.getRecordStore( _recordStore = _devNullEngine.getRecordStore(
nullptr, NamespaceString::kRsOplogNamespace, "", {.uuid = UUID::gen()}); nullptr, NamespaceString::kRsOplogNamespace, "", RecordStore::Options{}, UUID::gen());
} }
void push_back(Document doc) { void push_back(Document doc) {

View File

@ -282,6 +282,7 @@ mongo_cc_library(
":bson_collection_catalog_entry", ":bson_collection_catalog_entry",
":record_store_base", ":record_store_base",
"//src/mongo/db:server_base", "//src/mongo/db:server_base",
"//src/mongo/db/catalog:collection_record_store_options", # TODO(SERVER-100964): Remove.
"//src/mongo/db/concurrency:lock_manager", "//src/mongo/db/concurrency:lock_manager",
"//src/mongo/db/storage:feature_document_util", "//src/mongo/db/storage:feature_document_util",
"//src/mongo/db/storage:ident", "//src/mongo/db/storage:ident",

View File

@ -83,11 +83,8 @@ public:
class Capped; class Capped;
class Oplog; class Oplog;
DevNullRecordStore(boost::optional<UUID> uuid, DevNullRecordStore(boost::optional<UUID> uuid, StringData ident, KeyFormat keyFormat)
StringData identName, : RecordStoreBase(uuid, ident), _keyFormat(keyFormat) {
const CollectionOptions& options,
KeyFormat keyFormat)
: RecordStoreBase(uuid, identName), _options(options), _keyFormat(keyFormat) {
_numInserts = 0; _numInserts = 0;
_dummy = BSON("_id" << 1); _dummy = BSON("_id" << 1);
} }
@ -105,7 +102,9 @@ public:
} }
virtual bool isCapped() const { virtual bool isCapped() const {
return _options.capped; // Record stores for capped collections should inherit from 'DevNullRecordStore::Capped',
// which overrides this to true.
return false;
} }
KeyFormat keyFormat() const override { KeyFormat keyFormat() const override {
@ -226,7 +225,6 @@ private:
return Status::OK(); return Status::OK();
} }
CollectionOptions _options;
KeyFormat _keyFormat; KeyFormat _keyFormat;
long long _numInserts; long long _numInserts;
BSONObj _dummy; BSONObj _dummy;
@ -234,11 +232,12 @@ private:
class DevNullRecordStore::Capped : public DevNullRecordStore, public RecordStoreBase::Capped { class DevNullRecordStore::Capped : public DevNullRecordStore, public RecordStoreBase::Capped {
public: public:
Capped(boost::optional<UUID> uuid, Capped(boost::optional<UUID> uuid, StringData ident, KeyFormat keyFormat)
StringData identName, : DevNullRecordStore(uuid, ident, keyFormat) {}
const CollectionOptions& options,
KeyFormat keyFormat) bool isCapped() const final {
: DevNullRecordStore(uuid, identName, options, keyFormat) {} return true;
}
RecordStore::Capped* capped() override { RecordStore::Capped* capped() override {
return this; return this;
@ -254,8 +253,7 @@ private:
class DevNullRecordStore::Oplog final : public DevNullRecordStore::Capped, class DevNullRecordStore::Oplog final : public DevNullRecordStore::Capped,
public RecordStore::Oplog { public RecordStore::Oplog {
public: public:
Oplog(UUID uuid, StringData identName, const CollectionOptions& options) Oplog(UUID uuid, StringData ident) : DevNullRecordStore::Capped(uuid, ident, KeyFormat::Long) {}
: DevNullRecordStore::Capped(uuid, identName, options, KeyFormat::Long) {}
RecordStore::Capped* capped() override { RecordStore::Capped* capped() override {
return this; return this;
@ -398,16 +396,16 @@ std::unique_ptr<RecoveryUnit> DevNullKVEngine::newRecoveryUnit() {
std::unique_ptr<RecordStore> DevNullKVEngine::getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> DevNullKVEngine::getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) { const RecordStore::Options& options,
boost::optional<UUID> uuid) {
if (ident == "_mdb_catalog") { if (ident == "_mdb_catalog") {
return std::make_unique<EphemeralForTestRecordStore>(options.uuid, ident, &_catalogInfo); return std::make_unique<EphemeralForTestRecordStore>(uuid, ident, &_catalogInfo);
} else if (nss == NamespaceString::kRsOplogNamespace) { } else if (options.isOplog) {
return std::make_unique<DevNullRecordStore::Oplog>(*options.uuid, ident, options); return std::make_unique<DevNullRecordStore::Oplog>(*uuid, ident);
} else if (options.capped) { } else if (options.isCapped) {
return std::make_unique<DevNullRecordStore::Capped>( return std::make_unique<DevNullRecordStore::Capped>(uuid, ident, options.keyFormat);
options.uuid, ident, options, KeyFormat::Long);
} }
return std::make_unique<DevNullRecordStore>(options.uuid, ident, options, KeyFormat::Long); return std::make_unique<DevNullRecordStore>(uuid, ident, options.keyFormat);
} }
std::unique_ptr<RecordStore> DevNullKVEngine::getTemporaryRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> DevNullKVEngine::getTemporaryRecordStore(OperationContext* opCtx,
@ -419,8 +417,7 @@ std::unique_ptr<RecordStore> DevNullKVEngine::getTemporaryRecordStore(OperationC
std::unique_ptr<RecordStore> DevNullKVEngine::makeTemporaryRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> DevNullKVEngine::makeTemporaryRecordStore(OperationContext* opCtx,
StringData ident, StringData ident,
KeyFormat keyFormat) { KeyFormat keyFormat) {
return std::make_unique<DevNullRecordStore>( return std::make_unique<DevNullRecordStore>(boost::none /* uuid */, ident, keyFormat);
boost::none /* uuid */, ident, CollectionOptions(), keyFormat);
} }
std::unique_ptr<SortedDataInterface> DevNullKVEngine::getSortedDataInterface( std::unique_ptr<SortedDataInterface> DevNullKVEngine::getSortedDataInterface(

View File

@ -43,7 +43,6 @@
#include "mongo/base/status_with.h" #include "mongo/base/status_with.h"
#include "mongo/base/string_data.h" #include "mongo/base/string_data.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h" #include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h" #include "mongo/db/operation_context.h"
#include "mongo/db/storage/backup_block.h" #include "mongo/db/storage/backup_block.h"
@ -70,16 +69,15 @@ public:
Status createRecordStore(const NamespaceString& nss, Status createRecordStore(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& options) override {
bool isTimeseries = false,
const BSONObj& storageEngineCollectionOptions = BSONObj()) override {
return Status::OK(); return Status::OK();
} }
std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) override; const RecordStore::Options& options,
boost::optional<UUID> uuid) override;
std::unique_ptr<RecordStore> getTemporaryRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getTemporaryRecordStore(OperationContext* opCtx,
StringData ident, StringData ident,

View File

@ -42,6 +42,7 @@
#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/bsontypes.h" #include "mongo/bson/bsontypes.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_record_store_options.h"
#include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/concurrency/lock_manager_defs.h"
#include "mongo/db/database_name.h" #include "mongo/db/database_name.h"
#include "mongo/db/operation_context.h" #include "mongo/db/operation_context.h"
@ -464,26 +465,16 @@ StatusWith<std::pair<RecordId, std::unique_ptr<RecordStore>>> DurableCatalog::cr
OperationContext* opCtx, OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
const std::string& ident, const std::string& ident,
const CollectionOptions& options) { const CollectionOptions& collectionOptions) {
invariant(shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(nss, MODE_IX)); invariant(shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(nss, MODE_IX));
invariant(nss.coll().size() > 0); invariant(nss.coll().size() > 0);
StatusWith<EntryIdentifier> swEntry = _addEntry(opCtx, nss, ident, options); StatusWith<EntryIdentifier> swEntry = _addEntry(opCtx, nss, ident, collectionOptions);
if (!swEntry.isOK()) if (!swEntry.isOK())
return swEntry.getStatus(); return swEntry.getStatus();
EntryIdentifier& entry = swEntry.getValue(); EntryIdentifier& entry = swEntry.getValue();
const auto recordStoreOptions = getRecordStoreOptions(nss, collectionOptions);
const auto keyFormat = [&] { Status status = _engine->createRecordStore(nss, ident, recordStoreOptions);
// Clustered collections require KeyFormat::String, but the opposite is not necessarily
// true: a clustered record store that is not associated with a collection has
// KeyFormat::String and and no CollectionOptions.
if (options.clusteredIndex) {
return KeyFormat::String;
}
return KeyFormat::Long;
}();
Status status = _engine->createRecordStore(
nss, entry.ident, keyFormat, options.timeseries.has_value(), options.storageEngine);
if (!status.isOK()) if (!status.isOK())
return status; return status;
@ -494,7 +485,8 @@ StatusWith<std::pair<RecordId, std::unique_ptr<RecordStore>>> DurableCatalog::cr
catalog->_engine->dropIdent(ru, ident, /*identHasSizeInfo=*/true).ignore(); catalog->_engine->dropIdent(ru, ident, /*identHasSizeInfo=*/true).ignore();
}); });
auto rs = _engine->getRecordStore(opCtx, nss, entry.ident, options); auto rs =
_engine->getRecordStore(opCtx, nss, ident, recordStoreOptions, collectionOptions.uuid);
invariant(rs); invariant(rs);
return std::pair<RecordId, std::unique_ptr<RecordStore>>(entry.catalogId, std::move(rs)); return std::pair<RecordId, std::unique_ptr<RecordStore>>(entry.catalogId, std::move(rs));
@ -599,7 +591,8 @@ StatusWith<DurableCatalog::ImportResult> DurableCatalog::importCollection(
} }
} }
auto rs = _engine->getRecordStore(opCtx, nss, entry.ident, md.options); auto rs = _engine->getRecordStore(
opCtx, nss, entry.ident, getRecordStoreOptions(nss, md.options), md.options.uuid);
invariant(rs); invariant(rs);
return DurableCatalog::ImportResult(entry.catalogId, std::move(rs), md.options.uuid.value()); return DurableCatalog::ImportResult(entry.catalogId, std::move(rs), md.options.uuid.value());

View File

@ -37,7 +37,6 @@
#include "mongo/base/error_codes.h" #include "mongo/base/error_codes.h"
#include "mongo/base/status.h" #include "mongo/base/status.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h" #include "mongo/db/client.h"
#include "mongo/db/index/index_descriptor.h" #include "mongo/db/index/index_descriptor.h"
#include "mongo/db/namespace_string.h" #include "mongo/db/namespace_string.h"
@ -76,7 +75,8 @@ public:
std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) override { const RecordStore::Options& options,
boost::optional<UUID> uuid) override {
return {}; return {};
} }
std::unique_ptr<SortedDataInterface> getSortedDataInterface(OperationContext* opCtx, std::unique_ptr<SortedDataInterface> getSortedDataInterface(OperationContext* opCtx,
@ -90,9 +90,7 @@ public:
Status createRecordStore(const NamespaceString& nss, Status createRecordStore(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat, const RecordStore::Options& options) override {
bool isTimeseries,
const BSONObj& storageEngineCollectionOptions) override {
return Status::OK(); return Status::OK();
} }

View File

@ -36,7 +36,6 @@
#include "mongo/base/status.h" #include "mongo/base/status.h"
#include "mongo/base/string_data.h" #include "mongo/base/string_data.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/storage/compact_options.h" #include "mongo/db/storage/compact_options.h"
#include "mongo/db/storage/record_store.h" #include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h" #include "mongo/db/storage/sorted_data_interface.h"
@ -81,19 +80,17 @@ public:
} }
/** /**
* Requesting multiple copies for the same ns/ident is a rules violation; Calling on a * Requesting multiple copies for the same ident is a rules violation; Calling on a
* non-created ident is invalid and may crash. * non-created ident is invalid and may crash.
* *
* Trying to access this record store in the future will retrieve the pointer from the * Trying to access this record store in the future will retrieve the pointer from the
* collection object, and therefore this function can only be called once per namespace. * collection object, and therefore this function can only be called once per namespace.
*
* @param ident Will be created if it does not already exist.
*/ */
virtual std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx, virtual std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) = 0; const RecordStore::Options& options,
boost::optional<UUID> uuid) = 0;
/** /**
* Opens an existing ident as a temporary record store. Must be used for record stores created * Opens an existing ident as a temporary record store. Must be used for record stores created
* with `makeTemporaryRecordStore`. Using `getRecordStore` would cause the record store to use * with `makeTemporaryRecordStore`. Using `getRecordStore` would cause the record store to use
@ -118,28 +115,11 @@ public:
* drop call on the KVEngine once the WUOW commits. Therefore drops will never be rolled * drop call on the KVEngine once the WUOW commits. Therefore drops will never be rolled
* back and it is safe to immediately reclaim storage. * back and it is safe to immediately reclaim storage.
* *
* . 'keyFormat': Defaults to the key format of a regular collection, 'KeyFormat::Long'. * Creates a 'RecordStore' and generated from the provided 'options'.
* Callers must specify 'KeyFormat::String' when creating a RecordStore for a clustered
* collection.
*
* . 'isTimeseries': True when the RecordStore is for a timeseries collection. Timeseries
* collections require specialized storage engine table configuration. TODO SERVER-100964:
* Remove timeseries concept from KvEngine.
*
* . 'storageEngineCollectionOptions': Empty by default. Holds collection-specific storage
* engine configuration options. For example, the 'storageEngine' options passed into
* `db.createCollection()`. Expected to be mirror the 'CollectionOptions::storageEngine'
* format { storageEngine: { <storage engine name> : { configString:
* "<option>=<setting>,..."} } }.
*
* Creates a 'RecordStore' and its underlying storage engine table with the designated
* 'keyFormat' and 'storageEngineCollectionOptions'.
*/ */
virtual Status createRecordStore(const NamespaceString& nss, virtual Status createRecordStore(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& options) = 0;
bool isTimeseries = false,
const BSONObj& storageEngineCollectionOptions = BSONObj()) = 0;
/** /**
* RecordStores initially created with `makeTemporaryRecordStore` must be opened with * RecordStores initially created with `makeTemporaryRecordStore` must be opened with
@ -270,11 +250,8 @@ public:
*/ */
virtual Status recoverOrphanedIdent(const NamespaceString& nss, virtual Status recoverOrphanedIdent(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& recordStoreOptions) {
bool isTimeseries = false, auto status = createRecordStore(nss, ident, recordStoreOptions);
const BSONObj& storageEngineCollectionOptions = BSONObj()) {
auto status =
createRecordStore(nss, ident, keyFormat, isTimeseries, storageEngineCollectionOptions);
if (status.isOK()) { if (status.isOK()) {
return {ErrorCodes::DataModifiedByRepair, "Orphan recovery created a new record store"}; return {ErrorCodes::DataModifiedByRepair, "Orphan recovery created a new record store"};
} }

View File

@ -105,10 +105,34 @@ protected:
return {std::move(client), std::move(opCtx)}; return {std::move(client), std::move(opCtx)};
} }
RecordId newCollection(OperationContext* opCtx, // Callers are responsible for managing the lifetime of 'catalogRS'.
const NamespaceString& ns, std::unique_ptr<DurableCatalog> createDurableCatalog(RecordStore* catalogRS,
const CollectionOptions& options, bool directoryPerDB = false,
DurableCatalog* catalog) { bool directoryForIndexes = false) {
return std::make_unique<DurableCatalog>(
catalogRS, directoryPerDB, directoryForIndexes, helper->getEngine());
}
std::unique_ptr<RecordStore> createCatalogRS() {
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
KVEngine* engine = helper->getEngine();
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
RecordStore::Options{}));
return engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
RecordStore::Options{},
UUID::gen());
}
RecordId addNewCatalogEntry(OperationContext* opCtx,
const NamespaceString& ns,
const CollectionOptions& options,
DurableCatalog* catalog) {
Lock::DBLock dbLk(opCtx, ns.dbName(), MODE_IX); Lock::DBLock dbLk(opCtx, ns.dbName(), MODE_IX);
// TODO SERVER-103136: Evaluate the better way to test idents generated with different // TODO SERVER-103136: Evaluate the better way to test idents generated with different
// <Directory<PerDb/ForIndexes>> options without relying on the DurableCatalog's tracking of // <Directory<PerDb/ForIndexes>> options without relying on the DurableCatalog's tracking of
@ -153,6 +177,12 @@ std::function<std::unique_ptr<KVHarnessHelper>(ServiceContext*)> basicFactory =
class KVEngineTestHarness : public ServiceContextTest { class KVEngineTestHarness : public ServiceContextTest {
protected: protected:
const NamespaceString kNss =
NamespaceString::createNamespaceString_forTest("defaultDB.defaultColl");
const std::string kIdent = "defaultIdent";
const UUID kUUID = UUID::gen();
const RecordStore::Options kRecordStoreOptions{};
ServiceContext::UniqueOperationContext _makeOperationContext(KVEngine* engine) { ServiceContext::UniqueOperationContext _makeOperationContext(KVEngine* engine) {
auto opCtx = makeOperationContext(); auto opCtx = makeOperationContext();
shard_role_details::setRecoveryUnit(opCtx.get(), shard_role_details::setRecoveryUnit(opCtx.get(),
@ -180,6 +210,22 @@ protected:
return opCtxs; return opCtxs;
} }
std::unique_ptr<RecordStore> newRecordStore(KVEngine* engine,
const NamespaceString& nss,
StringData ident,
const RecordStore::Options& recordStoreOptions,
boost::optional<UUID> uuid) {
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(engine->createRecordStore(nss, ident, recordStoreOptions));
auto rs = engine->getRecordStore(opCtx.get(), nss, ident, recordStoreOptions, uuid);
ASSERT(rs);
return rs;
}
std::unique_ptr<RecordStore> newRecordStore(KVEngine* engine) {
return newRecordStore(engine, kNss, kIdent, kRecordStoreOptions, kUUID);
}
}; };
TEST_F(KVEngineTestHarness, SimpleRS1) { TEST_F(KVEngineTestHarness, SimpleRS1) {
@ -187,20 +233,8 @@ TEST_F(KVEngineTestHarness, SimpleRS1) {
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
ASSERT(engine); ASSERT(engine);
std::string ns = "a.b"; std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::unique_ptr<RecordStore> rs; const auto& ident = rs->getIdent();
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
RecordId loc; RecordId loc;
{ {
auto opCtx = _makeOperationContext(engine); auto opCtx = _makeOperationContext(engine);
@ -224,7 +258,7 @@ TEST_F(KVEngineTestHarness, SimpleRS1) {
// This includes the _mdb_catalog. // This includes the _mdb_catalog.
ASSERT_EQUALS(2U, all.size()); ASSERT_EQUALS(2U, all.size());
ASSERT_EQUALS(ns, all[1]); ASSERT_EQUALS(ident, all[1]);
} }
} }
@ -233,22 +267,12 @@ TEST_F(KVEngineTestHarness, Restart1) {
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
ASSERT(engine); ASSERT(engine);
std::string ns = "a.b";
// 'loc' holds location of "abc" and is referenced after restarting engine. // 'loc' holds location of "abc" and is referenced after restarting engine.
RecordId loc; RecordId loc;
{ {
std::unique_ptr<RecordStore> rs; // Using default RecordStore parameters before and after restart.
{ std::unique_ptr<RecordStore> rs =
auto opCtx = _makeOperationContext(engine); newRecordStore(engine, kNss, kIdent, kRecordStoreOptions, kUUID);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
{ {
auto opCtx = _makeOperationContext(engine); auto opCtx = _makeOperationContext(engine);
@ -272,39 +296,15 @@ TEST_F(KVEngineTestHarness, Restart1) {
std::unique_ptr<RecordStore> rs; std::unique_ptr<RecordStore> rs;
auto opCtx = _makeOperationContext(engine); auto opCtx = _makeOperationContext(engine);
Lock::GlobalLock globalLock(opCtx.get(), MODE_S); Lock::GlobalLock globalLock(opCtx.get(), MODE_S);
rs = engine->getRecordStore(opCtx.get(), rs = engine->getRecordStore(opCtx.get(), kNss, kIdent, kRecordStoreOptions, kUUID);
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT_EQUALS(std::string("abc"), rs->dataFor(opCtx.get(), loc).data()); ASSERT_EQUALS(std::string("abc"), rs->dataFor(opCtx.get(), loc).data());
} }
} }
TEST_F(KVEngineTestHarness, SimpleSorted1) { TEST_F(KVEngineTestHarness, SimpleSorted1) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
ASSERT(engine); ASSERT(engine);
std::string ident = "abc";
NamespaceString nss = NamespaceString::createNamespaceString_forTest("mydb.mycoll");
CollectionOptions options;
options.uuid = UUID::gen();
auto mdPtr = std::make_shared<BSONCollectionCatalogEntry::MetaData>();
mdPtr->nss = nss;
mdPtr->options = options;
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
WriteUnitOfWork uow(opCtx.get());
ASSERT_OK(engine->createRecordStore(nss, "catalog"));
rs = engine->getRecordStore(opCtx.get(), nss, "catalog", options);
uow.commit();
}
std::string indexName = "name"; std::string indexName = "name";
auto spec = BSON("v" << static_cast<int>(IndexConfig::kLatestIndexVersion) << "key" auto spec = BSON("v" << static_cast<int>(IndexConfig::kLatestIndexVersion) << "key"
<< BSON("a" << 1) << "name" << indexName); << BSON("a" << 1) << "name" << indexName);
@ -320,14 +320,13 @@ TEST_F(KVEngineTestHarness, SimpleSorted1) {
auto opCtx = _makeOperationContext(engine); auto opCtx = _makeOperationContext(engine);
ASSERT_OK( ASSERT_OK(
engine->createSortedDataInterface(*shard_role_details::getRecoveryUnit(opCtx.get()), engine->createSortedDataInterface(*shard_role_details::getRecoveryUnit(opCtx.get()),
nss, kNss,
*options.uuid, kUUID,
ident, kIdent,
config, config,
options.indexOptionDefaults.getStorageEngine())); boost::none /* storageEngineIndexOptions */));
auto keyFormat = options.clusteredIndex.has_value() ? KeyFormat::String : KeyFormat::Long;
sorted = engine->getSortedDataInterface( sorted = engine->getSortedDataInterface(
opCtx.get(), nss, *options.uuid, ident, config, keyFormat); opCtx.get(), kNss, kUUID, kIdent, config, kRecordStoreOptions.keyFormat);
ASSERT(sorted); ASSERT(sorted);
} }
@ -399,50 +398,35 @@ TEST_F(KVEngineTestHarness, TemporaryRecordStoreSimple) {
TEST_F(KVEngineTestHarness, AllDurableTimestamp) { TEST_F(KVEngineTestHarness, AllDurableTimestamp) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
auto opCtxs = _makeOperationContexts(engine, 2);
std::string ns = "a.b"; Timestamp t51(5, 1);
std::unique_ptr<RecordStore> rs; Timestamp t52(5, 2);
{ Timestamp t61(6, 1);
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
{ ASSERT_EQ(engine->getAllDurableTimestamp(), Timestamp(StorageEngine::kMinimumTimestamp));
auto opCtxs = _makeOperationContexts(engine, 2);
Timestamp t51(5, 1); auto opCtx1 = opCtxs[0].second.get();
Timestamp t52(5, 2); WriteUnitOfWork uow1(opCtx1);
Timestamp t61(6, 1); ASSERT_OK(rs->insertRecord(opCtx1, "abc", 4, t51));
ASSERT_EQ(engine->getAllDurableTimestamp(), Timestamp(StorageEngine::kMinimumTimestamp)); ASSERT_EQ(engine->getAllDurableTimestamp(), Timestamp(StorageEngine::kMinimumTimestamp));
auto opCtx1 = opCtxs[0].second.get(); auto opCtx2 = opCtxs[1].second.get();
WriteUnitOfWork uow1(opCtx1); WriteUnitOfWork uow2(opCtx2);
ASSERT_OK(rs->insertRecord(opCtx1, "abc", 4, t51)); ASSERT_OK(rs->insertRecord(opCtx2, "abc", 4, t61));
uow2.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), Timestamp(StorageEngine::kMinimumTimestamp)); ASSERT_EQ(engine->getAllDurableTimestamp(), t51 - 1);
auto opCtx2 = opCtxs[1].second.get(); ASSERT_OK(rs->insertRecord(opCtx1, "abc", 4, t52));
WriteUnitOfWork uow2(opCtx2);
ASSERT_OK(rs->insertRecord(opCtx2, "abc", 4, t61));
uow2.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), t51 - 1); ASSERT_EQ(engine->getAllDurableTimestamp(), t51 - 1);
ASSERT_OK(rs->insertRecord(opCtx1, "abc", 4, t52)); uow1.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), t51 - 1); ASSERT_EQ(engine->getAllDurableTimestamp(), t61);
uow1.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), t61);
}
} }
/* /*
@ -466,20 +450,7 @@ TEST_F(KVEngineTestHarness, AllDurableTimestamp) {
TEST_F(KVEngineTestHarness, PinningOldestWithAnotherSession) { TEST_F(KVEngineTestHarness, PinningOldestWithAnotherSession) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
auto opCtxs = _makeOperationContexts(engine, 2); auto opCtxs = _makeOperationContexts(engine, 2);
auto opCtx1 = opCtxs[0].second.get(); auto opCtx1 = opCtxs[0].second.get();
@ -541,64 +512,49 @@ TEST_F(KVEngineTestHarness, PinningOldestWithAnotherSession) {
TEST_F(KVEngineTestHarness, AllDurable) { TEST_F(KVEngineTestHarness, AllDurable) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
auto opCtxs = _makeOperationContexts(engine, 4);
std::string ns = "a.b"; const Timestamp kInsertTimestamp1 = Timestamp(10, 10);
std::unique_ptr<RecordStore> rs; const Timestamp kInsertTimestamp2 = Timestamp(20, 20);
{ const Timestamp kInsertTimestamp3 = Timestamp(30, 30);
auto opCtx = _makeOperationContext(engine); const Timestamp kInsertTimestamp4 = Timestamp(25, 25);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
{ auto opCtx1 = opCtxs[0].second.get();
auto opCtxs = _makeOperationContexts(engine, 4); WriteUnitOfWork uow1(opCtx1);
auto swRid = rs->insertRecord(opCtx1, "abc", 4, kInsertTimestamp1);
ASSERT_OK(swRid);
uow1.commit();
const Timestamp kInsertTimestamp1 = Timestamp(10, 10); ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp1);
const Timestamp kInsertTimestamp2 = Timestamp(20, 20);
const Timestamp kInsertTimestamp3 = Timestamp(30, 30);
const Timestamp kInsertTimestamp4 = Timestamp(25, 25);
auto opCtx1 = opCtxs[0].second.get(); auto opCtx2 = opCtxs[1].second.get();
WriteUnitOfWork uow1(opCtx1); WriteUnitOfWork uow2(opCtx2);
auto swRid = rs->insertRecord(opCtx1, "abc", 4, kInsertTimestamp1); swRid = rs->insertRecord(opCtx2, "abc", 4, kInsertTimestamp2);
ASSERT_OK(swRid); ASSERT_OK(swRid);
uow1.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp1); ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp1);
auto opCtx2 = opCtxs[1].second.get(); auto opCtx3 = opCtxs[2].second.get();
WriteUnitOfWork uow2(opCtx2); WriteUnitOfWork uow3(opCtx3);
swRid = rs->insertRecord(opCtx2, "abc", 4, kInsertTimestamp2); swRid = rs->insertRecord(opCtx3, "abc", 4, kInsertTimestamp3);
ASSERT_OK(swRid); ASSERT_OK(swRid);
uow3.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp1); ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp2 - 1);
auto opCtx3 = opCtxs[2].second.get(); uow2.commit();
WriteUnitOfWork uow3(opCtx3);
swRid = rs->insertRecord(opCtx3, "abc", 4, kInsertTimestamp3);
ASSERT_OK(swRid);
uow3.commit();
ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp2 - 1); ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp3);
uow2.commit(); auto opCtx4 = opCtxs[3].second.get();
WriteUnitOfWork uow4(opCtx4);
swRid = rs->insertRecord(opCtx4, "abc", 4, kInsertTimestamp4);
ASSERT_OK(swRid);
ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp3); ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp4 - 1);
auto opCtx4 = opCtxs[3].second.get(); uow4.commit();
WriteUnitOfWork uow4(opCtx4);
swRid = rs->insertRecord(opCtx4, "abc", 4, kInsertTimestamp4);
ASSERT_OK(swRid);
ASSERT_EQ(engine->getAllDurableTimestamp(), kInsertTimestamp4 - 1);
uow4.commit();
}
} }
/* /*
@ -619,19 +575,7 @@ TEST_F(KVEngineTestHarness, AllDurable) {
TEST_F(KVEngineTestHarness, BasicTimestampSingle) { TEST_F(KVEngineTestHarness, BasicTimestampSingle) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
const Timestamp kReadTimestamp = Timestamp(9, 9); const Timestamp kReadTimestamp = Timestamp(9, 9);
const Timestamp kInsertTimestamp = Timestamp(10, 10); const Timestamp kInsertTimestamp = Timestamp(10, 10);
@ -694,19 +638,7 @@ TEST_F(KVEngineTestHarness, BasicTimestampSingle) {
TEST_F(KVEngineTestHarness, BasicTimestampMultiple) { TEST_F(KVEngineTestHarness, BasicTimestampMultiple) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
const Timestamp t10 = Timestamp(10, 10); const Timestamp t10 = Timestamp(10, 10);
const Timestamp t20 = Timestamp(20, 20); const Timestamp t20 = Timestamp(20, 20);
@ -758,19 +690,7 @@ TEST_F(KVEngineTestHarness, BasicTimestampMultiple) {
DEATH_TEST_REGEX_F(KVEngineTestHarness, SnapshotHidesVisibility, ".*item not found.*") { DEATH_TEST_REGEX_F(KVEngineTestHarness, SnapshotHidesVisibility, ".*item not found.*") {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
auto opCtxs = _makeOperationContexts(engine, 2); auto opCtxs = _makeOperationContexts(engine, 2);
@ -819,29 +739,18 @@ TEST_F(KVEngineTestHarness, SingleReadWithConflictWithOplog) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::string ns = "a.b";
std::unique_ptr<RecordStore> collectionRs; std::unique_ptr<RecordStore> collectionRs;
std::unique_ptr<RecordStore> oplogRs; std::unique_ptr<RecordStore> oplogRs;
{ {
auto opCtx = _makeOperationContext(engine); collectionRs = newRecordStore(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
collectionRs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(collectionRs);
CollectionOptions options;
options.uuid = UUID::gen();
options.capped = true;
options.cappedSize = 10240;
options.cappedMaxDocs = -1;
RecordStore::Options oplogRecordStoreOptions;
oplogRecordStoreOptions.isCapped = true;
oplogRecordStoreOptions.isOplog = true;
oplogRecordStoreOptions.oplogMaxSize = 10240;
NamespaceString oplogNss = NamespaceString::createNamespaceString_forTest("local.oplog.rs"); NamespaceString oplogNss = NamespaceString::createNamespaceString_forTest("local.oplog.rs");
ASSERT_OK(engine->createRecordStore(oplogNss, "ident")); oplogRs =
oplogRs = engine->getRecordStore(opCtx.get(), oplogNss, "ident", options); newRecordStore(engine, oplogNss, "oplogIdent", oplogRecordStoreOptions, UUID::gen());
ASSERT(oplogRs);
} }
RecordData rd; RecordData rd;
@ -900,21 +809,9 @@ TEST_F(KVEngineTestHarness, SingleReadWithConflictWithOplog) {
TEST_F(KVEngineTestHarness, PinningOldestTimestampWithReadConflict) { TEST_F(KVEngineTestHarness, PinningOldestTimestampWithReadConflict) {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
auto opCtx = _makeOperationContext(engine); auto opCtx = _makeOperationContext(engine);
Lock::GlobalLock globalLk(opCtx.get(), MODE_X); Lock::GlobalLock globalLk(opCtx.get(), MODE_X);
WriteUnitOfWork uow(opCtx.get()); WriteUnitOfWork uow(opCtx.get());
StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), "abc", 4, Timestamp(10, 10)); StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), "abc", 4, Timestamp(10, 10));
@ -948,19 +845,7 @@ DEATH_TEST_REGEX_F(KVEngineTestHarness,
"Fatal assertion.*39001") { "Fatal assertion.*39001") {
std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext())); std::unique_ptr<KVHarnessHelper> helper(KVHarnessHelper::create(getServiceContext()));
KVEngine* engine = helper->getEngine(); KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::string ns = "a.b";
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
{ {
// A write transaction cannot insert records before the oldest timestamp. // A write transaction cannot insert records before the oldest timestamp.
@ -993,18 +878,7 @@ TEST_F(KVEngineTestHarness, RollingBackToLastStable) {
// The initial data timestamp has to be set to take stable checkpoints. // The initial data timestamp has to be set to take stable checkpoints.
engine->setInitialDataTimestamp(Timestamp(1, 1)); engine->setInitialDataTimestamp(Timestamp(1, 1));
std::string ns = "a.b"; std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
RecordId ridA; RecordId ridA;
{ {
@ -1077,18 +951,7 @@ DEATH_TEST_REGEX_F(KVEngineTestHarness, CommitBehindStable, "Fatal assertion.*39
// The initial data timestamp has to be set to take stable checkpoints. // The initial data timestamp has to be set to take stable checkpoints.
engine->setInitialDataTimestamp(Timestamp(1, 1)); engine->setInitialDataTimestamp(Timestamp(1, 1));
std::string ns = "a.b"; std::unique_ptr<RecordStore> rs = newRecordStore(engine);
std::unique_ptr<RecordStore> rs;
{
auto opCtx = _makeOperationContext(engine);
ASSERT_OK(
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns));
rs = engine->getRecordStore(opCtx.get(),
NamespaceString::createNamespaceString_forTest(ns),
ns,
CollectionOptions());
ASSERT(rs);
}
{ {
// Set the stable timestamp to (2, 2). // Set the stable timestamp to (2, 2).
@ -1112,33 +975,17 @@ DEATH_TEST_REGEX_F(KVEngineTestHarness, CommitBehindStable, "Fatal assertion.*39
} }
TEST_F(DurableCatalogTest, Coll1) { TEST_F(DurableCatalogTest, Coll1) {
KVEngine* engine = helper->getEngine(); std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
std::unique_ptr<DurableCatalog> catalog = createDurableCatalog(catalogRS.get());
std::unique_ptr<RecordStore> rs;
std::unique_ptr<DurableCatalog> catalog;
{
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(rs.get(), false, false, nullptr);
uow.commit();
}
RecordId catalogId; RecordId catalogId;
{ {
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, catalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
ASSERT_NOT_EQUALS("a.b", catalog->getEntry(catalogId).ident); ASSERT_NOT_EQUALS("a.b", catalog->getEntry(catalogId).ident);
uow.commit(); uow.commit();
} }
@ -1150,7 +997,7 @@ TEST_F(DurableCatalogTest, Coll1) {
Lock::GlobalLock globalLk(opCtx, MODE_IX); Lock::GlobalLock globalLk(opCtx, MODE_IX);
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalog = std::make_unique<DurableCatalog>(rs.get(), false, false, nullptr); catalog = createDurableCatalog(catalogRS.get());
catalog->init(opCtx); catalog->init(opCtx);
uow.commit(); uow.commit();
} }
@ -1162,43 +1009,28 @@ TEST_F(DurableCatalogTest, Coll1) {
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
dropCollection(opCtx, catalogId, catalog.get()).transitional_ignore(); dropCollection(opCtx, catalogId, catalog.get()).transitional_ignore();
newCatalogId = newCollection(opCtx, newCatalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
uow.commit(); uow.commit();
} }
ASSERT_NOT_EQUALS(ident, catalog->getEntry(newCatalogId).ident); ASSERT_NOT_EQUALS(ident, catalog->getEntry(newCatalogId).ident);
} }
TEST_F(DurableCatalogTest, Idx1) { TEST_F(DurableCatalogTest, Idx1) {
KVEngine* engine = helper->getEngine(); std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
std::unique_ptr<DurableCatalog> catalog = createDurableCatalog(catalogRS.get());
std::unique_ptr<RecordStore> rs;
std::unique_ptr<DurableCatalog> catalog;
{
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(rs.get(), false, false, nullptr);
uow.commit();
}
RecordId catalogId; RecordId catalogId;
{ {
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, catalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
ASSERT_NOT_EQUALS("a.b", catalog->getEntry(catalogId).ident); ASSERT_NOT_EQUALS("a.b", catalog->getEntry(catalogId).ident);
ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident)); ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident));
uow.commit(); uow.commit();
@ -1262,33 +1094,21 @@ TEST_F(DurableCatalogTest, Idx1) {
} }
TEST_F(DurableCatalogTest, DirectoryPerDb1) { TEST_F(DurableCatalogTest, DirectoryPerDb1) {
KVEngine* engine = helper->getEngine(); const bool directoryPerDB = true;
const bool directoryForIndexes = false;
std::unique_ptr<RecordStore> rs; std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
std::unique_ptr<DurableCatalog> catalog; std::unique_ptr<DurableCatalog> catalog =
{ createDurableCatalog(catalogRS.get(), directoryPerDB, directoryForIndexes);
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(rs.get(), true, false, nullptr);
uow.commit();
}
RecordId catalogId; RecordId catalogId;
{ // collection { // collection
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, catalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "a/"); ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "a/");
ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident)); ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident));
uow.commit(); uow.commit();
@ -1316,33 +1136,21 @@ TEST_F(DurableCatalogTest, DirectoryPerDb1) {
} }
TEST_F(DurableCatalogTest, Split1) { TEST_F(DurableCatalogTest, Split1) {
KVEngine* engine = helper->getEngine(); const bool directoryPerDB = false;
const bool directoryForIndexes = true;
std::unique_ptr<RecordStore> rs; std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
std::unique_ptr<DurableCatalog> catalog; std::unique_ptr<DurableCatalog> catalog =
{ createDurableCatalog(catalogRS.get(), directoryPerDB, directoryForIndexes);
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(rs.get(), false, true, nullptr);
uow.commit();
}
RecordId catalogId; RecordId catalogId;
{ {
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, catalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "collection/"); ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "collection/");
ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident)); ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident));
uow.commit(); uow.commit();
@ -1370,36 +1178,21 @@ TEST_F(DurableCatalogTest, Split1) {
} }
TEST_F(DurableCatalogTest, DirectoryPerAndSplit1) { TEST_F(DurableCatalogTest, DirectoryPerAndSplit1) {
KVEngine* engine = helper->getEngine();
std::unique_ptr<RecordStore> rs;
std::unique_ptr<DurableCatalog> catalog;
const bool directoryPerDB = true; const bool directoryPerDB = true;
const bool directoryPerIndexes = true; const bool directoryPerIndexes = true;
{ std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
auto clientAndCtx = makeClientAndCtx("opCtx"); std::unique_ptr<DurableCatalog> catalog =
auto opCtx = clientAndCtx.opCtx(); createDurableCatalog(catalogRS.get(), directoryPerDB, directoryPerIndexes);
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(
rs.get(), directoryPerDB, directoryPerIndexes, nullptr);
uow.commit();
}
RecordId catalogId; RecordId catalogId;
{ {
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, catalogId = addNewCatalogEntry(opCtx,
NamespaceString::createNamespaceString_forTest("a.b"), NamespaceString::createNamespaceString_forTest("a.b"),
CollectionOptions(), CollectionOptions(),
catalog.get()); catalog.get());
ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "a/collection/"); ASSERT_STRING_CONTAINS(catalog->getEntry(catalogId).ident, "a/collection/");
ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident)); ASSERT_TRUE(ident::isCollectionOrIndexIdent(catalog->getEntry(catalogId).ident));
uow.commit(); uow.commit();
@ -1435,24 +1228,8 @@ TEST_F(DurableCatalogTest, BackupImplemented) {
TEST_F(DurableCatalogTest, EntryIncludesTenantIdInMultitenantEnv) { TEST_F(DurableCatalogTest, EntryIncludesTenantIdInMultitenantEnv) {
gMultitenancySupport = true; gMultitenancySupport = true;
KVEngine* engine = helper->getEngine(); std::unique_ptr<RecordStore> catalogRS = createCatalogRS();
std::unique_ptr<DurableCatalog> catalog = createDurableCatalog(catalogRS.get());
// Create a DurableCatalog and RecordStore
std::unique_ptr<RecordStore> rs;
std::unique_ptr<DurableCatalog> catalog;
{
auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx);
ASSERT_OK(engine->createRecordStore(
NamespaceString::createNamespaceString_forTest("catalog"), "catalog"));
rs = engine->getRecordStore(opCtx,
NamespaceString::createNamespaceString_forTest("catalog"),
"catalog",
CollectionOptions());
catalog = std::make_unique<DurableCatalog>(rs.get(), false, false, nullptr);
uow.commit();
}
// Insert an entry into the DurableCatalog, and ensure the tenantId is stored on the nss in the // Insert an entry into the DurableCatalog, and ensure the tenantId is stored on the nss in the
// entry. // entry.
@ -1463,7 +1240,7 @@ TEST_F(DurableCatalogTest, EntryIncludesTenantIdInMultitenantEnv) {
auto clientAndCtx = makeClientAndCtx("opCtx"); auto clientAndCtx = makeClientAndCtx("opCtx");
auto opCtx = clientAndCtx.opCtx(); auto opCtx = clientAndCtx.opCtx();
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalogId = newCollection(opCtx, nss, CollectionOptions(), catalog.get()); catalogId = addNewCatalogEntry(opCtx, nss, CollectionOptions(), catalog.get());
uow.commit(); uow.commit();
} }
ASSERT_EQUALS(nss.tenantId(), catalog->getEntry(catalogId).nss.tenantId()); ASSERT_EQUALS(nss.tenantId(), catalog->getEntry(catalogId).nss.tenantId());
@ -1478,7 +1255,7 @@ TEST_F(DurableCatalogTest, EntryIncludesTenantIdInMultitenantEnv) {
Lock::GlobalLock globalLk(opCtx, MODE_IX); Lock::GlobalLock globalLk(opCtx, MODE_IX);
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
catalog = std::make_unique<DurableCatalog>(rs.get(), false, false, nullptr); createDurableCatalog(catalogRS.get());
catalog->init(opCtx); catalog->init(opCtx);
uow.commit(); uow.commit();
} }

View File

@ -226,11 +226,11 @@ public:
auto op = makeOperation(); auto op = makeOperation();
WriteUnitOfWork wuow(op); WriteUnitOfWork wuow(op);
std::string ns = "a.b"; const auto nss = NamespaceString::createNamespaceString_forTest("a.b");
ASSERT_OK( const auto ident = "ident";
engine->createRecordStore(NamespaceString::createNamespaceString_forTest(ns), ns)); RecordStore::Options options;
rs = engine->getRecordStore( ASSERT_OK(engine->createRecordStore(nss, ident, options));
op, NamespaceString::createNamespaceString_forTest(ns), ns, CollectionOptions()); rs = engine->getRecordStore(op, nss, ident, options, UUID::gen());
ASSERT(rs); ASSERT(rs);
} }

View File

@ -319,6 +319,72 @@ public:
class Capped; class Capped;
class Oplog; class Oplog;
/**
* Options for generating a new RecordStore. Each RecordStore subclass is responsible for
* parsing its applicable fields - not all fields apply to every RecordStore implementation.
*/
struct Options {
/**
* The KeyFormat for RecordIds in the RecordStore.
*/
KeyFormat keyFormat{KeyFormat::Long};
/**
* True if the RecordStore is for a capped collection.
*/
bool isCapped{false};
/*
* True if the RecordStore is for the oplog collection.
*/
bool isOplog{false};
/**
* The initial maximum size an Oplog's RecordStore should reach. Non-zero value is only
* valid when 'isOplog' is true.
*/
long long oplogMaxSize{0};
/*
* True if the RecordStore is for a change collection.
*
* TODO SERVER-97046: Remove once no longer needed for ResourceConsumptionMetrics in
* 'WiredTigerRecordStore'.
*/
bool isChangeCollection{false};
/**
* Whether or not the RecordStore allows allows writes to overwrite existing records with
* the same RecordId.
*/
bool allowOverwrite{true};
/**
* True if updates through the RecordStore must force updates to the full document.
*/
bool forceUpdateWithFullDocument{false};
/**
* When not none, defines a block compression algorithm to use in liu of the default for
* RecordStores which support block compression. Otherwise, the RecordStore should utilize
* the default block compressor.
*/
boost::optional<std::string> customBlockCompressor;
/**
* Empty by default. Holds collection-specific storage engine configuration options. For
* example, the 'storageEngine' options passed into `db.createCollection()`. Expected to be
* mirror the 'CollectionOptions::storageEngine' format { storageEngine: { <storage engine
* name> : { configString: "<option>=<setting>,..."} } }.
*
* If fields in the 'configString' conflict with fields set either by global defaults or
* other members of the 'RecordStore::Options' struct, RecordStores should prefer values
* from the 'configString'. However, this is difficult to guarantee across RecordStores, and
* any concerns should be validated through explicit testing.
*/
BSONObj storageEngineCollectionOptions;
};
virtual ~RecordStore() {} virtual ~RecordStore() {}
virtual const char* name() const = 0; virtual const char* name() const = 0;

View File

@ -58,7 +58,7 @@ struct Fixture {
Fixture(Direction direction, int nToInsert, bool capped = false) Fixture(Direction direction, int nToInsert, bool capped = false)
: nToInsert(nToInsert), : nToInsert(nToInsert),
harness(newRecordStoreHarnessHelper()), harness(newRecordStoreHarnessHelper()),
rs(harness->newRecordStore("ns", CollectionOptions{.capped = capped})), rs(harness->newRecordStore("ns", RecordStore::Options{.isCapped = capped})),
opCtx(harness->newOperationContext()), opCtx(harness->newOperationContext()),
cursor(rs->getCursor(opCtx.get(), direction == kForward)) { cursor(rs->getCursor(opCtx.get(), direction == kForward)) {
char data[] = "data"; char data[] = "data";

View File

@ -41,8 +41,6 @@
#include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h" #include "mongo/bson/oid.h"
#include "mongo/db/catalog/clustered_collection_options_gen.h"
#include "mongo/db/catalog/clustered_collection_util.h"
#include "mongo/db/record_id.h" #include "mongo/db/record_id.h"
#include "mongo/db/record_id_helpers.h" #include "mongo/db/record_id_helpers.h"
#include "mongo/db/service_context.h" #include "mongo/db/service_context.h"
@ -621,29 +619,26 @@ TEST(RecordStoreTest, CursorSaveUnpositionedRestoreSeek) {
TEST(RecordStoreTest, ClusteredRecordStore) { TEST(RecordStoreTest, ClusteredRecordStore) {
const auto harnessHelper = newRecordStoreHarnessHelper(); const auto harnessHelper = newRecordStoreHarnessHelper();
const std::string ns = "test.system.buckets.a"; const std::string ns = "testDB.clusteredColl";
CollectionOptions options; RecordStore::Options rsOptions = harnessHelper->clusteredRecordStoreOptions();
options.clusteredIndex = clustered_util::makeCanonicalClusteredInfoForLegacyFormat(); std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, rsOptions);
std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, options);
invariant(rs->keyFormat() == KeyFormat::String); invariant(rs->keyFormat() == KeyFormat::String);
auto opCtx = harnessHelper->newOperationContext();
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
const int numRecords = 100; const int numRecords = 100;
std::vector<Record> records; std::vector<Record> records;
std::vector<Timestamp> timestamps(numRecords, Timestamp()); std::vector<Timestamp> timestamps(numRecords, Timestamp());
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
BSONObj doc = BSON("_id" << OID::gen() << "i" << i); auto oid = OID::gen();
BSONObj doc = BSON("_id" << oid << "i" << i);
RecordData recordData = RecordData(doc.objdata(), doc.objsize()); RecordData recordData = RecordData(doc.objdata(), doc.objsize());
recordData.makeOwned(); recordData.makeOwned();
auto rid = record_id_helpers::keyForOID(oid);
RecordId id = uassertStatusOK( records.push_back({rid, recordData});
record_id_helpers::keyForDoc(doc, options.clusteredIndex->getIndexSpec(), nullptr));
records.push_back({id, recordData});
} }
auto opCtx = harnessHelper->newOperationContext();
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
{ {
StorageWriteTransaction txn(ru); StorageWriteTransaction txn(ru);
ASSERT_OK(rs->insertRecords(opCtx.get(), &records, timestamps)); ASSERT_OK(rs->insertRecords(opCtx.get(), &records, timestamps));
@ -728,21 +723,18 @@ TEST(RecordStoreTest, ClusteredRecordStore) {
TEST(RecordStoreTest, ClusteredCappedRecordStoreCreation) { TEST(RecordStoreTest, ClusteredCappedRecordStoreCreation) {
const auto harnessHelper = newRecordStoreHarnessHelper(); const auto harnessHelper = newRecordStoreHarnessHelper();
const std::string ns = "config.changes.c"; const std::string ns = "config.changes.c";
CollectionOptions options; RecordStore::Options rsOptions = harnessHelper->clusteredRecordStoreOptions();
options.clusteredIndex = clustered_util::makeDefaultClusteredIdIndex(); rsOptions.isCapped = true;
options.expireAfterSeconds = 1; std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, rsOptions);
options.capped = true;
std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, options);
invariant(rs->keyFormat() == KeyFormat::String); invariant(rs->keyFormat() == KeyFormat::String);
} }
TEST(RecordStoreTest, ClusteredCappedRecordStoreSeek) { TEST(RecordStoreTest, ClusteredCappedRecordStoreSeek) {
const auto harnessHelper = newRecordStoreHarnessHelper(); const auto harnessHelper = newRecordStoreHarnessHelper();
const std::string ns = "test.system.buckets.a"; RecordStore::Options rsOptions = harnessHelper->clusteredRecordStoreOptions();
CollectionOptions options; rsOptions.isCapped = true;
options.capped = true; const std::string ns = "test.clusteredCappedColl";
options.clusteredIndex = clustered_util::makeCanonicalClusteredInfoForLegacyFormat(); std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, rsOptions);
std::unique_ptr<RecordStore> rs = harnessHelper->newRecordStore(ns, options);
invariant(rs->keyFormat() == KeyFormat::String); invariant(rs->keyFormat() == KeyFormat::String);
auto opCtx = harnessHelper->newOperationContext(); auto opCtx = harnessHelper->newOperationContext();

View File

@ -34,7 +34,6 @@
#include <string> #include <string>
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/storage/kv/kv_engine.h" #include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/record_store.h" #include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_engine.h"
@ -49,16 +48,29 @@ public:
virtual std::unique_ptr<RecordStore> newRecordStore() = 0; virtual std::unique_ptr<RecordStore> newRecordStore() = 0;
std::unique_ptr<RecordStore> newRecordStore(const std::string& ns) { std::unique_ptr<RecordStore> newRecordStore(const std::string& ns) {
return newRecordStore(ns, CollectionOptions()); return newRecordStore(ns, RecordStore::Options{});
} }
virtual std::unique_ptr<RecordStore> newRecordStore(const std::string& ns, virtual std::unique_ptr<RecordStore> newRecordStore(const std::string& ns,
const CollectionOptions& options) = 0; const RecordStore::Options& rsOptions) = 0;
virtual std::unique_ptr<RecordStore> newOplogRecordStore() = 0; virtual std::unique_ptr<RecordStore> newOplogRecordStore() = 0;
virtual KVEngine* getEngine() = 0; virtual KVEngine* getEngine() = 0;
/**
* For test convenience only - in general, the notion of a 'clustered' collection should exist
* above the storage layer.
*
* Returns RecordStore::Options for a 'clustered' collection.
*/
RecordStore::Options clusteredRecordStoreOptions() {
RecordStore::Options clusteredRSOptions;
clusteredRSOptions.keyFormat = KeyFormat::String;
clusteredRSOptions.allowOverwrite = true;
return clusteredRSOptions;
}
/** /**
* Advances the stable timestamp of the engine. * Advances the stable timestamp of the engine.
*/ */

View File

@ -44,6 +44,7 @@
#include "mongo/db/admission/execution_admission_context.h" #include "mongo/db/admission/execution_admission_context.h"
#include "mongo/db/catalog/clustered_collection_util.h" #include "mongo/db/catalog/clustered_collection_util.h"
#include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/collection_record_store_options.h"
#include "mongo/db/catalog_raii.h" #include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h" #include "mongo/db/client.h"
#include "mongo/db/index/multikey_paths.h" #include "mongo/db/index/multikey_paths.h"
@ -174,10 +175,14 @@ void StorageEngineImpl::loadDurableCatalog(OperationContext* opCtx,
} }
} }
// The '_mdb_' catalog is generated and retrieved with a default 'RecordStore' configuration.
// This maintains current and earlier behavior of a MongoD.
const auto catalogRecordStoreOpts = RecordStore::Options{};
if (!catalogExists) { if (!catalogExists) {
WriteUnitOfWork uow(opCtx); WriteUnitOfWork uow(opCtx);
auto status = _engine->createRecordStore(kCatalogInfoNamespace, kCatalogInfo); auto status =
_engine->createRecordStore(kCatalogInfoNamespace, kCatalogInfo, catalogRecordStoreOpts);
// BadValue is usually caused by invalid configuration string. // BadValue is usually caused by invalid configuration string.
// We still fassert() but without a stack trace. // We still fassert() but without a stack trace.
@ -188,8 +193,9 @@ void StorageEngineImpl::loadDurableCatalog(OperationContext* opCtx,
uow.commit(); uow.commit();
} }
_catalogRecordStore = _catalogRecordStore = _engine->getRecordStore(
_engine->getRecordStore(opCtx, kCatalogInfoNamespace, kCatalogInfo, CollectionOptions()); opCtx, kCatalogInfoNamespace, kCatalogInfo, catalogRecordStoreOpts, boost::none /* uuid */);
if (shouldLog(::mongo::logv2::LogComponent::kStorageRecovery, kCatalogLogLevel)) { if (shouldLog(::mongo::logv2::LogComponent::kStorageRecovery, kCatalogLogLevel)) {
LOGV2_FOR_RECOVERY(4615631, kCatalogLogLevel.toInt(), "loadDurableCatalog:"); LOGV2_FOR_RECOVERY(4615631, kCatalogLogLevel.toInt(), "loadDurableCatalog:");
_dumpCatalog(opCtx); _dumpCatalog(opCtx);
@ -423,13 +429,9 @@ Status StorageEngineImpl::_recoverOrphanedCollection(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx); WriteUnitOfWork wuow(opCtx);
const auto catalogEntry = _catalog->getParsedCatalogEntry(opCtx, catalogId); const auto catalogEntry = _catalog->getParsedCatalogEntry(opCtx, catalogId);
const auto md = catalogEntry->metadata; const auto md = catalogEntry->metadata;
const auto options = md->options; const auto recordStoreOptions = getRecordStoreOptions(collectionName, md->options);
const auto keyFormat = options.clusteredIndex ? KeyFormat::String : KeyFormat::Long; Status status =
Status status = _engine->recoverOrphanedIdent(collectionName, _engine->recoverOrphanedIdent(collectionName, collectionIdent, recordStoreOptions);
collectionIdent,
keyFormat,
options.timeseries.has_value(),
options.storageEngine);
bool dataModified = status.code() == ErrorCodes::DataModifiedByRepair; bool dataModified = status.code() == ErrorCodes::DataModifiedByRepair;
if (!status.isOK() && !dataModified) { if (!status.isOK() && !dataModified) {
@ -491,7 +493,8 @@ bool StorageEngineImpl::_handleInternalIdent(OperationContext* opCtx,
// When starting up after a clean shutdown and resumable index builds are supported, find the // When starting up after a clean shutdown and resumable index builds are supported, find the
// internal idents that contain the relevant information to resume each index build and recover // internal idents that contain the relevant information to resume each index build and recover
// the state. // the state.
auto rs = _engine->getRecordStore(opCtx, NamespaceString::kEmpty, ident, CollectionOptions()); auto rs = _engine->getRecordStore(
opCtx, NamespaceString::kEmpty, ident, RecordStore::Options{}, boost::none /* uuid */);
auto cursor = rs->getCursor(opCtx); auto cursor = rs->getCursor(opCtx);
auto record = cursor->next(); auto record = cursor->next();

View File

@ -118,7 +118,8 @@ public:
*/ */
Status createCollTable(OperationContext* opCtx, NamespaceString collName) { Status createCollTable(OperationContext* opCtx, NamespaceString collName) {
const std::string identName = "collection-" + collName.ns_forTest(); const std::string identName = "collection-" + collName.ns_forTest();
return _storageEngine->getEngine()->createRecordStore(collName, identName); return _storageEngine->getEngine()->createRecordStore(
collName, identName, RecordStore::Options{});
} }
Status dropIndexTable(OperationContext* opCtx, NamespaceString nss, std::string indexName) { Status dropIndexTable(OperationContext* opCtx, NamespaceString nss, std::string indexName) {

View File

@ -87,7 +87,8 @@ public:
std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) override { const RecordStore::Options& options,
boost::optional<UUID> uuid) override {
MONGO_UNREACHABLE; MONGO_UNREACHABLE;
} }
@ -102,9 +103,7 @@ public:
Status createRecordStore(const NamespaceString& nss, Status createRecordStore(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& options) override {
bool isTimeseries = false,
const BSONObj& storageEngineCollectionOptions = BSONObj()) override {
MONGO_UNREACHABLE; MONGO_UNREACHABLE;
} }

View File

@ -40,8 +40,6 @@ namespace mongo {
class WiredTigerGlobalOptions { class WiredTigerGlobalOptions {
public: public:
static constexpr auto kDefaultTimeseriesCollectionCompressor = "zstd"_sd;
WiredTigerGlobalOptions() WiredTigerGlobalOptions()
: cacheSizeGB(0), : cacheSizeGB(0),
statisticsLogDelaySecs(0), statisticsLogDelaySecs(0),

View File

@ -1538,11 +1538,11 @@ void WiredTigerKVEngine::setSortedDataInterfaceExtraOptions(const std::string& o
_indexOptions = options; _indexOptions = options;
} }
Status WiredTigerKVEngine::createRecordStore(const NamespaceString& nss, Status WiredTigerKVEngine::_createRecordStore(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat, KeyFormat keyFormat,
bool isTimeseries, const BSONObj& storageEngineCollectionOptions,
const BSONObj& storageEngineCollectionOptions) { boost::optional<std::string> customBlockCompressor) {
WiredTigerSession session(_connection.get()); WiredTigerSession session(_connection.get());
WiredTigerRecordStoreBase::WiredTigerTableConfig wtTableConfig = WiredTigerRecordStoreBase::WiredTigerTableConfig wtTableConfig =
@ -1555,11 +1555,8 @@ Status WiredTigerKVEngine::createRecordStore(const NamespaceString& nss,
wtTableConfig.logEnabled = wtTableConfig.logEnabled =
WiredTigerUtil::useTableLogging(nss, isReplSet, shouldRecoverFromOplogAsStandalone); WiredTigerUtil::useTableLogging(nss, isReplSet, shouldRecoverFromOplogAsStandalone);
if (isTimeseries) { if (customBlockCompressor) {
// Time-series collections use zstd compression by default while all other collections use wtTableConfig.blockCompressor = *customBlockCompressor;
// the globally configured default.
wtTableConfig.blockCompressor =
WiredTigerGlobalOptions::kDefaultTimeseriesCollectionCompressor.toString();
} }
auto customConfigString = WiredTigerRecordStore::parseOptionsField( auto customConfigString = WiredTigerRecordStore::parseOptionsField(
@ -1611,9 +1608,7 @@ Status WiredTigerKVEngine::importRecordStore(StringData ident,
Status WiredTigerKVEngine::recoverOrphanedIdent(const NamespaceString& nss, Status WiredTigerKVEngine::recoverOrphanedIdent(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat, const RecordStore::Options& options) {
bool isTimeseries,
const BSONObj& storageEngineCollectionOptions) {
#ifdef _WIN32 #ifdef _WIN32
return {ErrorCodes::CommandNotSupported, "Orphan file recovery is not supported on Windows"}; return {ErrorCodes::CommandNotSupported, "Orphan file recovery is not supported on Windows"};
#else #else
@ -1645,7 +1640,7 @@ Status WiredTigerKVEngine::recoverOrphanedIdent(const NamespaceString& nss,
LOGV2(22333, "Creating new RecordStore", logAttrs(nss)); LOGV2(22333, "Creating new RecordStore", logAttrs(nss));
status = createRecordStore(nss, ident, keyFormat, isTimeseries, storageEngineCollectionOptions); status = createRecordStore(nss, ident, options);
if (!status.isOK()) { if (!status.isOK()) {
return status; return status;
} }
@ -1691,21 +1686,22 @@ Status WiredTigerKVEngine::recoverOrphanedIdent(const NamespaceString& nss,
std::unique_ptr<RecordStore> WiredTigerKVEngine::getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> WiredTigerKVEngine::getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) { const RecordStore::Options& options,
boost::optional<UUID> uuid) {
std::unique_ptr<WiredTigerRecordStore> ret; std::unique_ptr<WiredTigerRecordStore> ret;
if (nss.isOplog()) { if (options.isOplog) {
ret = std::make_unique<WiredTigerRecordStore::Oplog>( ret = std::make_unique<WiredTigerRecordStore::Oplog>(
this, this,
WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx)), WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx)),
WiredTigerRecordStore::Oplog::Params{.uuid = *options.uuid, WiredTigerRecordStore::Oplog::Params{.uuid = *uuid,
.ident = ident.toString(), .ident = ident.toString(),
.engineName = _canonicalName, .engineName = _canonicalName,
.inMemory = _wtConfig.inMemory, .inMemory = _wtConfig.inMemory,
.oplogMaxSize = options.cappedSize, .oplogMaxSize = options.oplogMaxSize,
.sizeStorer = _sizeStorer.get(), .sizeStorer = _sizeStorer.get(),
.tracksSizeAdjustments = true, .tracksSizeAdjustments = true,
.forceUpdateWithFullDocument = .forceUpdateWithFullDocument =
options.timeseries.has_value()}); options.forceUpdateWithFullDocument});
// If the server was started in read-only mode or if we are restoring the node, skip // If the server was started in read-only mode or if we are restoring the node, skip
// calculating the oplog truncate markers. The OplogCapMaintainerThread does not get started // calculating the oplog truncate markers. The OplogCapMaintainerThread does not get started
@ -1730,21 +1726,21 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getRecordStore(OperationContext
return !isReplSet && !shouldRecoverFromOplogAsStandalone; return !isReplSet && !shouldRecoverFromOplogAsStandalone;
}(); }();
WiredTigerRecordStore::Params params{ WiredTigerRecordStore::Params params{
.baseParams{.uuid = options.uuid, .baseParams{.uuid = uuid,
.ident = ident.toString(), .ident = ident.toString(),
.engineName = _canonicalName, .engineName = _canonicalName,
.keyFormat = options.clusteredIndex ? KeyFormat::String : KeyFormat::Long, .keyFormat = options.keyFormat,
.overwrite = !options.clusteredIndex, .overwrite = options.allowOverwrite,
.isLogged = isLogged, .isLogged = isLogged,
.forceUpdateWithFullDocument = options.timeseries.has_value()}, .forceUpdateWithFullDocument = options.forceUpdateWithFullDocument},
// Record stores for clustered collections need to guarantee uniqueness by preventing // Record stores for clustered collections need to guarantee uniqueness by preventing
// overwrites. // overwrites.
.inMemory = _wtConfig.inMemory, .inMemory = _wtConfig.inMemory,
.isChangeCollection = nss.isChangeCollection(), .isChangeCollection = options.isChangeCollection,
.sizeStorer = _sizeStorer.get(), .sizeStorer = _sizeStorer.get(),
.tracksSizeAdjustments = true}; .tracksSizeAdjustments = true};
ret = options.capped ret = options.isCapped
? std::make_unique<WiredTigerRecordStore::Capped>( ? std::make_unique<WiredTigerRecordStore::Capped>(
this, this,
WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx)), WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx)),

View File

@ -47,7 +47,6 @@
#include "mongo/base/string_data.h" #include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h" #include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h" #include "mongo/db/operation_context.h"
#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/journal_listener.h"
@ -342,14 +341,20 @@ public:
Status createRecordStore(const NamespaceString& ns, Status createRecordStore(const NamespaceString& ns,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& options) override {
bool isTimeseries = false, // Parameters required for a standard WiredTigerRecordStore.
const BSONObj& storageEngineCollectionOptions = BSONObj()) override; return _createRecordStore(ns,
ident,
options.keyFormat,
options.storageEngineCollectionOptions,
options.customBlockCompressor);
}
std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
const NamespaceString& nss, const NamespaceString& nss,
StringData ident, StringData ident,
const CollectionOptions& options) override; const RecordStore::Options& options,
boost::optional<UUID> uuid) override;
std::unique_ptr<RecordStore> getTemporaryRecordStore(OperationContext* opCtx, std::unique_ptr<RecordStore> getTemporaryRecordStore(OperationContext* opCtx,
StringData ident, StringData ident,
@ -437,9 +442,7 @@ public:
Status recoverOrphanedIdent(const NamespaceString& nss, Status recoverOrphanedIdent(const NamespaceString& nss,
StringData ident, StringData ident,
KeyFormat keyFormat = KeyFormat::Long, const RecordStore::Options& options) override;
bool isTimeseries = false,
const BSONObj& storageEngineCollectionOptions = BSONObj()) override;
bool hasIdent(RecoveryUnit&, StringData ident) const override; bool hasIdent(RecoveryUnit&, StringData ident) const override;
@ -678,6 +681,12 @@ private:
StorageEngine::DropIdentCallback callback; StorageEngine::DropIdentCallback callback;
}; };
Status _createRecordStore(const NamespaceString& ns,
StringData ident,
KeyFormat keyFormat,
const BSONObj& storageEngineCollectionOptions,
boost::optional<std::string> customBlockCompressor);
void _checkpoint(WiredTigerSession& session); void _checkpoint(WiredTigerSession& session);
void _checkpoint(WiredTigerSession& session, bool useTimestamp); void _checkpoint(WiredTigerSession& session, bool useTimestamp);

View File

@ -168,18 +168,16 @@ TEST_F(WiredTigerKVEngineRepairTest, OrphanedDataFilesCanBeRecovered) {
auto opCtxPtr = _makeOperationContext(); auto opCtxPtr = _makeOperationContext();
auto& ru = *shard_role_details::getRecoveryUnit(opCtxPtr.get()); auto& ru = *shard_role_details::getRecoveryUnit(opCtxPtr.get());
NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b");
std::string ident = "collection-1234"; std::string ident = "collection-1234";
std::string record = "abcd"; RecordStore::Options options;
CollectionOptions defaultCollectionOptions; NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b");
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options));
std::unique_ptr<RecordStore> rs; auto rs = _helper.getWiredTigerKVEngine()->getRecordStore(
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident)); opCtxPtr.get(), nss, ident, options, UUID::gen());
rs = _helper.getWiredTigerKVEngine()->getRecordStore(
opCtxPtr.get(), nss, ident, defaultCollectionOptions);
ASSERT(rs); ASSERT(rs);
RecordId loc; RecordId loc;
std::string record = "abcd";
{ {
StorageWriteTransaction txn(ru); StorageWriteTransaction txn(ru);
StatusWith<RecordId> res = StatusWith<RecordId> res =
@ -199,7 +197,7 @@ TEST_F(WiredTigerKVEngineRepairTest, OrphanedDataFilesCanBeRecovered) {
ASSERT(!boost::filesystem::exists(tmpFile)); ASSERT(!boost::filesystem::exists(tmpFile));
#ifdef _WIN32 #ifdef _WIN32
auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident); auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident, options);
ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code()); ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code());
#else #else
@ -221,7 +219,7 @@ TEST_F(WiredTigerKVEngineRepairTest, OrphanedDataFilesCanBeRecovered) {
boost::filesystem::rename(tmpFile, *dataFilePath, err); boost::filesystem::rename(tmpFile, *dataFilePath, err);
ASSERT(!err) << err.message(); ASSERT(!err) << err.message();
auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident); auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident, options);
ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code()); ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code());
#endif #endif
} }
@ -232,16 +230,16 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) {
NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b"); NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b");
std::string ident = "collection-1234"; std::string ident = "collection-1234";
std::string record = "abcd"; RecordStore::Options options;
CollectionOptions defaultCollectionOptions; ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options));
std::unique_ptr<RecordStore> rs; UUID uuid = UUID::gen();
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident)); auto rs =
rs = _helper.getWiredTigerKVEngine()->getRecordStore( _helper.getWiredTigerKVEngine()->getRecordStore(opCtxPtr.get(), nss, ident, options, uuid);
opCtxPtr.get(), nss, ident, defaultCollectionOptions);
ASSERT(rs); ASSERT(rs);
RecordId loc; RecordId loc;
std::string record = "abcd";
{ {
StorageWriteTransaction txn(ru); StorageWriteTransaction txn(ru);
StatusWith<RecordId> res = StatusWith<RecordId> res =
@ -264,7 +262,7 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) {
shard_role_details::getRecoveryUnit(opCtxPtr.get()), ident, /*identHasSizeInfo=*/true)); shard_role_details::getRecoveryUnit(opCtxPtr.get()), ident, /*identHasSizeInfo=*/true));
#ifdef _WIN32 #ifdef _WIN32
auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident); auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident, options);
ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code()); ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code());
#else #else
// The ident may not get immediately dropped, so ensure it is completely gone. // The ident may not get immediately dropped, so ensure it is completely gone.
@ -282,14 +280,13 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) {
// This should recreate an empty data file successfully and move the old one to a name that ends // This should recreate an empty data file successfully and move the old one to a name that ends
// in ".corrupt". // in ".corrupt".
auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident); auto status = _helper.getWiredTigerKVEngine()->recoverOrphanedIdent(nss, ident, options);
ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code()) << status.reason(); ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code()) << status.reason();
boost::filesystem::path corruptFile = (dataFilePath->string() + ".corrupt"); boost::filesystem::path corruptFile = (dataFilePath->string() + ".corrupt");
ASSERT(boost::filesystem::exists(corruptFile)); ASSERT(boost::filesystem::exists(corruptFile));
rs = _helper.getWiredTigerKVEngine()->getRecordStore( rs = _helper.getWiredTigerKVEngine()->getRecordStore(opCtxPtr.get(), nss, ident, options, uuid);
opCtxPtr.get(), nss, ident, defaultCollectionOptions);
RecordData data; RecordData data;
ASSERT_FALSE(rs->findRecord(opCtxPtr.get(), loc, &data)); ASSERT_FALSE(rs->findRecord(opCtxPtr.get(), loc, &data));
#endif #endif
@ -397,9 +394,8 @@ TEST_F(WiredTigerKVEngineTest, CreateRecordStoreFailsWithExistingIdent) {
NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b"); NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b");
std::string ident = "collection-1234"; std::string ident = "collection-1234";
RecordStore::Options options;
std::unique_ptr<RecordStore> rs; ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options));
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident));
// A new record store must always have its own storage table uniquely identified by the ident. // A new record store must always have its own storage table uniquely identified by the ident.
// Otherwise, multiple record stores could point to the same storage resource and lead to data // Otherwise, multiple record stores could point to the same storage resource and lead to data
@ -408,7 +404,7 @@ TEST_F(WiredTigerKVEngineTest, CreateRecordStoreFailsWithExistingIdent) {
// Validate the server throws when trying to create a new record store with an ident already in // Validate the server throws when trying to create a new record store with an ident already in
// use. // use.
const auto status = _helper.getWiredTigerKVEngine()->createRecordStore(nss, ident); const auto status = _helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options);
ASSERT_NOT_OK(status); ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::ObjectAlreadyExists); ASSERT_EQ(status.code(), ErrorCodes::ObjectAlreadyExists);
} }
@ -423,10 +419,9 @@ TEST_F(WiredTigerKVEngineTest, IdentDrop) {
NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b"); NamespaceString nss = NamespaceString::createNamespaceString_forTest("a.b");
std::string ident = "collection-1234"; std::string ident = "collection-1234";
CollectionOptions defaultCollectionOptions; RecordStore::Options options;
std::unique_ptr<RecordStore> rs; ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options));
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident));
const boost::optional<boost::filesystem::path> dataFilePath = const boost::optional<boost::filesystem::path> dataFilePath =
_helper.getWiredTigerKVEngine()->getDataFilePathForIdent(ident); _helper.getWiredTigerKVEngine()->getDataFilePathForIdent(ident);
@ -439,7 +434,7 @@ TEST_F(WiredTigerKVEngineTest, IdentDrop) {
// Because the underlying file was not removed, it will be renamed out of the way by WiredTiger // Because the underlying file was not removed, it will be renamed out of the way by WiredTiger
// when creating a new table with the same ident. // when creating a new table with the same ident.
ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident)); ASSERT_OK(_helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options));
const boost::filesystem::path renamedFilePath = dataFilePath->generic_string() + ".1"; const boost::filesystem::path renamedFilePath = dataFilePath->generic_string() + ".1";
ASSERT(boost::filesystem::exists(*dataFilePath)); ASSERT(boost::filesystem::exists(*dataFilePath));
@ -915,8 +910,8 @@ protected:
// Creates the given ident, returning the path to it. // Creates the given ident, returning the path to it.
StatusWith<boost::filesystem::path> createIdent(StringData ns, StringData ident) { StatusWith<boost::filesystem::path> createIdent(StringData ns, StringData ident) {
NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns); NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns);
CollectionOptions defaultCollectionOptions; RecordStore::Options options;
Status stat = _helper.getWiredTigerKVEngine()->createRecordStore(nss, ident); Status stat = _helper.getWiredTigerKVEngine()->createRecordStore(nss, ident, options);
if (!stat.isOK()) { if (!stat.isOK()) {
return stat; return stat;
} }

View File

@ -47,7 +47,6 @@
#include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h" #include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/collection_crud/capped_visibility.h" #include "mongo/db/collection_crud/capped_visibility.h"
#include "mongo/db/operation_context.h" #include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h" #include "mongo/db/record_id.h"

View File

@ -89,15 +89,13 @@ WiredTigerHarnessHelper::WiredTigerHarnessHelper(Options options, StringData ext
} }
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newRecordStore( std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newRecordStore(
const std::string& ns, const CollectionOptions& collOptions) { const NamespaceString& nss,
StringData ident,
const RecordStore::Options& recordStoreOptions,
boost::optional<UUID> uuid) {
ServiceContext::UniqueOperationContext opCtx(newOperationContext()); ServiceContext::UniqueOperationContext opCtx(newOperationContext());
StringData ident = ns; const auto res = _engine->createRecordStore(nss, ident, recordStoreOptions);
NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns); return _engine->getRecordStore(opCtx.get(), nss, ident, recordStoreOptions, uuid);
const auto keyFormat = collOptions.clusteredIndex ? KeyFormat::String : KeyFormat::Long;
const auto res = _engine->createRecordStore(
nss, ident, keyFormat, collOptions.timeseries.has_value(), collOptions.storageEngine);
return _engine->getRecordStore(opCtx.get(), nss, ident, collOptions);
} }
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStore() { std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStore() {
@ -111,33 +109,17 @@ std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStore() {
} }
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit() { std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit() {
ServiceContext::UniqueOperationContext opCtx(newOperationContext());
WiredTigerRecoveryUnit* ru =
checked_cast<WiredTigerRecoveryUnit*>(shard_role_details::getRecoveryUnit(opCtx.get()));
std::string ident = redactTenant(NamespaceString::kRsOplogNamespace).toString(); std::string ident = redactTenant(NamespaceString::kRsOplogNamespace).toString();
std::string uri = WiredTigerUtil::kTableUriPrefix + ident; RecordStore::Options oplogRecordStoreOptions;
oplogRecordStoreOptions.isOplog = true;
CollectionOptions options; oplogRecordStoreOptions.isCapped = true;
options.capped = true; // Large enough not to exceed capped limits.
oplogRecordStoreOptions.oplogMaxSize = 1024 * 1024 * 1024;
const NamespaceString oplogNss = NamespaceString::kRsOplogNamespace; const auto res = _engine->createRecordStore(
WiredTigerRecordStoreBase::WiredTigerTableConfig wtTableConfig = NamespaceString::kRsOplogNamespace, ident, oplogRecordStoreOptions);
getWiredTigerTableConfigFromStartupOptions();
wtTableConfig.keyFormat = KeyFormat::Long;
wtTableConfig.logEnabled =
WiredTigerUtil::useTableLogging(oplogNss, _isReplSet, _shouldRecoverFromOplogAsStandalone);
StatusWith<std::string> result = WiredTigerRecordStoreBase::generateCreateString(
NamespaceStringUtil::serializeForCatalog(oplogNss), wtTableConfig, true);
ASSERT_TRUE(result.isOK());
std::string config = result.getValue();
{
StorageWriteTransaction txn(*ru);
WiredTigerSession* s = ru->getSession();
invariantWTOK(s->create(uri.c_str(), config.c_str()), *s);
txn.commit();
}
// Cannot use 'getRecordStore', which automatically starts the the oplog manager.
ServiceContext::UniqueOperationContext opCtx(newOperationContext());
return std::make_unique<WiredTigerRecordStore::Oplog>( return std::make_unique<WiredTigerRecordStore::Oplog>(
_engine.get(), _engine.get(),
WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx.get())), WiredTigerRecoveryUnit::get(*shard_role_details::getRecoveryUnit(opCtx.get())),
@ -146,7 +128,7 @@ std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit(
.engineName = std::string{kWiredTigerEngineName}, .engineName = std::string{kWiredTigerEngineName},
.inMemory = false, .inMemory = false,
// Large enough not to exceed capped limits. // Large enough not to exceed capped limits.
.oplogMaxSize = 1024 * 1024 * 1024, .oplogMaxSize = oplogRecordStoreOptions.oplogMaxSize,
.sizeStorer = nullptr, .sizeStorer = nullptr,
.tracksSizeAdjustments = true, .tracksSizeAdjustments = true,
.forceUpdateWithFullDocument = false}); .forceUpdateWithFullDocument = false});

View File

@ -32,8 +32,6 @@
#include <wiredtiger.h> #include <wiredtiger.h>
#include "mongo/base/string_data.h" #include "mongo/base/string_data.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/storage/key_format.h"
#include "mongo/db/storage/kv/kv_engine.h" #include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/record_store.h" #include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/record_store_test_harness.h" #include "mongo/db/storage/record_store_test_harness.h"
@ -60,11 +58,20 @@ public:
} }
virtual std::unique_ptr<RecordStore> newRecordStore(const std::string& ns) { virtual std::unique_ptr<RecordStore> newRecordStore(const std::string& ns) {
return newRecordStore(ns, CollectionOptions()); return newRecordStore(ns, RecordStore::Options{});
} }
std::unique_ptr<RecordStore> newRecordStore(const std::string& ns, std::unique_ptr<RecordStore> newRecordStore(
const CollectionOptions& collOptions) override; const std::string& ns, const RecordStore::Options& recordStoreOptions) override {
auto ident = ns;
NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns);
return newRecordStore(nss, ident, recordStoreOptions, UUID::gen());
}
std::unique_ptr<RecordStore> newRecordStore(const NamespaceString& nss,
StringData ident,
const RecordStore::Options& recordStoreOptions,
boost::optional<UUID> uuid);
std::unique_ptr<RecordStore> newOplogRecordStore() override; std::unique_ptr<RecordStore> newOplogRecordStore() override;

View File

@ -109,8 +109,8 @@ public:
const std::string& ns) final { const std::string& ns) final {
std::string ident = ns; std::string ident = ns;
NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns); NamespaceString nss = NamespaceString::createNamespaceString_forTest(ns);
const auto res = _engine->createRecordStore(nss, ident); const auto res = _engine->createRecordStore(nss, ident, RecordStore::Options{});
return _engine->getRecordStore(opCtx, nss, ident, CollectionOptions()); return _engine->getRecordStore(opCtx, nss, ident, RecordStore::Options{}, UUID::gen());
} }
WiredTigerKVEngine* getEngine() { WiredTigerKVEngine* getEngine() {