mirror of https://github.com/mongodb/mongo
SERVER-101672 Remove async sampling feature flag (#40165)
Co-authored-by: Gregory Wlodarek <gregory.wlodarek@mongodb.com> GitOrigin-RevId: 1582c9d06839a77a6c240f9386e40b646c5d5a09
This commit is contained in:
parent
45366cd0d3
commit
8ed1a5d67b
|
|
@ -26,8 +26,6 @@
|
|||
# TODO SERVER-67034: re-enable 'featureFlagSbeFull'.
|
||||
- featureFlagSbeFull
|
||||
- featureFlagSessionsCollectionCoordinatorOnConfigServer
|
||||
# TODO (SERVER-101672) : Delete featureFlagOplogSamplingAsyncEnabled.
|
||||
- featureFlagOplogSamplingAsyncEnabled
|
||||
- featureFlagReplicateLocalCatalogIdentifiers
|
||||
# TODO (SERVER-108818): test primary-driven index builds in allFeatureFlag variant.
|
||||
- featureFlagPrimaryDrivenIndexBuilds
|
||||
|
|
|
|||
|
|
@ -1,112 +0,0 @@
|
|||
/**
|
||||
* Checks that the oplog sampling and initial marker creation does not block startup and can
|
||||
* successfully complete post-startup. This is done by purposefully hanging the oplog cap maintainer
|
||||
* thread to force it to finish marker creation until after startup has finished.
|
||||
* - readonly
|
||||
* - repair
|
||||
* - recoverFromOplogAsStandalone
|
||||
*
|
||||
* @tags: [requires_replication, requires_persistence]
|
||||
*/
|
||||
import {configureFailPoint, kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
||||
const oplogSizeMB = 1; // Replica set oplog size in MB
|
||||
const oplogSizeBytes = oplogSizeMB * 1024 * 1024;
|
||||
const minBytesPerMarker = 100 * 1024; // Minimum size per truncate marker (100 KB)
|
||||
const smallDocSize = 1 * 1024; // Small doc size (1 KB)
|
||||
const smallDoc = {
|
||||
payload: "a".repeat(smallDocSize),
|
||||
}; // Small data for insertion
|
||||
|
||||
const kMinSampleRatioForRandCursor = 20; // Minimum sample ratio for random cursors
|
||||
const kRandomSamplesPerMarker = 10; // Random samples drawn per marker
|
||||
const estimatedMarkers = Math.ceil(oplogSizeBytes / minBytesPerMarker);
|
||||
|
||||
/**
|
||||
* Adds documents to grow the oplog beyond the configured maximum size.
|
||||
*/
|
||||
function growOplog(replSet) {
|
||||
const primary = replSet.getPrimary();
|
||||
const oplog = primary.getDB("local").oplog.rs;
|
||||
const coll = primary.getDB("growOplogPastMaxSize").testCollection;
|
||||
|
||||
const initialOplogSize = oplog.dataSize();
|
||||
const initialRecordsCount = oplog.count();
|
||||
|
||||
assert.lte(initialOplogSize, oplogSizeBytes, "Initial oplog size exceeds expected maximum");
|
||||
jsTestLog(`Initial oplog size: ${initialOplogSize} bytes, records: ${initialRecordsCount}`);
|
||||
|
||||
jsTestLog(`RequiredNumRecords: ${requiredNumRecords}`);
|
||||
let insertedDocs = 0; // Track the number of inserted documents
|
||||
while (oplog.dataSize() <= 2 * oplogSizeBytes || insertedDocs < requiredNumRecords) {
|
||||
assert.commandWorked(coll.insert(smallDoc, {writeConcern: {w: 1}}));
|
||||
insertedDocs++;
|
||||
jsTestLog(`InsertedDocsNum: ${insertedDocs}`);
|
||||
}
|
||||
|
||||
const finalOplogSize = oplog.dataSize();
|
||||
const finalRecordsCount = oplog.count();
|
||||
|
||||
jsTestLog(`Final oplog size: ${finalOplogSize} bytes, records: ${finalRecordsCount}`);
|
||||
assert.gt(finalOplogSize, oplogSizeBytes, "Failed to grow oplog beyond the maximum size");
|
||||
assert.gte(finalRecordsCount, requiredNumRecords, "Failed to meet required number of records for sampling");
|
||||
}
|
||||
|
||||
// Minimum number of records required to trigger sampling.
|
||||
const requiredNumRecords = kMinSampleRatioForRandCursor * kRandomSamplesPerMarker * estimatedMarkers;
|
||||
|
||||
// Initialize replica set with a small oplog size
|
||||
const rst = new ReplSetTest({
|
||||
oplogSize: oplogSizeMB,
|
||||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
logComponentVerbosity: tojson({storage: 1}),
|
||||
minOplogTruncationPoints: 2,
|
||||
"failpoint.hangOplogCapMaintainerThread": tojson({mode: "alwaysOn"}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
jsTestLog("Replica set initialized and starting single-node setup");
|
||||
|
||||
const primary = rst.getPrimary();
|
||||
|
||||
jsTestLog("Growing oplog to exceed its configured maximum size");
|
||||
growOplog(rst);
|
||||
|
||||
// Verify logs related to oplog maintenance and marker creation
|
||||
rst.stopSet(null, true); // Stop replica set for restart
|
||||
|
||||
jsTestLog("Restarting replica set");
|
||||
|
||||
rst.startSet(null, true);
|
||||
const restartedPrimary = rst.getPrimary();
|
||||
|
||||
checkLog.containsJson(restartedPrimary, 4615611); // Log ID: Starting up MongoDB
|
||||
|
||||
checkLog.containsJson(restartedPrimary, 5295000); // Log ID: OplogCapMaintainerThread started
|
||||
|
||||
// Verify that the oplog cap maintainer thread is paused.
|
||||
assert.commandWorked(
|
||||
restartedPrimary.adminCommand({
|
||||
waitForFailPoint: "hangOplogCapMaintainerThread",
|
||||
timesEntered: 1,
|
||||
maxTimeMS: kDefaultWaitForFailPointTimeout,
|
||||
}),
|
||||
);
|
||||
|
||||
// Verify that that our truncate marker creation can end post startup.
|
||||
checkLog.containsJson(restartedPrimary, 8423403); // Log ID: Start up finished
|
||||
|
||||
// Let the oplog cap maintainer thread finish creating truncate markers
|
||||
assert.commandWorked(restartedPrimary.adminCommand({configureFailPoint: "hangOplogCapMaintainerThread", mode: "off"}));
|
||||
|
||||
// Note that log ID 22382 should appear after log ID 8323403
|
||||
checkLog.containsJson(restartedPrimary, 22382); // Log ID: Oplog truncate markers calculated
|
||||
|
||||
jsTestLog("Test complete. Stopping replica set");
|
||||
rst.stopSet();
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Checks that when async oplog sampling enabled, any data that is written in parallel with initial
|
||||
* sampling eventually gets truncated. This test also checks that the oplog sampling and initial
|
||||
* marker creation does not block startup and can successfully complete post-startup.
|
||||
*
|
||||
* @tags: [requires_replication, requires_persistence]
|
||||
*/
|
||||
import {kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
||||
// Constants for replica set and test configuration
|
||||
const oplogSizeMB = 1; // Small oplog size in MB
|
||||
const longString = "a".repeat(450 * 1024); // Large document size (~500KB)
|
||||
let nextId = 0; // Tracks the next `_id` for inserts
|
||||
|
||||
// Initialize a single-node replica set with a small oplog size
|
||||
const rst = new ReplSetTest({
|
||||
oplogSize: oplogSizeMB,
|
||||
nodes: 1,
|
||||
// Set the syncdelay to 1s to speed up checkpointing.
|
||||
nodeOptions: {
|
||||
syncdelay: 1,
|
||||
setParameter: {
|
||||
logComponentVerbosity: tojson({storage: 1}),
|
||||
minOplogTruncationPoints: 2,
|
||||
"failpoint.hangOplogCapMaintainerThread": tojson({mode: "alwaysOn"}),
|
||||
internalQueryExecYieldPeriodMS: 86400000, // Disable yielding
|
||||
},
|
||||
},
|
||||
});
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
// Insert initial documents
|
||||
jsTest.log.info("Inserting initial set of documents into the collection.");
|
||||
for (let i = 0; i < 10; i++) {
|
||||
rst.getPrimary().getDB("test").getCollection("markers").insert({_id: nextId++});
|
||||
}
|
||||
|
||||
// Stop and restart the replica set
|
||||
rst.stopSet(null, true);
|
||||
jsTest.log.info("Replica set stopped for restart.");
|
||||
clearRawMongoProgramOutput();
|
||||
|
||||
rst.startSet(null, true); // Restart replica set
|
||||
const restartedPrimary = rst.getPrimary();
|
||||
const restartedPrimaryOplog = restartedPrimary.getDB("local").getCollection("oplog.rs");
|
||||
jsTest.log.info("Replica set restarted.");
|
||||
|
||||
// // Verify that the oplog cap maintainer thread is paused.
|
||||
assert.commandWorked(
|
||||
restartedPrimary.adminCommand({
|
||||
waitForFailPoint: "hangOplogCapMaintainerThread",
|
||||
timesEntered: 1,
|
||||
maxTimeMS: kDefaultWaitForFailPointTimeout,
|
||||
}),
|
||||
);
|
||||
|
||||
// Test inserts while truncate marker creation process is paused
|
||||
jsTest.log.info("Testing oplog truncation logic with new inserts...");
|
||||
const coll = restartedPrimary.getDB("test").markers;
|
||||
const largeDocIDs = [nextId++, nextId++];
|
||||
|
||||
// Insert large documents
|
||||
const firstInsertTimestamp = assert.commandWorked(
|
||||
coll.runCommand("insert", {documents: [{_id: largeDocIDs[0], longString: longString}]}),
|
||||
).operationTime;
|
||||
jsTest.log.info("First insert timestamp: " + tojson(firstInsertTimestamp));
|
||||
|
||||
const secondInsertTimestamp = assert.commandWorked(
|
||||
coll.runCommand("insert", {documents: [{_id: largeDocIDs[1], longString: longString}]}),
|
||||
).operationTime;
|
||||
jsTest.log.info("Second insert timestamp: " + tojson(secondInsertTimestamp));
|
||||
|
||||
// Take a checkpoint
|
||||
restartedPrimary.getDB("admin").runCommand({fsync: 1});
|
||||
|
||||
// Verify truncate marker creation resumes post-startup
|
||||
checkLog.containsJson(restartedPrimary, 8423403); // Log ID for startup finished
|
||||
|
||||
// Fetch server status and verify truncation metrics
|
||||
let serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
|
||||
const truncationCount = serverStatus.oplogTruncation.truncateCount;
|
||||
|
||||
// Resume oplog truncate marker creation
|
||||
jsTest.log.info("Resuming oplog truncate marker creation.");
|
||||
assert.commandWorked(restartedPrimary.adminCommand({configureFailPoint: "hangOplogCapMaintainerThread", mode: "off"}));
|
||||
|
||||
// Verify truncate markers are created and logged
|
||||
checkLog.containsJson(restartedPrimary, 22382); // Log ID: Oplog truncate markers calculated
|
||||
|
||||
// Insert additional records to trigger truncation
|
||||
for (let i = 0; i < 50; i++) {
|
||||
coll.insert({_id: nextId++, longString: longString});
|
||||
}
|
||||
restartedPrimary.getDB("admin").runCommand({fsync: 1});
|
||||
|
||||
// Wait for truncation to occur
|
||||
assert.soon(() => {
|
||||
serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
|
||||
return serverStatus.oplogTruncation.truncateCount > truncationCount;
|
||||
});
|
||||
|
||||
// Verify large documents were truncated from the oplog
|
||||
const cursor = restartedPrimaryOplog.find({ns: "test.markers"});
|
||||
while (cursor.hasNext()) {
|
||||
const entry = cursor.next();
|
||||
jsTest.log.info("Checking " + tojson(entry));
|
||||
largeDocIDs.forEach((id) => {
|
||||
assert.neq(id, entry.o["_id"], "Unexpected _id entry in oplog.");
|
||||
});
|
||||
}
|
||||
|
||||
jsTest.log.info("Test complete. Stopping replica set.");
|
||||
rst.stopSet();
|
||||
|
|
@ -39,6 +39,7 @@
|
|||
#include "mongo/db/query/plan_yield_policy.h"
|
||||
#include "mongo/db/query/record_id_bound.h"
|
||||
#include "mongo/db/repl/storage_interface.h"
|
||||
#include "mongo/db/storage/oplog_truncate_marker_parameters_gen.h"
|
||||
#include "mongo/db/storage/record_data.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
|
@ -276,7 +277,7 @@ void PreImagesTruncateMarkersPerNsUUID::updateMarkers(int64_t numBytes,
|
|||
RecordId recordId,
|
||||
Date_t wallTime,
|
||||
int64_t numRecords) {
|
||||
updateCurrentMarker(numBytes, recordId, wallTime, numRecords);
|
||||
updateCurrentMarker(numBytes, recordId, wallTime, numRecords, gOplogSamplingAsyncEnabled);
|
||||
}
|
||||
|
||||
bool PreImagesTruncateMarkersPerNsUUID::_hasExcessMarkers(OperationContext* opCtx) const {
|
||||
|
|
|
|||
|
|
@ -784,6 +784,11 @@ public:
|
|||
// Tests a pre-image is tracked after 'refreshHighestTrackedRecord()' and the pre-image is
|
||||
// expirable.
|
||||
void testExpiryAfterrefreshHighestTrackedRecord() {
|
||||
|
||||
// Turn off async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", false);
|
||||
|
||||
const auto expireAfterSeconds = getExpireAfterSeconds();
|
||||
createPreImagesCollection(opCtx());
|
||||
|
||||
|
|
@ -819,6 +824,10 @@ public:
|
|||
}
|
||||
|
||||
void testExpiryOneRecordOneWholeMarker() {
|
||||
// Turn off async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", false);
|
||||
|
||||
const auto expireAfterSeconds = getExpireAfterSeconds();
|
||||
|
||||
// A 'minBytesPerMarker' smaller than 'kPreImage1' so the insertion of 'kPreImage1'
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@
|
|||
#include "mongo/db/stats/server_write_concern_metrics.h"
|
||||
#include "mongo/db/storage/ident.h"
|
||||
#include "mongo/db/storage/kv/kv_engine.h"
|
||||
#include "mongo/db/storage/oplog_truncate_marker_parameters_gen.h"
|
||||
#include "mongo/db/storage/record_data.h"
|
||||
#include "mongo/db/storage/recovery_unit.h"
|
||||
#include "mongo/db/storage/storage_engine.h"
|
||||
|
|
@ -288,8 +289,13 @@ Status insertDocumentsForOplog(OperationContext* opCtx,
|
|||
return ele.Date();
|
||||
}
|
||||
}();
|
||||
truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(
|
||||
opCtx, totalLength, (*records)[nRecords - 1].id, wall, nRecords);
|
||||
|
||||
truncateMarkers->updateCurrentMarkerAfterInsertOnCommit(opCtx,
|
||||
totalLength,
|
||||
(*records)[nRecords - 1].id,
|
||||
wall,
|
||||
nRecords,
|
||||
gOplogSamplingAsyncEnabled);
|
||||
}
|
||||
|
||||
// We do not need to notify capped waiters, as we have not yet updated oplog visibility, so
|
||||
|
|
|
|||
|
|
@ -98,7 +98,8 @@ CollectionTruncateMarkers::Marker& CollectionTruncateMarkers::createNewMarker(
|
|||
}
|
||||
|
||||
void CollectionTruncateMarkers::createNewMarkerIfNeeded(const RecordId& lastRecord,
|
||||
Date_t wallTime) {
|
||||
Date_t wallTime,
|
||||
bool oplogSamplingAsyncEnabled) {
|
||||
auto logFailedLockAcquisition = [&](const std::string& lock) {
|
||||
LOGV2_DEBUG(7393214,
|
||||
2,
|
||||
|
|
@ -115,7 +116,7 @@ void CollectionTruncateMarkers::createNewMarkerIfNeeded(const RecordId& lastReco
|
|||
return;
|
||||
}
|
||||
|
||||
if (feature_flags::gOplogSamplingAsyncEnabled.isEnabled() && !_initialSamplingFinished) {
|
||||
if (oplogSamplingAsyncEnabled && !_initialSamplingFinished) {
|
||||
// Must have finished creating initial markers first.
|
||||
return;
|
||||
}
|
||||
|
|
@ -149,24 +150,28 @@ void CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit(
|
|||
int64_t bytesInserted,
|
||||
const RecordId& highestInsertedRecordId,
|
||||
Date_t wallTime,
|
||||
int64_t countInserted) {
|
||||
int64_t countInserted,
|
||||
bool oplogSamplingAsyncEnabled) {
|
||||
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
|
||||
[collectionMarkers = shared_from_this(),
|
||||
bytesInserted,
|
||||
recordId = highestInsertedRecordId,
|
||||
wallTime,
|
||||
countInserted](OperationContext* opCtx, auto) {
|
||||
countInserted,
|
||||
oplogSamplingAsyncEnabled](OperationContext* opCtx, auto) {
|
||||
invariant(bytesInserted >= 0);
|
||||
invariant(recordId.isValid());
|
||||
|
||||
collectionMarkers->_currentRecords.addAndFetch(countInserted);
|
||||
int64_t newCurrentBytes = collectionMarkers->_currentBytes.addAndFetch(bytesInserted);
|
||||
|
||||
if (wallTime != Date_t() &&
|
||||
newCurrentBytes >= collectionMarkers->_minBytesPerMarker.load()) {
|
||||
// When other transactions commit concurrently, an uninitialized wallTime may delay
|
||||
// the creation of a new marker. This delay is limited to the number of concurrently
|
||||
// running transactions, so the size difference should be inconsequential.
|
||||
collectionMarkers->createNewMarkerIfNeeded(recordId, wallTime);
|
||||
collectionMarkers->createNewMarkerIfNeeded(
|
||||
recordId, wallTime, oplogSamplingAsyncEnabled);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -479,7 +484,8 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns
|
|||
int64_t bytesInserted,
|
||||
const RecordId& highestInsertedRecordId,
|
||||
Date_t wallTime,
|
||||
int64_t countInserted) {
|
||||
int64_t countInserted,
|
||||
bool oplogSamplingAsyncEnabled) {
|
||||
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
|
||||
[collectionMarkers =
|
||||
std::static_pointer_cast<CollectionTruncateMarkersWithPartialExpiration>(
|
||||
|
|
@ -487,11 +493,12 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns
|
|||
bytesInserted,
|
||||
recordId = highestInsertedRecordId,
|
||||
wallTime,
|
||||
countInserted](OperationContext* opCtx, auto) {
|
||||
countInserted,
|
||||
oplogSamplingAsyncEnabled](OperationContext* opCtx, auto) {
|
||||
invariant(bytesInserted >= 0);
|
||||
invariant(recordId.isValid());
|
||||
collectionMarkers->updateCurrentMarker(
|
||||
bytesInserted, recordId, wallTime, countInserted);
|
||||
bytesInserted, recordId, wallTime, countInserted, oplogSamplingAsyncEnabled);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -553,7 +560,8 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarker(
|
|||
int64_t bytesAdded,
|
||||
const RecordId& highestRecordId,
|
||||
Date_t highestWallTime,
|
||||
int64_t numRecordsAdded) {
|
||||
int64_t numRecordsAdded,
|
||||
bool oplogSamplingAsyncEnabled) {
|
||||
// By putting the highest marker modification first we can guarantee than in the
|
||||
// event of a race condition between expiring a partial marker the metrics increase
|
||||
// will happen after the marker has been created. This guarantees that the metrics
|
||||
|
|
@ -564,7 +572,7 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarker(
|
|||
int64_t newCurrentBytes = _currentBytes.addAndFetch(bytesAdded);
|
||||
if (highestWallTime != Date_t() && highestRecordId.isValid() &&
|
||||
newCurrentBytes >= _minBytesPerMarker.load()) {
|
||||
createNewMarkerIfNeeded(highestRecordId, highestWallTime);
|
||||
createNewMarkerIfNeeded(highestRecordId, highestWallTime, oplogSamplingAsyncEnabled);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -129,14 +129,17 @@ public:
|
|||
|
||||
void popOldestMarker();
|
||||
|
||||
void createNewMarkerIfNeeded(const RecordId& lastRecord, Date_t wallTime);
|
||||
void createNewMarkerIfNeeded(const RecordId& lastRecord,
|
||||
Date_t wallTime,
|
||||
bool oplogSamplingAsyncEnabled);
|
||||
|
||||
// Updates the current marker with the inserted value if the operation commits the WUOW.
|
||||
virtual void updateCurrentMarkerAfterInsertOnCommit(OperationContext* opCtx,
|
||||
int64_t bytesInserted,
|
||||
const RecordId& highestInsertedRecordId,
|
||||
Date_t wallTime,
|
||||
int64_t countInserted);
|
||||
int64_t countInserted,
|
||||
bool oplogSamplingAsyncEnabled);
|
||||
|
||||
/**
|
||||
* Waits for expired markers. See _hasExcessMarkers().
|
||||
|
|
@ -418,7 +421,8 @@ public:
|
|||
int64_t bytesInserted,
|
||||
const RecordId& highestInsertedRecordId,
|
||||
Date_t wallTime,
|
||||
int64_t countInserted) final;
|
||||
int64_t countInserted,
|
||||
bool oplogSamplingAsyncEnabled) final;
|
||||
|
||||
std::pair<const RecordId&, const Date_t&> getHighestRecordMetrics_forTest() const {
|
||||
return {_highestRecordId, _highestWallTime};
|
||||
|
|
@ -453,7 +457,8 @@ protected:
|
|||
void updateCurrentMarker(int64_t bytesAdded,
|
||||
const RecordId& highestRecordId,
|
||||
Date_t highestWallTime,
|
||||
int64_t numRecordsAdded);
|
||||
int64_t numRecordsAdded,
|
||||
bool oplogSamplingAsyncEnabled);
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -80,8 +80,12 @@ public:
|
|||
ts);
|
||||
ASSERT_OK(recordIdStatus);
|
||||
auto recordId = recordIdStatus.getValue();
|
||||
testMarkers.updateCurrentMarkerAfterInsertOnCommit(
|
||||
opCtx, insertedData.length(), recordId, now, 1);
|
||||
testMarkers.updateCurrentMarkerAfterInsertOnCommit(opCtx,
|
||||
insertedData.length(),
|
||||
recordId,
|
||||
now,
|
||||
1,
|
||||
/*oplogSamplingAsyncEnabled*/ false);
|
||||
records.push_back(RecordIdAndWall{std::move(recordId), std::move(now)});
|
||||
}
|
||||
wuow.commit();
|
||||
|
|
@ -117,7 +121,7 @@ public:
|
|||
ASSERT_EQ(recordIdStatus.getValue(), recordId);
|
||||
auto now = Date_t::fromMillisSinceEpoch(timestampToUse.asInt64());
|
||||
testMarkers.updateCurrentMarkerAfterInsertOnCommit(
|
||||
opCtx, insertedData.length(), recordId, now, 1);
|
||||
opCtx, insertedData.length(), recordId, now, 1, /*oplogSamplingAsyncEnabled*/ false);
|
||||
wuow.commit();
|
||||
return recordId;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,125 +132,123 @@ private:
|
|||
StorageInterfaceImpl _storage;
|
||||
};
|
||||
|
||||
// TODO SERVER-101672 Re-enable these tests.
|
||||
|
||||
// In async mode, sampleAndUpdate is called seperately from createOplogTruncateMarkers, and
|
||||
// creates the initial set of markers.
|
||||
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampleAndUpdate) {
|
||||
// // Turn off async mode
|
||||
// RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
// "oplogSamplingAsyncEnabled", true);
|
||||
// auto opCtx = getOperationContext();
|
||||
// auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
// Turn on async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", true);
|
||||
auto opCtx = getOperationContext();
|
||||
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
|
||||
// // Populate oplog to force marker creation to occur
|
||||
// int realNumRecords = 4;
|
||||
// int realSizePerRecord = 1024 * 1024;
|
||||
// for (int i = 1; i <= realNumRecords; i++) {
|
||||
// insertOplog(i, realSizePerRecord);
|
||||
// }
|
||||
// Populate oplog to force marker creation to occur
|
||||
int realNumRecords = 4;
|
||||
int realSizePerRecord = 1024 * 1024;
|
||||
for (int i = 1; i <= realNumRecords; i++) {
|
||||
insertOplog(i, realSizePerRecord);
|
||||
}
|
||||
|
||||
// auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
// ASSERT(oplogTruncateMarkers);
|
||||
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
ASSERT(oplogTruncateMarkers);
|
||||
|
||||
// ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
|
||||
// // Continue finishing the initial scan / sample
|
||||
// oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
// Continue finishing the initial scan / sample
|
||||
oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
|
||||
// // Confirm that some truncate markers were generated.
|
||||
// ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
// } // namespace repl
|
||||
// Confirm that some truncate markers were generated.
|
||||
ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
} // namespace repl
|
||||
|
||||
// In async mode, during startup but before sampling finishes, creation method is InProgress.
|
||||
// This should then resolve to either the Scanning or Sampling method once initial marker
|
||||
// creation has finished TEST_F(AsyncOplogTruncationTest,
|
||||
// OplogTruncateMarkers_AsynchronousModeInProgressState) { Turn off async mode
|
||||
// RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
// "oplogSamplingAsyncEnabled", true);
|
||||
// auto opCtx = getOperationContext();
|
||||
// auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
// In async mode, during startup but before sampling finishes,
|
||||
// creation method is InProgress.This should then resolve to either the Scanning or
|
||||
// Sampling method once initial marker creation has finished
|
||||
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeInProgressState) {
|
||||
// Turn on async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", true);
|
||||
auto opCtx = getOperationContext();
|
||||
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
|
||||
// // Populate oplog to so that initial marker creation method is not EmptyCollection
|
||||
// insertOplog(1, 100);
|
||||
// Populate oplog to so that initial marker creation method is not EmptyCollection
|
||||
insertOplog(1, 100);
|
||||
|
||||
// // Note if in async mode, at this point we have not yet sampled.
|
||||
// auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
// ASSERT(oplogTruncateMarkers);
|
||||
// Note if in async mode, at this point we have not yet sampled.
|
||||
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
ASSERT(oplogTruncateMarkers);
|
||||
|
||||
// // Confirm that we are in InProgress state since sampling/scanning has not begun.
|
||||
// ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::InProgress,
|
||||
// oplogTruncateMarkers->getMarkersCreationMethod());
|
||||
// Confirm that we are in InProgress state since sampling/scanning has not begun.
|
||||
ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::InProgress,
|
||||
oplogTruncateMarkers->getMarkersCreationMethod());
|
||||
|
||||
// // Continue finishing the initial scan / sample
|
||||
// oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
// Continue finishing the initial scan / sample
|
||||
oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
|
||||
// // Check that the InProgress state has now been resolved.
|
||||
// ASSERT(oplogTruncateMarkers->getMarkersCreationMethod() ==
|
||||
// CollectionTruncateMarkers::MarkersCreationMethod::Scanning);
|
||||
// Check that the InProgress state has now been resolved.
|
||||
ASSERT(oplogTruncateMarkers->getMarkersCreationMethod() ==
|
||||
CollectionTruncateMarkers::MarkersCreationMethod::Scanning);
|
||||
}
|
||||
|
||||
// In async mode, we are still able to sample when expected, and some markers can be created.
|
||||
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampling) {
|
||||
// Turn off async mode
|
||||
// RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
// "oplogSamplingAsyncEnabled", true);
|
||||
// auto opCtx = getOperationContext();
|
||||
// auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
// auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);
|
||||
// Turn on async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", true);
|
||||
auto opCtx = getOperationContext();
|
||||
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);
|
||||
|
||||
// {
|
||||
// // Before initializing the RecordStore, populate with a few records.
|
||||
// insertOplog(1, 100);
|
||||
// insertOplog(2, 100);
|
||||
// insertOplog(3, 100);
|
||||
// insertOplog(4, 100);
|
||||
// }
|
||||
{
|
||||
// Before initializing the RecordStore, populate with a few records.
|
||||
insertOplog(1, 100);
|
||||
insertOplog(2, 100);
|
||||
insertOplog(3, 100);
|
||||
insertOplog(4, 100);
|
||||
}
|
||||
|
||||
// {
|
||||
// // Force initialize the oplog truncate markers to use sampling by providing very
|
||||
// large,
|
||||
// // inaccurate sizes. This should cause us to over sample the records in the oplog.
|
||||
// ASSERT_OK(wtRS->updateSize(1024 * 1024 * 1024));
|
||||
// wtRS->setNumRecords(1024 * 1024);
|
||||
// wtRS->setDataSize(1024 * 1024 * 1024);
|
||||
// }
|
||||
{
|
||||
// Force initialize the oplog truncate markers to use sampling by providing very large,
|
||||
// inaccurate sizes. This should cause us to over sample the records in the oplog.
|
||||
ASSERT_OK(wtRS->updateSize(1024 * 1024 * 1024));
|
||||
wtRS->setNumRecords(1024 * 1024);
|
||||
wtRS->setDataSize(1024 * 1024 * 1024);
|
||||
}
|
||||
|
||||
// LocalOplogInfo::get(opCtx)->setRecordStore(opCtx, rs);
|
||||
// // Note if in async mode, at this point we have not yet sampled.
|
||||
// auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
// ASSERT(oplogTruncateMarkers);
|
||||
LocalOplogInfo::get(opCtx)->setRecordStore(opCtx, rs);
|
||||
// Note if in async mode, at this point we have not yet sampled.
|
||||
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
ASSERT(oplogTruncateMarkers);
|
||||
|
||||
// // Continue finishing the initial scan / sample
|
||||
// oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
// ASSERT(oplogTruncateMarkers);
|
||||
// Continue finishing the initial scan / sample
|
||||
oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
|
||||
ASSERT(oplogTruncateMarkers);
|
||||
|
||||
// // Confirm that we can in fact sample
|
||||
// ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
|
||||
// oplogTruncateMarkers->getMarkersCreationMethod());
|
||||
// // Confirm that some truncate markers were generated.
|
||||
// ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
|
||||
// auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
|
||||
// ASSERT_GT(truncateMarkersBefore, 0U);
|
||||
// ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
|
||||
// Confirm that we can in fact sample
|
||||
ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
|
||||
oplogTruncateMarkers->getMarkersCreationMethod());
|
||||
// Confirm that some truncate markers were generated.
|
||||
ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
|
||||
auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
|
||||
ASSERT_GT(truncateMarkersBefore, 0U);
|
||||
ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
|
||||
}
|
||||
|
||||
// In async mode, markers are not created during createOplogTruncateMarkers (which instead
|
||||
// returns empty OplogTruncateMarkers object)
|
||||
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeCreateOplogTruncateMarkers) {
|
||||
// Turn off async mode
|
||||
// RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
// "oplogSamplingAsyncEnabled", true);
|
||||
// auto opCtx = getOperationContext();
|
||||
// auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
// Turn on async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", true);
|
||||
auto opCtx = getOperationContext();
|
||||
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
|
||||
|
||||
// // Note if in async mode, at this point we have not yet sampled.
|
||||
// auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
// ASSERT(oplogTruncateMarkers);
|
||||
// Note if in async mode, at this point we have not yet sampled.
|
||||
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
ASSERT(oplogTruncateMarkers);
|
||||
|
||||
// ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
// ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
|
||||
// ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
|
||||
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
|
||||
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
|
||||
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
|
||||
}
|
||||
|
||||
} // namespace repl
|
||||
|
|
|
|||
|
|
@ -122,12 +122,10 @@ std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::sampleAndUpdate(Oper
|
|||
|
||||
std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::createOplogTruncateMarkers(
|
||||
OperationContext* opCtx, RecordStore& rs) {
|
||||
bool samplingAsynchronously =
|
||||
feature_flags::gOplogSamplingAsyncEnabled.isEnabled() && gOplogSamplingAsyncEnabled;
|
||||
LOGV2(10621000,
|
||||
"Creating oplog markers",
|
||||
"sampling asynchronously"_attr = samplingAsynchronously);
|
||||
if (!samplingAsynchronously) {
|
||||
"sampling asynchronously"_attr = gOplogSamplingAsyncEnabled);
|
||||
if (!gOplogSamplingAsyncEnabled) {
|
||||
return sampleAndUpdate(opCtx, rs);
|
||||
}
|
||||
return createEmptyOplogTruncateMarkers(rs);
|
||||
|
|
|
|||
|
|
@ -161,7 +161,6 @@ private:
|
|||
* Insert records into an oplog and verify the number of truncate markers that are created.
|
||||
*/
|
||||
TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
|
||||
|
||||
// Turn off async mode
|
||||
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
|
||||
"oplogSamplingAsyncEnabled", false);
|
||||
|
|
|
|||
|
|
@ -307,11 +307,6 @@ feature_flags:
|
|||
cpp_varname: feature_flags::gFeatureFlagReplicateLocalCatalogIdentifiers
|
||||
default: false
|
||||
fcv_gated: true
|
||||
featureFlagOplogSamplingAsyncEnabled:
|
||||
description: "Enable oplog sampling to run asynchronously to startup on the OplogCapMaintainerThread"
|
||||
cpp_varname: feature_flags::gOplogSamplingAsyncEnabled
|
||||
default: false
|
||||
fcv_gated: false
|
||||
featureFlagPrimaryDrivenIndexBuilds:
|
||||
description: "Enable primary-driven index builds."
|
||||
cpp_varname: feature_flags::gFeatureFlagPrimaryDrivenIndexBuilds
|
||||
|
|
|
|||
Loading…
Reference in New Issue