mirror of https://github.com/mongodb/mongo
SERVER-101672 Remove async sampling feature flag
GitOrigin-RevId: d910b8a9b5fa280de41dc16f0c4bc64f5105339a
This commit is contained in:
parent 8aa607b84f
commit c17e543038
@@ -16,5 +16,3 @@
 - featureFlagReplicaSetEndpoint
 - featureFlagCreateCollectionInPreparedTransactions
 - featureFlagMongodProxyProcolSupport
-# TODO (SERVER-101672) : Delete featureFlagOplogSamplingAsyncEnabled.
-- featureFlagOplogSamplingAsyncEnabled
@@ -1,113 +0,0 @@
/**
 * Checks that the oplog sampling and initial marker creation does not block startup and can
 * successfully complete post-startup. This is done by purposefully hanging the oplog cap maintainer
 * thread so that marker creation finishes only after startup has completed.
 * - readonly
 * - repair
 * - recoverFromOplogAsStandalone
 *
 * @tags: [requires_replication, requires_persistence]
 */
import {configureFailPoint, kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";

const oplogSizeMB = 1;  // Replica set oplog size in MB
const oplogSizeBytes = oplogSizeMB * 1024 * 1024;
const minBytesPerMarker = 100 * 1024;  // Minimum size per truncate marker (100 KB)
const smallDocSize = 1 * 1024;         // Small doc size (1 KB)
const smallDoc = {
    payload: "a".repeat(smallDocSize)
};  // Small data for insertion

const kMinSampleRatioForRandCursor = 20;  // Minimum sample ratio for random cursors
const kRandomSamplesPerMarker = 10;       // Random samples drawn per marker
const estimatedMarkers = Math.ceil(oplogSizeBytes / minBytesPerMarker);

/**
 * Adds documents to grow the oplog beyond the configured maximum size.
 */
function growOplog(replSet) {
    const primary = replSet.getPrimary();
    const oplog = primary.getDB("local").oplog.rs;
    const coll = primary.getDB("growOplogPastMaxSize").testCollection;

    const initialOplogSize = oplog.dataSize();
    const initialRecordsCount = oplog.count();

    assert.lte(initialOplogSize, oplogSizeBytes, "Initial oplog size exceeds expected maximum");
    jsTestLog(`Initial oplog size: ${initialOplogSize} bytes, records: ${initialRecordsCount}`);

    jsTestLog(`RequiredNumRecords: ${requiredNumRecords}`);
    let insertedDocs = 0;  // Track the number of inserted documents
    while (oplog.dataSize() <= 2 * oplogSizeBytes || insertedDocs < requiredNumRecords) {
        assert.commandWorked(coll.insert(smallDoc, {writeConcern: {w: 1}}));
        insertedDocs++;
        jsTestLog(`InsertedDocsNum: ${insertedDocs}`);
    }

    const finalOplogSize = oplog.dataSize();
    const finalRecordsCount = oplog.count();

    jsTestLog(`Final oplog size: ${finalOplogSize} bytes, records: ${finalRecordsCount}`);
    assert.gt(finalOplogSize, oplogSizeBytes, "Failed to grow oplog beyond the maximum size");
    assert.gte(finalRecordsCount,
               requiredNumRecords,
               "Failed to meet required number of records for sampling");
}

// Minimum number of records required to trigger sampling.
const requiredNumRecords =
    kMinSampleRatioForRandCursor * kRandomSamplesPerMarker * estimatedMarkers;

// Initialize replica set with a small oplog size
const rst = new ReplSetTest({
    oplogSize: oplogSizeMB,
    nodes: 1,
    nodeOptions: {
        setParameter: {
            logComponentVerbosity: tojson({storage: 1}),
            minOplogTruncationPoints: 2,
            'failpoint.hangOplogCapMaintainerThread': tojson({mode: 'alwaysOn'})
        },
    }
});

rst.startSet();
rst.initiate();
jsTestLog("Replica set initialized and starting single-node setup");

const primary = rst.getPrimary();

jsTestLog("Growing oplog to exceed its configured maximum size");
growOplog(rst);

// Verify logs related to oplog maintenance and marker creation
rst.stopSet(null, true);  // Stop replica set for restart

jsTestLog("Restarting replica set");

rst.startSet(null, true);
const restartedPrimary = rst.getPrimary();

checkLog.containsJson(restartedPrimary, 4615611);  // Log ID: Starting up MongoDB

checkLog.containsJson(restartedPrimary, 5295000);  // Log ID: OplogCapMaintainerThread started

// Verify that the oplog cap maintainer thread is paused.
assert.commandWorked(restartedPrimary.adminCommand({
    waitForFailPoint: "hangOplogCapMaintainerThread",
    timesEntered: 1,
    maxTimeMS: kDefaultWaitForFailPointTimeout
}));

// Verify that our truncate marker creation can finish post-startup.
checkLog.containsJson(restartedPrimary, 8423403);  // Log ID: Start up finished

// Let the oplog cap maintainer thread finish creating truncate markers
assert.commandWorked(restartedPrimary.adminCommand(
    {configureFailPoint: "hangOplogCapMaintainerThread", mode: "off"}));

// Note that log ID 22382 should appear after log ID 8423403
checkLog.containsJson(restartedPrimary, 22382);  // Log ID: Oplog truncate markers calculated

jsTestLog("Test complete. Stopping replica set");
rst.stopSet();
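For reference, the sampling threshold used by the deleted test above works out as follows. This is a small worked example built only from the test's own constants; it is illustration, not part of the patch:

// Worked example (illustrative only): the minimum record count the deleted test
// inserted so that oplog sampling is triggered.
const oplogSizeBytes = 1 * 1024 * 1024;                                  // 1 MB oplog
const minBytesPerMarker = 100 * 1024;                                    // 100 KB per marker
const estimatedMarkers = Math.ceil(oplogSizeBytes / minBytesPerMarker);  // ceil(10.24) = 11
const requiredNumRecords = 20 * 10 * estimatedMarkers;                   // 20 * 10 * 11 = 2200
print(`estimated markers: ${estimatedMarkers}, records required: ${requiredNumRecords}`);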
@@ -0,0 +1,122 @@
/**
 * Checks that when async oplog sampling is enabled, any data that is written in parallel with
 * initial sampling eventually gets truncated. This test also checks that the oplog sampling and
 * initial marker creation does not block startup and can successfully complete post-startup.
 *
 * @tags: [requires_replication, requires_persistence]
 */
import {kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";

// Constants for replica set and test configuration
const oplogSizeMB = 1;                      // Small oplog size in MB
const longString = "a".repeat(450 * 1024);  // Large document size (~500KB)
let nextId = 0;                             // Tracks the next `_id` for inserts

// Initialize a single-node replica set with a small oplog size
const rst = new ReplSetTest({
    oplogSize: oplogSizeMB,
    nodes: 1,
    // Set the syncdelay to 1s to speed up checkpointing.
    nodeOptions: {
        syncdelay: 1,
        setParameter: {
            oplogSamplingAsyncEnabled: true,
            logComponentVerbosity: tojson({storage: 1}),
            minOplogTruncationPoints: 2,
            "failpoint.hangOplogCapMaintainerThread": tojson({mode: "alwaysOn"}),
            internalQueryExecYieldPeriodMS: 86400000,  // Disable yielding
        },
    },
});
rst.startSet();
rst.initiate();

// Insert initial documents
jsTestLog("Inserting initial set of documents into the collection.");
for (let i = 0; i < 10; i++) {
    rst.getPrimary().getDB("test").getCollection("markers").insert({_id: nextId++});
}

// Stop and restart the replica set
rst.stopSet(null, true);
jsTestLog("Replica set stopped for restart.");
clearRawMongoProgramOutput();

rst.startSet(null, true);  // Restart replica set
const restartedPrimary = rst.getPrimary();
const restartedPrimaryOplog = restartedPrimary.getDB("local").getCollection("oplog.rs");
jsTestLog("Replica set restarted.");

// Verify that the oplog cap maintainer thread is paused.
assert.commandWorked(
    restartedPrimary.adminCommand({
        waitForFailPoint: "hangOplogCapMaintainerThread",
        timesEntered: 1,
        maxTimeMS: kDefaultWaitForFailPointTimeout,
    }),
);

// Test inserts while truncate marker creation process is paused
jsTestLog("Testing oplog truncation logic with new inserts...");
const coll = restartedPrimary.getDB("test").markers;
const largeDocIDs = [nextId++, nextId++];

// Insert large documents
const firstInsertTimestamp =
    assert
        .commandWorked(
            coll.runCommand("insert", {documents: [{_id: largeDocIDs[0], longString: longString}]}),
        )
        .operationTime;
jsTestLog("First insert timestamp: " + tojson(firstInsertTimestamp));

const secondInsertTimestamp =
    assert
        .commandWorked(
            coll.runCommand("insert", {documents: [{_id: largeDocIDs[1], longString: longString}]}),
        )
        .operationTime;
jsTestLog("Second insert timestamp: " + tojson(secondInsertTimestamp));

// Take a checkpoint
restartedPrimary.getDB("admin").runCommand({fsync: 1});

// Verify truncate marker creation resumes post-startup
checkLog.containsJson(restartedPrimary, 8423403);  // Log ID for startup finished

// Fetch server status and verify truncation metrics
let serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
const truncationCount = serverStatus.oplogTruncation.truncateCount;

// Resume oplog truncate marker creation
jsTestLog("Resuming oplog truncate marker creation.");
assert.commandWorked(restartedPrimary.adminCommand(
    {configureFailPoint: "hangOplogCapMaintainerThread", mode: "off"}));

// Verify truncate markers are created and logged
checkLog.containsJson(restartedPrimary, 22382);  // Log ID: Oplog truncate markers calculated

// Insert additional records to trigger truncation
for (let i = 0; i < 50; i++) {
    coll.insert({_id: nextId++, longString: longString});
}
restartedPrimary.getDB("admin").runCommand({fsync: 1});

// Wait for truncation to occur
assert.soon(() => {
    serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
    return serverStatus.oplogTruncation.truncateCount > truncationCount;
});

// Verify large documents were truncated from the oplog
const cursor = restartedPrimaryOplog.find({ns: "test.markers"});
while (cursor.hasNext()) {
    const entry = cursor.next();
    jsTestLog("Checking " + tojson(entry));
    largeDocIDs.forEach((id) => {
        assert.neq(id, entry.o["_id"], "Unexpected _id entry in oplog.");
    });
}

jsTestLog("Test complete. Stopping replica set.");
rst.stopSet();
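The raw waitForFailPoint / configureFailPoint admin commands used by both tests can also be driven through the configureFailPoint helper exported by jstests/libs/fail_point_util.js (the deleted test already imported it but used the raw commands). The sketch below is not part of the patch; it assumes conn is a live connection, and note that the tests enable the failpoint via a startup setParameter so that it is already active before the OplogCapMaintainerThread first runs:

// Sketch only: pause and resume the oplog cap maintainer thread with the
// fail_point_util.js helper instead of raw admin commands.
import {configureFailPoint} from "jstests/libs/fail_point_util.js";

const fp = configureFailPoint(conn, "hangOplogCapMaintainerThread");  // mode: alwaysOn
fp.wait();   // analogous to the waitForFailPoint admin command above
// ... assertions while truncate marker creation is paused ...
fp.off();    // analogous to {configureFailPoint: ..., mode: "off"}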
@@ -145,7 +145,8 @@ void ChangeCollectionTruncateMarkers::expirePartialMarker(OperationContext* opCt
     const auto& doc = obj->data.toBson();
     auto wallTime = doc[repl::OplogEntry::kWallClockTimeFieldName].Date();
 
-    updateCurrentMarkerAfterInsertOnCommit(opCtx, bytesNotTruncated, obj->id, wallTime, 1);
+    updateCurrentMarkerAfterInsertOnCommit(
+        opCtx, bytesNotTruncated, obj->id, wallTime, 1, /*gOplogSamplingAsyncEnabled=*/false);
 
     auto bytesDeleted = oldestMarker->bytes - bytesNotTruncated;
     auto docsDeleted = oldestMarker->records - 1;
@@ -63,7 +63,8 @@ public:
     // Performs post initialisation work. The constructor doesn't specify the highest element seen,
     // so we must update it after initialisation.
     void performPostInitialisation(const RecordId& highestRecordId, Date_t highestWallTime) {
-        updateCurrentMarker(0, highestRecordId, highestWallTime, 0);
+        updateCurrentMarker(
+            0, highestRecordId, highestWallTime, 0, /*gOplogSamplingAsyncEnabled=*/false);
     }
 
 private:
@@ -381,7 +381,8 @@ public:
                 tenantWriteStats.bytesInserted,
                 tenantWriteStats.maxRecordIdSeen,
                 tenantWriteStats.maxWallTimeSeen,
-                tenantWriteStats.docsInserted);
+                tenantWriteStats.docsInserted,
+                /*gOplogSamplingAsyncEnabled=*/false);
         }
     }
 }
@@ -224,7 +224,8 @@ void PreImagesTruncateMarkersPerNsUUID::updateMarkers(int64_t numBytes,
                                                       RecordId recordId,
                                                       Date_t wallTime,
                                                       int64_t numRecords) {
-    updateCurrentMarker(numBytes, recordId, wallTime, numRecords);
+    updateCurrentMarker(
+        numBytes, recordId, wallTime, numRecords, /*gOplogSamplingAsyncEnabled=*/false);
 }
 
 bool PreImagesTruncateMarkersPerNsUUID::_hasExcessMarkers(OperationContext* opCtx) const {
@@ -469,7 +469,6 @@ mongo_cc_unit_test(
         "//src/mongo/db/storage:flow_control_test.cpp",
         "//src/mongo/db/storage:index_entry_comparison_test.cpp",
         "//src/mongo/db/storage:key_string_test.cpp",
-        "//src/mongo/db/storage:oplog_async_sampling_test.cpp",
         "//src/mongo/db/storage:storage_engine_lock_file_test.cpp",
         "//src/mongo/db/storage:storage_engine_metadata_test.cpp",
         "//src/mongo/db/storage:storage_repair_observer_test.cpp",
@@ -103,7 +103,8 @@ CollectionTruncateMarkers::Marker& CollectionTruncateMarkers::createNewMarker(
 }
 
 void CollectionTruncateMarkers::createNewMarkerIfNeeded(const RecordId& lastRecord,
-                                                        Date_t wallTime) {
+                                                        Date_t wallTime,
+                                                        bool oplogSamplingAsyncEnabled) {
     auto logFailedLockAcquisition = [&](const std::string& lock) {
         LOGV2_DEBUG(7393214,
                     2,
@@ -120,9 +121,7 @@ void CollectionTruncateMarkers::createNewMarkerIfNeeded(const RecordId& lastReco
         return;
     }
 
-    if (feature_flags::gOplogSamplingAsyncEnabled.isEnabled(
-            serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) &&
-        !_initialSamplingFinished) {
+    if (oplogSamplingAsyncEnabled && !_initialSamplingFinished) {
         // Must have finished creating initial markers first.
         return;
     }
@@ -156,13 +155,15 @@ void CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit(
     int64_t bytesInserted,
     const RecordId& highestInsertedRecordId,
     Date_t wallTime,
-    int64_t countInserted) {
+    int64_t countInserted,
+    bool oplogSamplingAsyncEnabled) {
     shard_role_details::getRecoveryUnit(opCtx)->onCommit(
         [collectionMarkers = shared_from_this(),
          bytesInserted,
          recordId = highestInsertedRecordId,
          wallTime,
-         countInserted](OperationContext* opCtx, auto) {
+         countInserted,
+         oplogSamplingAsyncEnabled](OperationContext* opCtx, auto) {
             invariant(bytesInserted >= 0);
             invariant(recordId.isValid());
 
@@ -173,7 +174,8 @@ void CollectionTruncateMarkers::updateCurrentMarkerAfterInsertOnCommit(
             // When other transactions commit concurrently, an uninitialized wallTime may delay
             // the creation of a new marker. This delay is limited to the number of concurrently
             // running transactions, so the size difference should be inconsequential.
-            collectionMarkers->createNewMarkerIfNeeded(recordId, wallTime);
+            collectionMarkers->createNewMarkerIfNeeded(
+                recordId, wallTime, oplogSamplingAsyncEnabled);
         }
     });
 }
@@ -494,7 +496,8 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns
     int64_t bytesInserted,
     const RecordId& highestInsertedRecordId,
     Date_t wallTime,
-    int64_t countInserted) {
+    int64_t countInserted,
+    bool oplogSamplingAsyncEnabled) {
     shard_role_details::getRecoveryUnit(opCtx)->onCommit(
         [collectionMarkers =
              std::static_pointer_cast<CollectionTruncateMarkersWithPartialExpiration>(
@@ -502,11 +505,12 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarkerAfterIns
          bytesInserted,
          recordId = highestInsertedRecordId,
          wallTime,
-         countInserted](OperationContext* opCtx, auto) {
+         countInserted,
+         oplogSamplingAsyncEnabled](OperationContext* opCtx, auto) {
             invariant(bytesInserted >= 0);
             invariant(recordId.isValid());
             collectionMarkers->updateCurrentMarker(
-                bytesInserted, recordId, wallTime, countInserted);
+                bytesInserted, recordId, wallTime, countInserted, oplogSamplingAsyncEnabled);
         });
 }
 
@@ -568,7 +572,8 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarker(
     int64_t bytesAdded,
    const RecordId& highestRecordId,
     Date_t highestWallTime,
-    int64_t numRecordsAdded) {
+    int64_t numRecordsAdded,
+    bool oplogSamplingAsyncEnabled) {
     // By putting the highest marker modification first we can guarantee than in the
     // event of a race condition between expiring a partial marker the metrics increase
     // will happen after the marker has been created. This guarantees that the metrics
@@ -579,7 +584,7 @@ void CollectionTruncateMarkersWithPartialExpiration::updateCurrentMarker(
     int64_t newCurrentBytes = _currentBytes.addAndFetch(bytesAdded);
     if (highestWallTime != Date_t() && highestRecordId.isValid() &&
         newCurrentBytes >= _minBytesPerMarker.load()) {
-        createNewMarkerIfNeeded(highestRecordId, highestWallTime);
+        createNewMarkerIfNeeded(highestRecordId, highestWallTime, oplogSamplingAsyncEnabled);
     }
 }
 
@@ -127,14 +127,17 @@ public:
 
     void popOldestMarker();
 
-    void createNewMarkerIfNeeded(const RecordId& lastRecord, Date_t wallTime);
+    void createNewMarkerIfNeeded(const RecordId& lastRecord,
+                                 Date_t wallTime,
+                                 bool oplogSamplingAsyncEnabled);
 
     // Updates the current marker with the inserted value if the operation commits the WUOW.
     virtual void updateCurrentMarkerAfterInsertOnCommit(OperationContext* opCtx,
                                                         int64_t bytesInserted,
                                                         const RecordId& highestInsertedRecordId,
                                                         Date_t wallTime,
-                                                        int64_t countInserted);
+                                                        int64_t countInserted,
+                                                        bool oplogSamplingAsyncEnabled);
 
     /**
      * Waits for expired markers. See _hasExcessMarkers().
@@ -386,7 +389,8 @@ public:
                                                 int64_t bytesInserted,
                                                 const RecordId& highestInsertedRecordId,
                                                 Date_t wallTime,
-                                                int64_t countInserted) final;
+                                                int64_t countInserted,
+                                                bool oplogSamplingAsyncEnabled) final;
 
     std::pair<const RecordId&, const Date_t&> getPartialMarker_forTest() const {
         return {_lastHighestRecordId, _lastHighestWallTime};
@@ -422,7 +426,8 @@ protected:
     void updateCurrentMarker(int64_t bytesAdded,
                              const RecordId& highestRecordId,
                              Date_t highestWallTime,
-                             int64_t numRecordsAdded);
+                             int64_t numRecordsAdded,
+                             bool oplogSamplingAsyncEnabled);
 };
 
 /**
@@ -78,8 +78,12 @@ public:
                 opCtx, insertedData.data(), insertedData.length(), ts);
             ASSERT_OK(recordIdStatus);
             auto recordId = recordIdStatus.getValue();
-            testMarkers.updateCurrentMarkerAfterInsertOnCommit(
-                opCtx, insertedData.length(), recordId, now, 1);
+            testMarkers.updateCurrentMarkerAfterInsertOnCommit(opCtx,
+                                                               insertedData.length(),
+                                                               recordId,
+                                                               now,
+                                                               1,
+                                                               /*oplogSamplingAsyncEnabled*/ false);
             records.push_back(RecordIdAndWall{std::move(recordId), std::move(now)});
         }
         wuow.commit();
@@ -110,7 +114,7 @@ public:
         ASSERT_EQ(recordIdStatus.getValue(), recordId);
         auto now = Date_t::fromMillisSinceEpoch(timestampToUse.asInt64());
         testMarkers.updateCurrentMarkerAfterInsertOnCommit(
-            opCtx, insertedData.length(), recordId, now, 1);
+            opCtx, insertedData.length(), recordId, now, 1, /*oplogSamplingAsyncEnabled*/ false);
         wuow.commit();
         return recordId;
     }
@@ -1,246 +0,0 @@
/**
 * Copyright (C) 2025-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#include <boost/container/vector.hpp>
#include <boost/move/utility_core.hpp>
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <fmt/format.h>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"

namespace mongo {
namespace repl {
const auto& oplogNs = NamespaceString::kRsOplogNamespace;

class AsyncOplogTruncationTest : public ServiceContextMongoDTest {
protected:
    OperationContext* getOperationContext() {
        return _opCtx.get();
    }

    StorageInterface& getStorage() {
        return _storage;
    }

    BSONObj makeBSONObjWithSize(unsigned int seconds, unsigned int t, int size, char fill = 'x') {
        Timestamp opTime{seconds, t};
        Date_t wallTime = Date_t::fromMillisSinceEpoch(t);
        BSONObj objTemplate = BSON("ts" << opTime << "wall" << wallTime << "str"
                                        << "");
        ASSERT_LTE(objTemplate.objsize(), size);
        std::string str(size - objTemplate.objsize(), fill);

        BSONObj obj = BSON("ts" << opTime << "wall" << wallTime << "str" << str);
        ASSERT_EQ(size, obj.objsize());

        return obj;
    }

    BSONObj makeBSONObjWithSize(unsigned int t, int size, char fill = 'x') {
        return makeBSONObjWithSize(1, t, size, fill);
    }

    BSONObj insertOplog(unsigned int seconds, unsigned int t, int size) {
        auto obj = makeBSONObjWithSize(seconds, t, size);
        AutoGetOplogFastPath oplogWrite(_opCtx.get(), OplogAccessMode::kWrite);
        const auto& oplog = oplogWrite.getCollection();
        std::vector<Record> records{{RecordId(), RecordData(obj.objdata(), obj.objsize())}};
        std::vector<Timestamp> timestamps{Timestamp()};
        WriteUnitOfWork wuow(_opCtx.get());
        ASSERT_OK(internal::insertDocumentsForOplog(_opCtx.get(), oplog, &records, timestamps));
        wuow.commit();
        return obj;
    }

    BSONObj insertOplog(unsigned int t, int size) {
        return insertOplog(1, t, size);
    }

private:
    void setUp() override {
        ServiceContextMongoDTest::setUp();
        _opCtx = cc().makeOperationContext();
        auto service = getServiceContext();
        auto replCoord = std::make_unique<ReplicationCoordinatorMock>(service);
        ReplicationCoordinator::set(service, std::move(replCoord));
        // Turn on async mode
        RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
            "oplogSamplingAsyncEnabled", true);
        repl::createOplog(_opCtx.get());
    }

    void tearDown() override {
        _opCtx.reset(nullptr);
        ServiceContextMongoDTest::tearDown();
    }

    ServiceContext::UniqueOperationContext _opCtx;
    StorageInterfaceImpl _storage;
};

// TODO SERVER-101672 Re-enable these tests.

// In async mode, sampleAndUpdate is called seperately from createOplogTruncateMarkers, and
// creates the initial set of markers.
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampleAndUpdate) {
    // // Turn off async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();

    // // Populate oplog to force marker creation to occur
    // int realNumRecords = 4;
    // int realSizePerRecord = 1024 * 1024;
    // for (int i = 1; i <= realNumRecords; i++) {
    //     insertOplog(i, realSizePerRecord);
    // }

    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);

    // ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());

    // // Continue finishing the initial scan / sample
    // oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);

    // // Confirm that some truncate markers were generated.
    // ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
    // } // namespace repl

// In async mode, during startup but before sampling finishes, creation method is InProgress.
// This should then resolve to either the Scanning or Sampling method once initial marker
// creation has finished TEST_F(AsyncOplogTruncationTest,
// OplogTruncateMarkers_AsynchronousModeInProgressState) { Turn off async mode
// RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
//     "oplogSamplingAsyncEnabled", true);
// auto opCtx = getOperationContext();
// auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();

// // Populate oplog to so that initial marker creation method is not EmptyCollection
// insertOplog(1, 100);

// // Note if in async mode, at this point we have not yet sampled.
// auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
// ASSERT(oplogTruncateMarkers);

// // Confirm that we are in InProgress state since sampling/scanning has not begun.
// ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::InProgress,
//           oplogTruncateMarkers->getMarkersCreationMethod());

// // Continue finishing the initial scan / sample
// oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);

// // Check that the InProgress state has now been resolved.
// ASSERT(oplogTruncateMarkers->getMarkersCreationMethod() ==
//        CollectionTruncateMarkers::MarkersCreationMethod::Scanning);
}

// In async mode, we are still able to sample when expected, and some markers can be created.
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampling) {
    // Turn off async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    // auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);

    // {
    //     // Before initializing the RecordStore, populate with a few records.
    //     insertOplog(1, 100);
    //     insertOplog(2, 100);
    //     insertOplog(3, 100);
    //     insertOplog(4, 100);
    // }

    // {
    //     // Force initialize the oplog truncate markers to use sampling by providing very
    //     large,
    //     // inaccurate sizes. This should cause us to over sample the records in the oplog.
    //     ASSERT_OK(wtRS->updateSize(1024 * 1024 * 1024));
    //     wtRS->setNumRecords(1024 * 1024);
    //     wtRS->setDataSize(1024 * 1024 * 1024);
    // }

    // LocalOplogInfo::get(opCtx)->setRecordStore(opCtx, rs);
    // // Note if in async mode, at this point we have not yet sampled.
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);

    // // Continue finishing the initial scan / sample
    // oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);

    // // Confirm that we can in fact sample
    // ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
    //           oplogTruncateMarkers->getMarkersCreationMethod());
    // // Confirm that some truncate markers were generated.
    // ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
    // auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
    // ASSERT_GT(truncateMarkersBefore, 0U);
    // ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
}

// In async mode, markers are not created during createOplogTruncateMarkers (which instead
// returns empty OplogTruncateMarkers object)
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeCreateOplogTruncateMarkers) {
    // Turn off async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();

    // // Note if in async mode, at this point we have not yet sampled.
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);

    // ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
    // ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
    // ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}

} // namespace repl
} // namespace mongo
@@ -283,9 +283,4 @@ feature_flags:
     description: "Enable support for decompressing BSON columns using the block based Path API"
     cpp_varname: feature_flags::gBlockBasedDecodingPathAPI
     default: true
-    shouldBeFCVGated: false
-  featureFlagOplogSamplingAsyncEnabled:
-    description: "Enable oplog sampling to run asynchronously to startup on the OplogCapMaintainerThread"
-    cpp_varname: feature_flags::gOplogSamplingAsyncEnabled
-    default: false
     shouldBeFCVGated: false
@@ -301,14 +301,10 @@ std::shared_ptr<WiredTigerRecordStore::OplogTruncateMarkers>
 WiredTigerRecordStore::OplogTruncateMarkers::createOplogTruncateMarkers(OperationContext* opCtx,
                                                                         WiredTigerRecordStore* rs,
                                                                         const NamespaceString& ns) {
-    bool samplingAsynchronously =
-        feature_flags::gOplogSamplingAsyncEnabled.isEnabled(
-            serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) &&
-        gOplogSamplingAsyncEnabled;
     LOGV2(10621000,
           "Creating oplog markers",
-          "sampling asynchronously"_attr = samplingAsynchronously);
-    if (!samplingAsynchronously) {
+          "sampling asynchronously"_attr = gOplogSamplingAsyncEnabled);
+    if (!gOplogSamplingAsyncEnabled) {
         return sampleAndUpdate(opCtx, rs, ns);
     }
     return createEmptyOplogTruncateMarkers(rs);
@@ -1284,8 +1280,12 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
                 return ele.Date();
             }
         }();
-        _oplogTruncateMarkers->updateCurrentMarkerAfterInsertOnCommit(
-            opCtx, totalLength, records[nRecords - 1].id, wall, nRecords);
+        _oplogTruncateMarkers->updateCurrentMarkerAfterInsertOnCommit(opCtx,
+                                                                      totalLength,
+                                                                      records[nRecords - 1].id,
+                                                                      wall,
+                                                                      nRecords,
+                                                                      gOplogSamplingAsyncEnabled);
     }
 
     return Status::OK();
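With the FCV-gated feature flag removed, asynchronous oplog sampling is controlled solely by the oplogSamplingAsyncEnabled server parameter read here (gOplogSamplingAsyncEnabled on the C++ side), which is how the new jstest above enables it. A minimal shell sketch, assuming the standard MongoRunner test helper; the diff does not show whether the parameter can be changed at runtime, so it is only set at startup here:

// Sketch only: start a mongod with async oplog sampling enabled via the plain
// server parameter and read the value back.
const conn = MongoRunner.runMongod({setParameter: {oplogSamplingAsyncEnabled: true}});
const res = assert.commandWorked(
    conn.adminCommand({getParameter: 1, oplogSamplingAsyncEnabled: 1}));
assert.eq(true, res.oplogSamplingAsyncEnabled);
MongoRunner.stopMongod(conn);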