SERVER-106716 Remove `featureFlagCreateSpillKVEngine` (#42379)

GitOrigin-RevId: 9907d148adde66a6c27872b4af3afff03d1fac83
This commit is contained in:
Gregory Noma 2025-10-14 09:38:09 -04:00 committed by MongoDB Bot
parent afb5cd3323
commit c500e53e89
22 changed files with 87 additions and 422 deletions

View File

@ -3,7 +3,6 @@
// Start our own instance of mongod so that are settings tests
// do not cause issues for other tests
//
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
let ss = db.serverStatus();
@ -26,9 +25,14 @@ if (ss.storageEngine.name !== "wiredTiger") {
// http://source.wiredtiger.com/develop/struct_w_t___c_o_n_n_e_c_t_i_o_n.html#a579141678af06217b22869cbc604c6d4
assert.commandWorked(reconfigure("eviction_target=81"));
assert.eq("eviction_target=81", admin.adminCommand({getParameter: 1, [paramName]: 1})[paramName]);
assert.commandWorked(reconfigure("eviction_dirty_target=19"));
assert.eq("eviction_dirty_target=19", admin.adminCommand({getParameter: 1, [paramName]: 1})[paramName]);
assert.commandWorked(reconfigure("eviction_dirty_trigger=20"));
assert.eq("eviction_dirty_trigger=20", admin.adminCommand({getParameter: 1, [paramName]: 1})[paramName]);
assert.commandWorked(reconfigure("eviction_updates_trigger=0"));
assert.eq("eviction_updates_trigger=0", admin.adminCommand({getParameter: 1, [paramName]: 1})[paramName]);
assert.commandWorked(reconfigure("cache_size=81M"));
assert.eq("cache_size=81M", admin.adminCommand({getParameter: 1, [paramName]: 1})[paramName]);
assert.commandWorked(reconfigure("eviction_dirty_target=19")); // must be lower than eviction_dirty_trigger (default 20)
assert.commandWorked(reconfigure("shared_cache=(chunk=11MB, name=bar, reserve=12MB, size=1G)"));
// Negative tests - bad input to mongod
@ -44,10 +48,7 @@ if (ss.storageEngine.name !== "wiredTiger") {
}
runTestForParam("wiredTigerEngineRuntimeConfig");
// TODO (SERVER-106716): Remove the feature flag check.
if (FeatureFlagUtil.isPresentAndEnabled(db, "FeatureFlagCreateSpillKVEngine")) {
runTestForParam("spillWiredTigerEngineRuntimeConfig");
}
MongoRunner.stopMongod(conn);
}

View File

@ -159,14 +159,6 @@ void HashLookupStage::doDetachFromOperationContext() {
_hashTable.doDetachFromOperationContext();
}
void HashLookupStage::doSaveState() {
_hashTable.doSaveState();
}
void HashLookupStage::doRestoreState() {
_hashTable.doRestoreState();
}
void HashLookupStage::reset(bool fromClose) {
// Also resets the memory threshold if the knob changes between re-open calls.
_hashTable.reset(fromClose);

View File

@ -125,9 +125,6 @@ protected:
void doAttachToOperationContext(OperationContext* opCtx) override;
void doDetachFromOperationContext() override;
void doSaveState() override;
void doRestoreState() override;
void doAttachCollectionAcquisition(const MultipleCollectionAccessor& mca) override {
return;
}

View File

@ -136,14 +136,6 @@ void HashLookupUnwindStage::doDetachFromOperationContext() {
_hashTable.doDetachFromOperationContext();
}
void HashLookupUnwindStage::doSaveState() {
_hashTable.doSaveState();
}
void HashLookupUnwindStage::doRestoreState() {
_hashTable.doRestoreState();
}
void HashLookupUnwindStage::reset(bool fromClose) {
_outerKeyOpen = false;
// Also resets the memory threshold if the knob changes between re-open calls.

View File

@ -124,9 +124,6 @@ protected:
void doAttachToOperationContext(OperationContext* opCtx) override;
void doDetachFromOperationContext() override;
void doSaveState() override;
void doRestoreState() override;
void doAttachCollectionAcquisition(const MultipleCollectionAccessor& mca) override {
return;
}

View File

@ -70,19 +70,11 @@ void HashAggBaseStage<Derived>::doSaveState() {
if (_rsCursor) {
_recordStore->saveCursor(_opCtx, _rsCursor);
}
if (_recordStore) {
_recordStore->saveState();
}
}
template <class Derived>
void HashAggBaseStage<Derived>::doRestoreState() {
invariant(_opCtx);
if (_recordStore) {
_recordStore->restoreState();
}
if (_recordStore && _rsCursor) {
auto couldRestore = _recordStore->restoreCursor(_opCtx, _rsCursor);
uassert(6196500, "HashAggStage could not restore cursor", couldRestore);

View File

@ -453,24 +453,6 @@ void LookupHashTable::reset(bool fromClose) {
_totalSpilledBytes = 0;
}
void LookupHashTable::doSaveState() {
if (_recordStoreHt) {
_recordStoreHt->saveState();
}
if (_recordStoreBuf) {
_recordStoreBuf->saveState();
}
}
void LookupHashTable::doRestoreState() {
if (_recordStoreHt) {
_recordStoreHt->restoreState();
}
if (_recordStoreBuf) {
_recordStoreBuf->restoreState();
}
}
void LookupHashTable::forceSpill() {
if (!_memoryHt) {
LOGV2_DEBUG(9916001, 2, "HashLookupStage has finished its execution");

View File

@ -189,9 +189,6 @@ public:
*/
size_t bufferValueOrSpill(value::MaterializedRow& value);
void doSaveState();
void doRestoreState();
void forceSpill();
const HashLookupStats* getHashLookupStats() const {

View File

@ -116,17 +116,6 @@ std::unique_ptr<PlanStage> WindowStage::clone() const {
participateInTrialRunTracking());
}
void WindowStage::doSaveState() {
if (_recordStore) {
_recordStore->saveState();
}
}
void WindowStage::doRestoreState() {
if (_recordStore) {
_recordStore->restoreState();
}
}
size_t WindowStage::getLastRowId() {
return _lastRowId;
}
@ -163,12 +152,6 @@ void WindowStage::spill() {
" pass allowDiskUse:true to opt in",
_allowDiskUse);
// Ensure there is sufficient disk space for spilling
if (!feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
uassertStatusOK(ensureSufficientDiskSpaceForSpilling(
storageGlobalParams.dbpath, internalQuerySpillingMinAvailableDiskSpaceBytes.load()));
}
// Create spilled record storage if not created.
if (!_recordStore) {
_recordStore = std::make_unique<SpillingStore>(_opCtx, KeyFormat::Long);

View File

@ -108,9 +108,6 @@ public:
protected:
void doSaveState() override;
void doRestoreState() override;
void doAttachCollectionAcquisition(const MultipleCollectionAccessor& mca) override {
return;
}

View File

@ -52,22 +52,6 @@ namespace mongo {
namespace sbe {
namespace {
/* We don't need to retry write conflicts in this class but when WiredTiger is low on cache
* we do need to retry the StorageUnavailable exceptions.
*/
template <typename F>
static void storageUnavailableRetry(OperationContext* opCtx,
StringData opStr,
F&& f,
boost::optional<size_t> retryLimit = boost::none) {
if (feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
// Storage unavailable exceptions are handled internally by the spill table.
return f();
}
// writeConflictRetry already implements a retryBackoff for storage unavailable.
writeConflictRetry(opCtx, opStr, NamespaceString::kEmpty, f, retryLimit);
}
[[nodiscard]] Lock::GlobalLock acquireLock(OperationContext* opCtx) {
return Lock::GlobalLock(
@ -105,19 +89,8 @@ key_string::Value decodeKeyString(const RecordId& rid, key_string::TypeBits type
SpillingStore::SpillingStore(OperationContext* opCtx, KeyFormat format) {
auto lk = acquireLock(opCtx);
if (feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
_spillTable = opCtx->getServiceContext()->getStorageEngine()->makeSpillTable(
opCtx, format, internalQuerySpillingMinAvailableDiskSpaceBytes.load());
return;
}
auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
_spillTable = storageEngine->makeTemporaryRecordStore(
opCtx, storageEngine->generateNewInternalIdent(), format);
_spillingUnit = opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit();
_spillingUnit->setCacheMaxWaitTimeout(Milliseconds(internalQuerySpillingMaxWaitTimeout.load()));
_spillingState = WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork;
}
SpillingStore::~SpillingStore() {}
@ -167,26 +140,14 @@ int SpillingStore::upsertToRecordStore(OperationContext* opCtx,
bool update) {
assertIgnorePrepareConflictsBehavior(opCtx);
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
auto result = mongo::Status::OK();
storageUnavailableRetry(opCtx, "SpillingStore::upsertToRecordStore", [&] {
auto result = [&] {
auto lk = acquireLock(opCtx);
boost::optional<WriteUnitOfWork> wuow;
if (_originalUnit) {
wuow.emplace(opCtx);
}
if (update) {
result = _spillTable->updateRecord(opCtx, key, buf.buf(), buf.len());
} else {
return _spillTable->updateRecord(opCtx, key, buf.buf(), buf.len());
}
std::vector<Record> records = {{key, {buf.buf(), buf.len()}}};
result = _spillTable->insertRecords(opCtx, &records);
}
if (wuow) {
wuow->commit();
}
});
return _spillTable->insertRecords(opCtx, &records);
}();
uassertStatusOK(result);
return buf.len();
@ -195,37 +156,17 @@ int SpillingStore::upsertToRecordStore(OperationContext* opCtx,
Status SpillingStore::insertRecords(OperationContext* opCtx, std::vector<Record>* inOutRecords) {
assertIgnorePrepareConflictsBehavior(opCtx);
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
auto status = Status::OK();
storageUnavailableRetry(opCtx, "SpillingStore::insertRecords", [&] {
auto lk = acquireLock(opCtx);
boost::optional<WriteUnitOfWork> wuow;
if (_originalUnit) {
wuow.emplace(opCtx);
}
status = _spillTable->insertRecords(opCtx, inOutRecords);
if (wuow) {
wuow->commit();
}
});
return status;
return _spillTable->insertRecords(opCtx, inOutRecords);
}
boost::optional<value::MaterializedRow> SpillingStore::readFromRecordStore(OperationContext* opCtx,
const RecordId& rid) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
RecordData record;
bool found = false;
// Because we impose a timeout for storage engine operations, we need to handle errors and retry
// reads too.
storageUnavailableRetry(opCtx, "SpillingStore::readFromRecordStore", [&] {
auto found = [&] {
auto lk = acquireLock(opCtx);
found = _spillTable->findRecord(opCtx, rid, &record);
});
return _spillTable->findRecord(opCtx, rid, &record);
}();
if (found) {
auto valueReader = BufReader(record.data(), record.size());
@ -235,57 +176,13 @@ boost::optional<value::MaterializedRow> SpillingStore::readFromRecordStore(Opera
}
bool SpillingStore::findRecord(OperationContext* opCtx, const RecordId& loc, RecordData* out) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
bool found = false;
// Because we impose a timeout for storage engine operations, we need to handle errors and retry
// reads too.
storageUnavailableRetry(opCtx, "SpillingStore::findRecord", [&] {
auto lk = acquireLock(opCtx);
found = _spillTable->findRecord(opCtx, loc, out);
});
return found;
}
void SpillingStore::switchToSpilling(OperationContext* opCtx) {
if (!_spillingUnit) {
return;
}
invariant(!_originalUnit);
ClientLock lk(opCtx->getClient());
_originalUnit = shard_role_details::releaseRecoveryUnit(opCtx, lk);
_originalState =
shard_role_details::setRecoveryUnit(opCtx, std::move(_spillingUnit), _spillingState, lk);
}
void SpillingStore::switchToOriginal(OperationContext* opCtx) {
if (!_originalUnit) {
return;
}
invariant(!_spillingUnit);
ClientLock lk(opCtx->getClient());
_spillingUnit = shard_role_details::releaseRecoveryUnit(opCtx, lk);
_spillingState =
shard_role_details::setRecoveryUnit(opCtx, std::move(_originalUnit), _originalState, lk);
invariant(!(_spillingUnit->getState() == RecoveryUnit::State::kInactiveInUnitOfWork ||
_spillingUnit->getState() == RecoveryUnit::State::kActive));
return _spillTable->findRecord(opCtx, loc, out);
}
int64_t SpillingStore::storageSize(OperationContext* opCtx) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
auto lk = acquireLock(opCtx);
return _spillTable->storageSize(*shard_role_details::getRecoveryUnit(opCtx));
}
void SpillingStore::saveState() {
if (_spillingUnit) {
_spillingUnit->abandonSnapshot();
}
}
void SpillingStore::restoreState() {
// We do not have to do anything.
return _spillTable->storageSize();
}
void SpillingStore::updateSpillStorageStatsForOperation(OperationContext* opCtx) {

View File

@ -111,53 +111,30 @@ public:
bool findRecord(OperationContext* opCtx, const RecordId& loc, RecordData* out);
auto getCursor(OperationContext* opCtx) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
return _spillTable->getCursor(opCtx);
}
void resetCursor(OperationContext* opCtx, std::unique_ptr<SpillTable::Cursor>& cursor) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
cursor.reset();
}
auto saveCursor(OperationContext* opCtx, std::unique_ptr<SpillTable::Cursor>& cursor) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
cursor->save();
cursor->detachFromOperationContext();
}
auto restoreCursor(OperationContext* opCtx, std::unique_ptr<SpillTable::Cursor>& cursor) {
switchToSpilling(opCtx);
ON_BLOCK_EXIT([&] { switchToOriginal(opCtx); });
cursor->reattachToOperationContext(opCtx);
return cursor->restore(*shard_role_details::getRecoveryUnit(opCtx));
return cursor->restore();
}
int64_t storageSize(OperationContext* opCtx);
void saveState();
void restoreState();
void updateSpillStorageStatsForOperation(OperationContext* opCtx);
private:
void switchToSpilling(OperationContext* opCtx);
void switchToOriginal(OperationContext* opCtx);
std::unique_ptr<SpillTable> _spillTable;
std::unique_ptr<RecoveryUnit> _originalUnit;
WriteUnitOfWork::RecoveryUnitState _originalState =
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork;
std::unique_ptr<RecoveryUnit> _spillingUnit;
WriteUnitOfWork::RecoveryUnitState _spillingState;
size_t _counter{0};
};
} // namespace sbe

View File

@ -1195,20 +1195,6 @@ boost::optional<TimeseriesOptions> CommonMongodProcessInterface::_getTimeseriesO
return mongo::timeseries::getTimeseriesOptions(opCtx, ns, true /*convertToBucketsNamespace*/);
}
namespace {
// TODO SERVER-106716 Remove this
template <typename SpillTableWriteOperation>
void withWriteUnitOfWorkIfNeeded(OperationContext* opCtx, SpillTableWriteOperation operation) {
if (feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
operation();
} else {
WriteUnitOfWork wuow(opCtx);
operation();
wuow.commit();
}
}
} // namespace
void CommonMongodProcessInterface::writeRecordsToSpillTable(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
SpillTable& spillTable,
@ -1221,12 +1207,10 @@ void CommonMongodProcessInterface::writeRecordsToSpillTable(
expCtx->getNamespaceString(),
[&] {
Lock::GlobalLock lk = acquireLockForSpillTable(expCtx->getOperationContext());
withWriteUnitOfWorkIfNeeded(expCtx->getOperationContext(), [&]() {
auto writeResult = spillTable.insertRecords(expCtx->getOperationContext(), records);
uassert(ErrorCodes::OutOfDiskSpace,
str::stream() << "Failed to write to disk because " << writeResult.reason(),
writeResult.isOK());
});
}
);
@ -1236,18 +1220,11 @@ std::unique_ptr<SpillTable> CommonMongodProcessInterface::createSpillTable(
const boost::intrusive_ptr<ExpressionContext>& expCtx, KeyFormat keyFormat) const {
assertIgnorePrepareConflictsBehavior(expCtx);
Lock::GlobalLock lk = acquireLockForSpillTable(expCtx->getOperationContext());
if (feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
return expCtx->getOperationContext()
->getServiceContext()
->getStorageEngine()
->makeSpillTable(expCtx->getOperationContext(),
return expCtx->getOperationContext()->getServiceContext()->getStorageEngine()->makeSpillTable(
expCtx->getOperationContext(),
keyFormat,
internalQuerySpillingMinAvailableDiskSpaceBytes.load());
}
auto storageEngine = expCtx->getOperationContext()->getServiceContext()->getStorageEngine();
return storageEngine->makeTemporaryRecordStore(
expCtx->getOperationContext(), storageEngine->generateNewInternalIdent(), keyFormat);
}
Document CommonMongodProcessInterface::readRecordFromSpillTable(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@ -1281,10 +1258,8 @@ void CommonMongodProcessInterface::deleteRecordFromSpillTable(
[&] {
Lock::GlobalLock lk =
acquireLockForSpillTable(expCtx->getOperationContext());
withWriteUnitOfWorkIfNeeded(expCtx->getOperationContext(), [&]() {
spillTable.deleteRecord(expCtx->getOperationContext(), rID);
});
});
}
void CommonMongodProcessInterface::truncateSpillTable(
@ -1296,11 +1271,9 @@ void CommonMongodProcessInterface::truncateSpillTable(
[&] {
Lock::GlobalLock lk =
acquireLockForSpillTable(expCtx->getOperationContext());
withWriteUnitOfWorkIfNeeded(expCtx->getOperationContext(), [&]() {
auto status = spillTable.truncate(expCtx->getOperationContext());
tassert(5643000, "Unable to clear record store", status.isOK());
});
});
}
boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocumentLocally(

View File

@ -143,19 +143,16 @@ void SpillableDeque::spillToDisk() {
// Write final batch.
writer.flush();
_stats.updateSpillingStats(
1,
_stats.updateSpillingStats(1,
writer.writtenBytes(),
writer.writtenRecords(),
static_cast<uint64_t>(_diskCache->storageSize(
*shard_role_details::getRecoveryUnit(_expCtx->getOperationContext()))));
static_cast<uint64_t>(_diskCache->storageSize()));
CurOp::get(_expCtx->getOperationContext())
->updateSpillStorageStats(_diskCache->computeOperationStatisticsSinceLastCall());
}
void SpillableDeque::updateStorageSizeStat() {
_stats.updateSpilledDataStorageSize(_diskCache->storageSize(
*shard_role_details::getRecoveryUnit(_expCtx->getOperationContext())));
_stats.updateSpilledDataStorageSize(_diskCache->storageSize());
}
Document SpillableDeque::readDocumentFromDiskById(int desired) {

View File

@ -83,11 +83,6 @@ void SpillableDocumentMapImpl::spillToDisk() {
return;
}
if (!feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
uassertStatusOK(ensureSufficientDiskSpaceForSpilling(
storageGlobalParams.dbpath, internalQuerySpillingMinAvailableDiskSpaceBytes.load()));
}
if (_diskMap == nullptr) {
initDiskMap();
}
@ -101,12 +96,10 @@ void SpillableDocumentMapImpl::spillToDisk() {
writer.flush();
_diskMapSize += writer.writtenRecords();
_stats.updateSpillingStats(
1,
_stats.updateSpillingStats(1,
writer.writtenBytes(),
writer.writtenRecords(),
static_cast<uint64_t>(_diskMap->storageSize(
*shard_role_details::getRecoveryUnit(_expCtx->getOperationContext()))));
static_cast<uint64_t>(_diskMap->storageSize()));
CurOp::get(_expCtx->getOperationContext())
->updateSpillStorageStats(_diskMap->computeOperationStatisticsSinceLastCall());
}
@ -129,8 +122,7 @@ RecordId SpillableDocumentMapImpl::computeKey(const Value& id) const {
}
void SpillableDocumentMapImpl::updateStorageSizeStat() {
_stats.updateSpilledDataStorageSize(_diskMap->storageSize(
*shard_role_details::getRecoveryUnit(_expCtx->getOperationContext())));
_stats.updateSpilledDataStorageSize(_diskMap->storageSize());
}
template <bool IsConst>
@ -263,8 +255,7 @@ auto SpillableDocumentMapImpl::IteratorImpl<IsConst>::getCurrentDocument() -> re
template <bool IsConst>
void SpillableDocumentMapImpl::IteratorImpl<IsConst>::restoreDiskIt() {
_diskIt->reattachToOperationContext(_map->_expCtx->getOperationContext());
bool restoreResult = _diskIt->restore(
*shard_role_details::getRecoveryUnit(_map->_expCtx->getOperationContext()));
bool restoreResult = _diskIt->restore();
tassert(2398004, "Unable to restore disk cursor", restoreResult);
}

View File

@ -412,10 +412,6 @@ TEST_F(StorageEngineTest, TemporaryRecordStoreClustered) {
class StorageEngineReconcileTest : public StorageEngineTest {
protected:
StorageEngineReconcileTest()
: StorageEngineTest(
StorageEngineTest::Options{}.setParameter("featureFlagCreateSpillKVEngine", true)) {}
UUID collectionUUID = UUID::gen();
UUID buildUUID = UUID::gen();
std::string resumableIndexFileName = "foo";

View File

@ -44,7 +44,7 @@
namespace mongo {
SpillTable::Cursor::Cursor(RecoveryUnit* ru, std::unique_ptr<SeekableRecordCursor> cursor)
SpillTable::Cursor::Cursor(RecoveryUnit& ru, std::unique_ptr<SeekableRecordCursor> cursor)
: _ru(ru), _cursor(std::move(cursor)) {}
boost::optional<Record> SpillTable::Cursor::seekExact(const RecordId& id) {
@ -57,15 +57,11 @@ boost::optional<Record> SpillTable::Cursor::next() {
void SpillTable::Cursor::detachFromOperationContext() {
_cursor->detachFromOperationContext();
if (_ru) {
_ru->setOperationContext(nullptr);
}
_ru.setOperationContext(nullptr);
}
void SpillTable::Cursor::reattachToOperationContext(OperationContext* opCtx) {
if (_ru) {
_ru->setOperationContext(opCtx);
}
_ru.setOperationContext(opCtx);
_cursor->reattachToOperationContext(opCtx);
}
@ -73,8 +69,8 @@ void SpillTable::Cursor::save() {
_cursor->save();
}
bool SpillTable::Cursor::restore(RecoveryUnit& ru) {
return _cursor->restore(_ru ? *_ru : ru);
bool SpillTable::Cursor::restore() {
return _cursor->restore(_ru);
}
SpillTable::DiskState::DiskState(DiskSpaceMonitor& monitor, int64_t thresholdBytes)
@ -100,9 +96,6 @@ bool SpillTable::DiskState::full() const {
return _full.load();
}
SpillTable::SpillTable(std::unique_ptr<RecoveryUnit> ru, std::unique_ptr<RecordStore> rs)
: _ru(std::move(ru)), _rs(std::move(rs)), _storageEngine(nullptr) {}
SpillTable::SpillTable(std::unique_ptr<RecoveryUnit> ru,
std::unique_ptr<RecordStore> rs,
StorageEngine& storageEngine,
@ -110,7 +103,7 @@ SpillTable::SpillTable(std::unique_ptr<RecoveryUnit> ru,
int64_t thresholdBytes)
: _ru(std::move(ru)),
_rs(std::move(rs)),
_storageEngine(&storageEngine),
_storageEngine(storageEngine),
_diskState(boost::in_place_init, diskMonitor, thresholdBytes) {
// Abandon the snapshot right away in case the recovery unit was given to us with a snapshot
// already open from creating the table.
@ -118,10 +111,6 @@ SpillTable::SpillTable(std::unique_ptr<RecoveryUnit> ru,
}
SpillTable::~SpillTable() {
if (!_storageEngine) {
return;
}
// As an optimization, truncate the table before dropping it so that the checkpoint taken at
// shutdown never has to do the work to write the data to disk.
try {
@ -136,7 +125,7 @@ SpillTable::~SpillTable() {
"error"_attr = exceptionToStatus());
}
_storageEngine->dropSpillTable(*_ru, ident());
_storageEngine.dropSpillTable(*_ru, ident());
}
StringData SpillTable::ident() const {
@ -151,12 +140,7 @@ long long SpillTable::numRecords() const {
return _rs->numRecords();
}
int64_t SpillTable::storageSize(RecoveryUnit& ru) const {
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return _rs->storageSize(ru);
}
int64_t SpillTable::storageSize() const {
_ru->setIsolation(RecoveryUnit::Isolation::readUncommitted);
return _rs->storageSize(*_ru);
}
@ -166,13 +150,6 @@ Status SpillTable::insertRecords(OperationContext* opCtx, std::vector<Record>* r
return status;
}
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
std::vector<Timestamp> timestamps(records->size());
return _rs->insertRecords(
opCtx, *shard_role_details::getRecoveryUnit(opCtx), records, timestamps);
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::snapshot);
ON_BLOCK_EXIT([this] {
@ -233,11 +210,6 @@ Status SpillTable::insertRecords(OperationContext* opCtx, std::vector<Record>* r
}
bool SpillTable::findRecord(OperationContext* opCtx, const RecordId& rid, RecordData* out) const {
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return _rs->findRecord(opCtx, *shard_role_details::getRecoveryUnit(opCtx), rid, out);
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::readUncommitted);
ON_BLOCK_EXIT([this] { _ru->setOperationContext(nullptr); });
@ -253,12 +225,6 @@ Status SpillTable::updateRecord(OperationContext* opCtx,
return status;
}
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return _rs->updateRecord(
opCtx, *shard_role_details::getRecoveryUnit(opCtx), rid, data, len);
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::snapshot);
ON_BLOCK_EXIT([this] {
@ -281,12 +247,6 @@ Status SpillTable::updateRecord(OperationContext* opCtx,
void SpillTable::deleteRecord(OperationContext* opCtx, const RecordId& rid) {
uassertStatusOK(_checkDiskSpace());
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
_rs->deleteRecord(opCtx, *shard_role_details::getRecoveryUnit(opCtx), rid);
return;
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::snapshot);
ON_BLOCK_EXIT([this] {
@ -303,16 +263,10 @@ void SpillTable::deleteRecord(OperationContext* opCtx, const RecordId& rid) {
std::unique_ptr<SpillTable::Cursor> SpillTable::getCursor(OperationContext* opCtx,
bool forward) const {
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return std::make_unique<SpillTable::Cursor>(
_ru.get(), _rs->getCursor(opCtx, *shard_role_details::getRecoveryUnit(opCtx), forward));
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::readUncommitted);
return std::make_unique<SpillTable::Cursor>(_ru.get(), _rs->getCursor(opCtx, *_ru, forward));
return std::make_unique<SpillTable::Cursor>(*_ru, _rs->getCursor(opCtx, *_ru, forward));
}
Status SpillTable::truncate(OperationContext* opCtx) {
@ -320,11 +274,6 @@ Status SpillTable::truncate(OperationContext* opCtx) {
return status;
}
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return _rs->truncate(opCtx, *shard_role_details::getRecoveryUnit(opCtx));
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::snapshot);
ON_BLOCK_EXIT([this] {
@ -352,16 +301,6 @@ Status SpillTable::rangeTruncate(OperationContext* opCtx,
return status;
}
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return _rs->rangeTruncate(opCtx,
*shard_role_details::getRecoveryUnit(opCtx),
minRecordId,
maxRecordId,
hintDataSizeIncrement,
hintNumRecordsIncrement);
}
_ru->setOperationContext(opCtx);
_ru->setIsolation(RecoveryUnit::Isolation::snapshot);
ON_BLOCK_EXIT([this] {
@ -387,10 +326,6 @@ Status SpillTable::rangeTruncate(OperationContext* opCtx,
}
std::unique_ptr<StorageStats> SpillTable::computeOperationStatisticsSinceLastCall() {
// TODO (SERVER-106716): Remove this case.
if (!_ru) {
return nullptr;
}
return _ru->computeOperationStatisticsSinceLastCall();
}

View File

@ -54,7 +54,7 @@ class SpillTable {
public:
class Cursor {
public:
Cursor(RecoveryUnit* ru, std::unique_ptr<SeekableRecordCursor> cursor);
Cursor(RecoveryUnit& ru, std::unique_ptr<SeekableRecordCursor> cursor);
boost::optional<Record> seekExact(const RecordId& id);
@ -66,20 +66,13 @@ public:
void save();
bool restore(RecoveryUnit& ru);
bool restore();
private:
RecoveryUnit* _ru; // TODO (SERVER-106716): Make this a reference.
RecoveryUnit& _ru;
std::unique_ptr<SeekableRecordCursor> _cursor;
};
/**
* Creates a spill table using the given recovery unit and record store.
*
* TODO (SERVER-106716): Remove this constructor.
*/
SpillTable(std::unique_ptr<RecoveryUnit> ru, std::unique_ptr<RecordStore> rs);
/**
* Creates a spill table using the given recovery unit and record store. If the available disk
* space falls below thresholdBytes, writes to the spill table will fail.
@ -107,16 +100,12 @@ public:
/**
* Returns the storage size on disk of the spill table.
*
* TODO (SERVER-106716): Remove the RecoveryUnit parameter.
*/
int64_t storageSize(RecoveryUnit& ru) const;
int64_t storageSize() const;
/**
* Inserts the specified records into the underlying RecordStore by copying the provided record
* data.
* When `featureFlagCreateSpillKVEngine` is enabled, this should not be explicitly called in a
* WriteUnitOfWork.
* data. This should not be explicitly called in a WriteUnitOfWork.
*/
Status insertRecords(OperationContext* opCtx, std::vector<Record>* records);
@ -130,16 +119,12 @@ public:
/**
* Updates the record with id 'rid', replacing its contents with those described by
* 'data' and 'len'.
* When `featureFlagCreateSpillKVEngine` is enabled, this should not be explicitly called in a
* WriteUnitOfWork.
* 'data' and 'len'. This should not be explicitly called in a WriteUnitOfWork.
*/
Status updateRecord(OperationContext* opCtx, const RecordId& rid, const char* data, int len);
/**
* Deletes the record with id 'rid'.
* When `featureFlagCreateSpillKVEngine` is enabled, this should not be explicitly called in a
* WriteUnitOfWork.
* Deletes the record with id 'rid'. This should not be explicitly called in a WriteUnitOfWork.
*/
void deleteRecord(OperationContext* opCtx, const RecordId& rid);
@ -152,9 +137,7 @@ public:
std::unique_ptr<Cursor> getCursor(OperationContext*, bool forward = true) const;
/**
* Removes all records.
* When `featureFlagCreateSpillKVEngine` is enabled, this should not be explicitly called in a
* WriteUnitOfWork.
* Removes all records. This should not be explicitly called in a WriteUnitOfWork.
*/
Status truncate(OperationContext* opCtx);
@ -162,9 +145,7 @@ public:
* Removes all records in the range [minRecordId, maxRecordId] inclusive of both. The hint*
* arguments serve as a hint to the record store of how much data will be truncated. This is
* necessary to avoid reading the data between the two RecordIds in order to update numRecords
* and dataSize correctly.
* When `featureFlagCreateSpillKVEngine` is enabled, this should not be explicitly called in a
* WriteUnitOfWork.
* and dataSize correctly. This should not be explicitly called in a WriteUnitOfWork.
*/
Status rangeTruncate(OperationContext* opCtx,
const RecordId& minRecordId = RecordId(),
@ -177,7 +158,7 @@ public:
protected:
std::unique_ptr<RecoveryUnit> _ru;
std::unique_ptr<RecordStore> _rs;
StorageEngine* _storageEngine{nullptr}; // TODO (SERVER-106716): Make this a reference.
StorageEngine& _storageEngine;
private:
Status _checkDiskSpace() const;

View File

@ -51,7 +51,6 @@ class SpillTableTest : public StorageEngineTest {
protected:
SpillTableTest()
: StorageEngineTest(StorageEngineTest::Options{}
.setParameter("featureFlagCreateSpillKVEngine", true)
.setParameter("spillWiredTigerCacheSizeMinMB", kCacheSizeMB)
.setParameter("spillWiredTigerCacheSizeMaxMB", kCacheSizeMB)) {}
};

View File

@ -295,11 +295,6 @@ feature_flags:
cpp_varname: feature_flags::gStorageEngineInterruptibility
default: true
fcv_gated: false
featureFlagCreateSpillKVEngine:
description: "When enabled, create a second storage engine instance that is used for creating SpillTables."
cpp_varname: feature_flags::gFeatureFlagCreateSpillKVEngine
default: true
fcv_gated: false
featureFlagReplicateLocalCatalogIdentifiers:
description: >-
Enable replication of local catalog identifiers across replica set nodes, ensuring the

View File

@ -37,13 +37,11 @@ namespace mongo {
/**
* Manages the lifetime of a temporary RecordStore. Unless keep() is called, the managed RecordStore
* will be dropped after destruction.
*
* TODO (SERVER-106716): Make this no longer derive from SpillTable.
*/
class TemporaryRecordStore : public SpillTable {
class TemporaryRecordStore {
public:
explicit TemporaryRecordStore(std::unique_ptr<RecordStore> rs)
: SpillTable(nullptr, std::move(rs)) {}
explicit TemporaryRecordStore(std::unique_ptr<RecordStore> rs) : _rs(std::move(rs)) {}
virtual ~TemporaryRecordStore() {}
// Not copyable.
TemporaryRecordStore(const TemporaryRecordStore&) = delete;
@ -66,6 +64,7 @@ public:
}
protected:
std::unique_ptr<RecordStore> _rs;
bool _keep = false;
};
} // namespace mongo

View File

@ -165,8 +165,6 @@ public:
kv->setRecordStoreExtraOptions(wiredTigerGlobalOptions.collectionConfig);
kv->setSortedDataInterfaceExtraOptions(wiredTigerGlobalOptions.indexConfig);
std::unique_ptr<SpillWiredTigerKVEngine> spillWiredTigerKVEngine;
if (feature_flags::gFeatureFlagCreateSpillKVEngine.isEnabled()) {
boost::system::error_code ec;
boost::filesystem::remove_all(params.getSpillDbPath(), ec);
if (ec) {
@ -175,13 +173,11 @@ public:
"error"_attr = ec.message());
}
WiredTigerKVEngineBase::WiredTigerConfig wtConfig =
getSpillWiredTigerConfigFromStartupOptions();
spillWiredTigerKVEngine = std::make_unique<SpillWiredTigerKVEngine>(
auto spillWiredTigerKVEngine = std::make_unique<SpillWiredTigerKVEngine>(
std::string{getCanonicalName()},
params.getSpillDbPath().string(),
&opCtx->fastClockSource(),
std::move(wtConfig),
getSpillWiredTigerConfigFromStartupOptions(),
SpillWiredTigerExtensions::get(opCtx->getServiceContext()));
std::call_once(spillWiredTigerServerStatusSectionFlag, [] {
@ -189,7 +185,6 @@ public:
std::string{SpillWiredTigerServerStatusSection::kServerStatusSectionName})
.forShard();
});
}
// We're using the WT engine; register the ServerStatusSection for it.
// Only do so once; even if we re-create the StorageEngine for FCBIS. The section is