mirror of https://github.com/mongodb/mongo
SERVER-109210 Fix viewless oplog entry writing and add feature flag check (#41780)
GitOrigin-RevId: 61d4531c5174f802c475f9459716ce3e734eee91
This commit is contained in:
parent
fe4f6c35cb
commit
353fb32b75
|
|
@ -0,0 +1,145 @@
|
|||
/**
|
||||
* Tests expected operations on time-series collections produce oplog entries with
|
||||
* isTimeseries set to true.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_fcv_83,
|
||||
* assumes_against_mongod_not_mongos,
|
||||
* requires_capped,
|
||||
* requires_getmore,
|
||||
* requires_replication,
|
||||
* requires_timeseries,
|
||||
* featureFlagMarkTimeseriesEventsInOplog,
|
||||
* incompatible_with_snapshot_reads,
|
||||
* no_selinux,
|
||||
* ]
|
||||
|
||||
*/
|
||||
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
||||
|
||||
const testDB = db.getSiblingDB(jsTestName());
|
||||
const collName = "ts";
|
||||
|
||||
// Skips for burnin
|
||||
if (FixtureHelpers.isMongos(db)) {
|
||||
jsTestLog("Skipping test because it is not compatible with mongos");
|
||||
quit();
|
||||
}
|
||||
if (!FixtureHelpers.isReplSet(db)) {
|
||||
jsTestLog("Skipping test because it is not compatible with standalone");
|
||||
quit();
|
||||
}
|
||||
|
||||
// Test creation oplog entry
|
||||
assert.commandWorked(testDB.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}}));
|
||||
let opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.create": {$exists: true},
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
|
||||
// Test oplog entries from index creation
|
||||
const testcoll = testDB.getCollection(collName);
|
||||
assert.commandWorked(testcoll.insert({t: ISODate(), m: 1, a: 1}));
|
||||
assert.commandWorked(testcoll.createIndex({m: 1}));
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.createIndexes": {$exists: true},
|
||||
"o.key.meta": 1,
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.startIndexBuild": {$exists: true},
|
||||
"o.indexes.key.meta": 1,
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.commitIndexBuild": {$exists: true},
|
||||
"o.indexes.key.meta": 1,
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
|
||||
// Test modifications
|
||||
assert.commandWorked(
|
||||
testDB.runCommand({
|
||||
collMod: collName,
|
||||
timeseriesBucketsMayHaveMixedSchemaData: true,
|
||||
}),
|
||||
);
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.collMod": {$exists: true},
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
|
||||
// Test teardown
|
||||
assert.commandWorked(testDB.runCommand({dropIndexes: collName, index: "m_1_t_1"}));
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.dropIndexes": {$exists: true},
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
assert.commandWorked(testDB.runCommand({drop: collName}));
|
||||
opEntries = db.getSiblingDB("local").runCommand({
|
||||
find: "oplog.rs",
|
||||
filter: {
|
||||
op: "c",
|
||||
ns: `${jsTestName()}.$cmd`,
|
||||
"o.drop": {$exists: true},
|
||||
isTimeseries: true,
|
||||
},
|
||||
limit: 1,
|
||||
singleBatch: true,
|
||||
readConcern: {level: "local"},
|
||||
}).cursor.firstBatch;
|
||||
assert.eq(opEntries.length, 1, tojson(opEntries));
|
||||
|
|
@ -2,7 +2,8 @@
|
|||
* Tests that it's not possible to shard or move a malformed timeseries collection where both
|
||||
* 'coll' and 'system.buckets.coll' exist as collections (see SERVER-90862).
|
||||
*
|
||||
* @tags: [featureFlagCreateViewlessTimeseriesCollections_incompatible]
|
||||
* @tags: [featureFlagCreateViewlessTimeseriesCollections_incompatible,
|
||||
* featureFlagMarkTimeseriesEventsInOplog_incompatible]
|
||||
*/
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
* # This inconsistency that can only happen with legacy timeseries collections,
|
||||
* # which allowed a system.buckets namespace to exist without timeseries options.
|
||||
* featureFlagCreateViewlessTimeseriesCollections_incompatible,
|
||||
* featureFlagMarkTimeseriesEventsInOplog_incompatible,
|
||||
* ]
|
||||
*/
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@
|
|||
* # in which both the main namespace and the system.buckets namespace need to be created.
|
||||
* # Viewless timeseries collections have a single namespace and reuse the regular cleanup logic.
|
||||
* featureFlagCreateViewlessTimeseriesCollections_incompatible,
|
||||
* featureFlagMarkTimeseriesEventsInOplog_incompatible,
|
||||
* ]
|
||||
*/
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
* # Tests a state of partial creation (timeseries buckets exists, timeseries view does not)
|
||||
* # which can't happen with viewless timeseries collections, since they are created atomically.
|
||||
* featureFlagCreateViewlessTimeseriesCollections_incompatible,
|
||||
* featureFlagMarkTimeseriesEventsInOplog_incompatible,
|
||||
* ]
|
||||
*/
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
|
|
|||
|
|
@ -288,7 +288,8 @@ void removeIndexBuildEntryAfterCommitOrAbort(OperationContext* opCtx,
|
|||
void onCommitIndexBuild(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
std::shared_ptr<ReplIndexBuildState> replState,
|
||||
std::vector<boost::optional<MultikeyPaths>> multikeys) {
|
||||
std::vector<boost::optional<MultikeyPaths>> multikeys,
|
||||
bool isTimeseries) {
|
||||
const auto& buildUUID = replState->buildUUID;
|
||||
|
||||
replState->commit(opCtx);
|
||||
|
|
@ -333,7 +334,7 @@ void onCommitIndexBuild(OperationContext* opCtx,
|
|||
}
|
||||
|
||||
opObserver->onCommitIndexBuild(
|
||||
opCtx, nss, collUUID, buildUUID, indexSpecs, multikeyObjs, fromMigrate);
|
||||
opCtx, nss, collUUID, buildUUID, indexSpecs, multikeyObjs, fromMigrate, isTimeseries);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -342,7 +343,8 @@ void onCommitIndexBuild(OperationContext* opCtx,
|
|||
*/
|
||||
void onAbortIndexBuild(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
ReplIndexBuildState& replState) {
|
||||
ReplIndexBuildState& replState,
|
||||
bool isTimeseries) {
|
||||
if (IndexBuildProtocol::kTwoPhase != replState.protocol) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -359,7 +361,8 @@ void onAbortIndexBuild(OperationContext* opCtx,
|
|||
replState.buildUUID,
|
||||
toIndexSpecs(replState.getIndexes()),
|
||||
replState.getAbortStatus(),
|
||||
fromMigrate);
|
||||
fromMigrate,
|
||||
isTimeseries);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1595,7 +1598,7 @@ void IndexBuildsCoordinator::_completeAbort(OperationContext* opCtx,
|
|||
str::stream() << "singlePhase: "
|
||||
<< (IndexBuildProtocol::kSinglePhase == replState->protocol));
|
||||
auto onCleanUpFn = [&] {
|
||||
onAbortIndexBuild(opCtx, coll->ns(), *replState);
|
||||
onAbortIndexBuild(opCtx, coll->ns(), *replState, coll->isTimeseriesCollection());
|
||||
};
|
||||
_indexBuildsManager.abortIndexBuild(opCtx, coll, replState->buildUUID, onCleanUpFn);
|
||||
removeIndexBuildEntryAfterCommitOrAbort(
|
||||
|
|
@ -2329,8 +2332,12 @@ void IndexBuildsCoordinator::_createIndex(OperationContext* opCtx,
|
|||
auto onCreateEachFn = [&](const BSONObj& spec,
|
||||
const IndexCatalogEntry& entry,
|
||||
boost::optional<MultikeyPaths>&& multikey) {
|
||||
opObserver->onCreateIndex(
|
||||
opCtx, collection->ns(), collection->uuid(), indexBuildInfo, fromMigrate);
|
||||
opObserver->onCreateIndex(opCtx,
|
||||
collection->ns(),
|
||||
collection->uuid(),
|
||||
indexBuildInfo,
|
||||
fromMigrate,
|
||||
collection->isTimeseriesCollection());
|
||||
};
|
||||
uassertStatusOK(_indexBuildsManager.commitIndexBuild(
|
||||
opCtx, collection, nss, buildUUID, onCreateEachFn, MultiIndexBlock::kNoopOnCommitFn));
|
||||
|
|
@ -2377,7 +2384,12 @@ void IndexBuildsCoordinator::createIndexesOnEmptyCollection(OperationContext* op
|
|||
// timestamp.
|
||||
indexBuildInfo.spec = uassertStatusOK(indexCatalog->prepareSpecForCreate(
|
||||
opCtx, collectionPtr, indexBuildInfo.spec, boost::none));
|
||||
opObserver->onCreateIndex(opCtx, nss, collectionUUID, indexBuildInfo, fromMigrate);
|
||||
opObserver->onCreateIndex(opCtx,
|
||||
nss,
|
||||
collectionUUID,
|
||||
indexBuildInfo,
|
||||
fromMigrate,
|
||||
collection->isTimeseriesCollection());
|
||||
uassertStatusOK(
|
||||
IndexBuildBlock::buildEmptyIndex(opCtx, writeableCollection, indexBuildInfo));
|
||||
}
|
||||
|
|
@ -2657,7 +2669,8 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild
|
|||
replState->collectionUUID,
|
||||
replState->buildUUID,
|
||||
replState->getIndexes(),
|
||||
false /* fromMigrate */);
|
||||
false /* fromMigrate */,
|
||||
collection->isTimeseriesCollection());
|
||||
};
|
||||
} else {
|
||||
onInitFn = MultiIndexBlock::makeTimestampedIndexOnInitFn(opCtx, collection.get());
|
||||
|
|
@ -3558,7 +3571,11 @@ IndexBuildsCoordinator::CommitResult IndexBuildsCoordinator::_insertKeysFromSide
|
|||
// If two phase index builds is enabled, index build will be coordinated using
|
||||
// startIndexBuild and commitIndexBuild oplog entries.
|
||||
auto onCommitFn = [&] {
|
||||
onCommitIndexBuild(opCtx, collection->ns(), replState, multikeys);
|
||||
onCommitIndexBuild(opCtx,
|
||||
collection->ns(),
|
||||
replState,
|
||||
multikeys,
|
||||
collection->isTimeseriesCollection());
|
||||
};
|
||||
|
||||
int i = 0;
|
||||
|
|
@ -3588,8 +3605,12 @@ IndexBuildsCoordinator::CommitResult IndexBuildsCoordinator::_insertKeysFromSide
|
|||
auto opObserver = opCtx->getServiceContext()->getOpObserver();
|
||||
IndexBuildInfo indexBuildInfo(spec, std::string{entry.getIdent()});
|
||||
auto fromMigrate = false;
|
||||
opObserver->onCreateIndex(
|
||||
opCtx, collection->ns(), replState->collectionUUID, indexBuildInfo, fromMigrate);
|
||||
opObserver->onCreateIndex(opCtx,
|
||||
collection->ns(),
|
||||
replState->collectionUUID,
|
||||
indexBuildInfo,
|
||||
fromMigrate,
|
||||
collection->isTimeseriesCollection());
|
||||
};
|
||||
|
||||
// Commit index build.
|
||||
|
|
|
|||
|
|
@ -1077,8 +1077,13 @@ Status _collModInternal(OperationContext* opCtx,
|
|||
// Only observe non-view collMods, as view operations are observed as operations on the
|
||||
// system.views collection.
|
||||
auto* const opObserver = opCtx->getServiceContext()->getOpObserver();
|
||||
opObserver->onCollMod(
|
||||
opCtx, nss, writableColl->uuid(), oplogEntryObj, oldCollOptions, indexCollModInfo);
|
||||
opObserver->onCollMod(opCtx,
|
||||
nss,
|
||||
writableColl->uuid(),
|
||||
oplogEntryObj,
|
||||
oldCollOptions,
|
||||
indexCollModInfo,
|
||||
writableColl->isTimeseriesCollection());
|
||||
|
||||
wunit.commit();
|
||||
return Status::OK();
|
||||
|
|
|
|||
|
|
@ -233,7 +233,8 @@ void DatabaseHolderImpl::dropDb(OperationContext* opCtx, Database* db) {
|
|||
coll->ns(),
|
||||
coll->uuid(),
|
||||
coll->numRecords(opCtx),
|
||||
/*markFromMigrate=*/false);
|
||||
/*markFromMigrate=*/false,
|
||||
coll->isTimeseriesCollection());
|
||||
}
|
||||
|
||||
Top::getDecoration(opCtx).collectionDropped(coll->ns());
|
||||
|
|
|
|||
|
|
@ -503,7 +503,8 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx,
|
|||
auto isOplogDisabledForNamespace = replCoord->isOplogDisabledFor(opCtx, nss);
|
||||
if (dropOpTime.isNull() && isOplogDisabledForNamespace) {
|
||||
_dropCollectionIndexes(opCtx, nss, collection.getWritableCollection(opCtx));
|
||||
opObserver->onDropCollection(opCtx, nss, uuid, numRecords, markFromMigrate);
|
||||
opObserver->onDropCollection(
|
||||
opCtx, nss, uuid, numRecords, markFromMigrate, collection->isTimeseriesCollection());
|
||||
return _finishDropCollection(opCtx, nss, collection.getWritableCollection(opCtx));
|
||||
}
|
||||
|
||||
|
|
@ -524,12 +525,14 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx,
|
|||
"commitTimestamp"_attr = commitTimestamp);
|
||||
if (dropOpTime.isNull()) {
|
||||
// Log oplog entry for collection drop and remove the UUID.
|
||||
dropOpTime = opObserver->onDropCollection(opCtx, nss, uuid, numRecords, markFromMigrate);
|
||||
dropOpTime = opObserver->onDropCollection(
|
||||
opCtx, nss, uuid, numRecords, markFromMigrate, collection->isTimeseriesCollection());
|
||||
invariant(!dropOpTime.isNull());
|
||||
} else {
|
||||
// If we are provided with a valid 'dropOpTime', it means we are dropping this
|
||||
// collection in the context of applying an oplog entry on a secondary.
|
||||
auto opTime = opObserver->onDropCollection(opCtx, nss, uuid, numRecords, markFromMigrate);
|
||||
auto opTime = opObserver->onDropCollection(
|
||||
opCtx, nss, uuid, numRecords, markFromMigrate, collection->isTimeseriesCollection());
|
||||
// OpObserver::onDropCollection should not be writing to the oplog on the secondary.
|
||||
invariant(opTime.isNull(),
|
||||
str::stream() << "OpTime is not null. OpTime: " << opTime.toString());
|
||||
|
|
@ -883,13 +886,15 @@ Collection* DatabaseImpl::_createCollection(
|
|||
|
||||
hangBeforeLoggingCreateCollection.pauseWhileSet();
|
||||
|
||||
opCtx->getServiceContext()->getOpObserver()->onCreateCollection(opCtx,
|
||||
nss,
|
||||
optionsWithUUID,
|
||||
fullIdIndexSpec,
|
||||
createOplogSlot,
|
||||
catalogIdentifierForColl,
|
||||
fromMigrate);
|
||||
opCtx->getServiceContext()->getOpObserver()->onCreateCollection(
|
||||
opCtx,
|
||||
nss,
|
||||
optionsWithUUID,
|
||||
fullIdIndexSpec,
|
||||
createOplogSlot,
|
||||
catalogIdentifierForColl,
|
||||
fromMigrate,
|
||||
collection->isTimeseriesCollection());
|
||||
|
||||
// It is necessary to create the system index *after* running the onCreateCollection so that
|
||||
// the storage timestamp for the index creation is after the storage timestamp for the
|
||||
|
|
|
|||
|
|
@ -273,7 +273,8 @@ Status dropIndexByDescriptor(OperationContext* opCtx,
|
|||
collection->ns(),
|
||||
collection->uuid(),
|
||||
entry->descriptor()->indexName(),
|
||||
entry->descriptor()->infoObj());
|
||||
entry->descriptor()->infoObj(),
|
||||
collection->isTimeseriesCollection());
|
||||
|
||||
auto s = indexCatalog->dropIndexEntry(opCtx, collection, entry);
|
||||
if (!s.isOK()) {
|
||||
|
|
|
|||
|
|
@ -67,6 +67,7 @@
|
|||
#include "mongo/db/repl/replication_coordinator.h"
|
||||
#include "mongo/db/repl/truncate_range_oplog_entry_gen.h"
|
||||
#include "mongo/db/rss/replicated_storage_service.h"
|
||||
#include "mongo/db/server_feature_flags_gen.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/db/session/logical_session_id_helpers.h"
|
||||
#include "mongo/db/session/session_txn_record_gen.h"
|
||||
|
|
@ -344,7 +345,10 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
|
|||
oplogEntry.setTid(nss.tenantId());
|
||||
oplogEntry.setNss(nss.getCommandNS());
|
||||
oplogEntry.setUuid(uuid);
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setObject(builder.obj());
|
||||
|
|
@ -411,7 +415,10 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
|
|||
|
||||
MutableOplogEntry oplogEntry;
|
||||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
|
|
@ -487,7 +494,10 @@ void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
|
|||
MutableOplogEntry oplogEntry;
|
||||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
|
|
@ -531,7 +541,10 @@ void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
|
|||
MutableOplogEntry oplogEntry;
|
||||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
|
|
@ -756,7 +769,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
|
|||
// Ensure well-formed embedded ReplOperation for logging.
|
||||
// This means setting optype, nss, and object at the minimum.
|
||||
MutableOplogEntry oplogEntryTemplate;
|
||||
if (coll->isNewTimeseriesWithoutView()) {
|
||||
if (coll->isNewTimeseriesWithoutView() &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntryTemplate.setIsTimeseries();
|
||||
}
|
||||
oplogEntryTemplate.setOpType(repl::OpTypeEnum::kInsert);
|
||||
|
|
@ -1348,7 +1364,10 @@ void OpObserverImpl::onCreateCollection(
|
|||
|
||||
MutableOplogEntry oplogEntry;
|
||||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setTid(collectionName.tenantId());
|
||||
|
|
@ -1438,7 +1457,10 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
|
|||
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
oplogEntry.setNss(nss.getCommandNS());
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setUuid(uuid);
|
||||
|
|
@ -1516,7 +1538,10 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
|
|||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
oplogEntry.setTid(collectionName.tenantId());
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setNss(collectionName.getCommandNS());
|
||||
|
|
@ -1552,7 +1577,10 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
|
|||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setNss(nss.getCommandNS());
|
||||
|
|
@ -1600,7 +1628,10 @@ repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
|
|||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
oplogEntry.setTid(fromCollection.tenantId());
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setNss(fromCollection.getCommandNS());
|
||||
|
|
@ -1674,7 +1705,10 @@ void OpObserverImpl::onImportCollection(OperationContext* opCtx,
|
|||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
|
||||
oplogEntry.setTid(nss.tenantId());
|
||||
if (isTimeseries) {
|
||||
if (isTimeseries &&
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
oplogEntry.setIsTimeseries();
|
||||
}
|
||||
oplogEntry.setNss(nss.getCommandNS());
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@
|
|||
#include "mongo/db/repl/storage_interface.h"
|
||||
#include "mongo/db/repl/storage_interface_impl.h"
|
||||
#include "mongo/db/repl/truncate_range_oplog_entry_gen.h"
|
||||
#include "mongo/db/server_feature_flags_gen.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/db/service_context_d_test_fixture.h"
|
||||
#include "mongo/db/session/session.h"
|
||||
|
|
@ -538,7 +539,10 @@ protected:
|
|||
|
||||
// Tests the oplog entry generated from 'onCreateCollection'. Simulates the creation of a
|
||||
// non-clustered collection.
|
||||
void testOnCreateCollBasic(bool catalogReplicationEnabled, bool viewless = false) {
|
||||
void testOnCreateCollBasic(bool catalogReplicationEnabled,
|
||||
bool viewless = false,
|
||||
bool viewlessParam = false) {
|
||||
gFeatureFlagMarkTimeseriesEventsInOplog.setForServerParameter(viewlessParam);
|
||||
RAIIServerParameterControllerForTest replicateLocalCatalogInfoController(
|
||||
"featureFlagReplicateLocalCatalogIdentifiers", catalogReplicationEnabled);
|
||||
|
||||
|
|
@ -573,7 +577,7 @@ protected:
|
|||
validateReplicatedCatalogIdentifier(
|
||||
opCtx, oplogEntry, catalogIdentifier, catalogReplicationEnabled);
|
||||
bool isTimeseries = oplogEntryBSON.getBoolField("isTimeseries");
|
||||
ASSERT_EQ(viewless, isTimeseries);
|
||||
ASSERT_EQ(viewless && viewlessParam, isTimeseries);
|
||||
}
|
||||
|
||||
void testOnCreateCollClustered(bool catalogReplicationEnabled) {
|
||||
|
|
@ -668,11 +672,25 @@ TEST_F(OpObserverOnCreateCollectionTest, BasicReplicatedCatalogIdentifiersEnable
|
|||
TEST_F(OpObserverOnCreateCollectionTest, BasicReplicatedCatalogIdentifiersDisabled) {
|
||||
testOnCreateCollBasic(false /* catalogReplicationEnabled */);
|
||||
}
|
||||
TEST_F(OpObserverOnCreateCollectionTest, BasicReplicatedCatalogIdentifiersEnabledWithTimeseries) {
|
||||
testOnCreateCollBasic(true /* catalogReplicationEnabled */, true /* viewless */);
|
||||
TEST_F(OpObserverOnCreateCollectionTest,
|
||||
BasicReplicatedCatalogIdentifiersEnabledWithTimeseriesParamOff) {
|
||||
testOnCreateCollBasic(
|
||||
true /* catalogReplicationEnabled */, true /* viewless */, false /* viewlessParam */);
|
||||
}
|
||||
TEST_F(OpObserverOnCreateCollectionTest, BasicReplicatedCatalogIdentifiersDisabledWithTimeseries) {
|
||||
testOnCreateCollBasic(false /* catalogReplicationEnabled */, true /* viewless */);
|
||||
TEST_F(OpObserverOnCreateCollectionTest,
|
||||
BasicReplicatedCatalogIdentifiersDisabledWithTimeseriesParamOff) {
|
||||
testOnCreateCollBasic(
|
||||
false /* catalogReplicationEnabled */, true /* viewless */, false /* viewlessParam */);
|
||||
}
|
||||
TEST_F(OpObserverOnCreateCollectionTest,
|
||||
BasicReplicatedCatalogIdentifiersEnabledWithTimeseriesParamOn) {
|
||||
testOnCreateCollBasic(
|
||||
true /* catalogReplicationEnabled */, true /* viewless */, true /* viewlessParam */);
|
||||
}
|
||||
TEST_F(OpObserverOnCreateCollectionTest,
|
||||
BasicReplicatedCatalogIdentifiersDisabledWithTimeseriesParamOn) {
|
||||
testOnCreateCollBasic(
|
||||
false /* catalogReplicationEnabled */, true /* viewless */, true /* viewlessParam */);
|
||||
}
|
||||
|
||||
TEST_F(OpObserverOnCreateCollectionTest, ClusteredReplicatedCatalogIdentifiersEnabled) {
|
||||
|
|
@ -874,6 +892,8 @@ TEST_F(OpObserverTest, AbortIndexBuildExpectedOplogEntry) {
|
|||
TEST_F(OpObserverTest, checkIsTimeseriesOnReplLogUpdate) {
|
||||
RAIIServerParameterControllerForTest viewlessController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
RAIIServerParameterControllerForTest viewlessController2(
|
||||
"featureFlagMarkTimeseriesEventsInOplog", true);
|
||||
|
||||
NamespaceString curNss = NamespaceString::createNamespaceString_forTest("test.tsColl");
|
||||
|
||||
|
|
@ -914,6 +934,8 @@ TEST_F(OpObserverTest, checkIsTimeseriesOnReplLogUpdate) {
|
|||
TEST_F(OpObserverTest, checkIsTimeseriesOnReplLogDelete) {
|
||||
RAIIServerParameterControllerForTest viewlessController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
RAIIServerParameterControllerForTest viewlessController2(
|
||||
"featureFlagMarkTimeseriesEventsInOplog", true);
|
||||
OpObserverImpl opObserver(std::make_unique<OperationLoggerImpl>());
|
||||
auto opCtx = cc().makeOperationContext();
|
||||
|
||||
|
|
@ -943,6 +965,8 @@ TEST_F(OpObserverTest, checkIsTimeseriesOnReplLogDelete) {
|
|||
TEST_F(OpObserverTest, checkIsTimeseriesOnInserts) {
|
||||
RAIIServerParameterControllerForTest viewlessController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
RAIIServerParameterControllerForTest viewlessController2(
|
||||
"featureFlagMarkTimeseriesEventsInOplog", true);
|
||||
|
||||
NamespaceString curNss = NamespaceString::createNamespaceString_forTest("test.tsColl");
|
||||
auto opCtx = cc().makeOperationContext();
|
||||
|
|
@ -1923,7 +1947,7 @@ protected:
|
|||
|
||||
TEST_F(OpObserverTransactionTest, checkIsTimeseriesOnMultiDocTransaction) {
|
||||
RAIIServerParameterControllerForTest viewlessController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
"featureFlagMarkTimeseriesEventsInOplog", true);
|
||||
|
||||
auto txnParticipant = TransactionParticipant::get(opCtx());
|
||||
txnParticipant.unstashTransactionResources(opCtx(), "delete");
|
||||
|
|
@ -3533,6 +3557,8 @@ TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdate) {
|
|||
TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdateOnViewlessTimeseries) {
|
||||
RAIIServerParameterControllerForTest viewlessController(
|
||||
"featureFlagCreateViewlessTimeseriesCollections", true);
|
||||
RAIIServerParameterControllerForTest viewlessController2(
|
||||
"featureFlagMarkTimeseriesEventsInOplog", true);
|
||||
NamespaceString curNss = NamespaceString::createNamespaceString_forTest("test.tsColl");
|
||||
|
||||
auto opCtxRaii = cc().makeOperationContext();
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ public:
|
|||
bool isTimeseries) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers)
|
||||
o->onCreateIndex(opCtx, nss, uuid, indexBuildInfo, fromMigrate);
|
||||
o->onCreateIndex(opCtx, nss, uuid, indexBuildInfo, fromMigrate, isTimeseries);
|
||||
}
|
||||
|
||||
void onStartIndexBuild(OperationContext* opCtx,
|
||||
|
|
@ -154,7 +154,8 @@ public:
|
|||
bool isTimeseries) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers) {
|
||||
o->onStartIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate);
|
||||
o->onStartIndexBuild(
|
||||
opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate, isTimeseries);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -177,7 +178,7 @@ public:
|
|||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers) {
|
||||
o->onCommitIndexBuild(
|
||||
opCtx, nss, collUUID, indexBuildUUID, indexes, multikey, fromMigrate);
|
||||
opCtx, nss, collUUID, indexBuildUUID, indexes, multikey, fromMigrate, isTimeseries);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -191,7 +192,8 @@ public:
|
|||
bool isTimeseries) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers) {
|
||||
o->onAbortIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, cause, fromMigrate);
|
||||
o->onAbortIndexBuild(
|
||||
opCtx, nss, collUUID, indexBuildUUID, indexes, cause, fromMigrate, isTimeseries);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -356,7 +358,8 @@ public:
|
|||
idIndex,
|
||||
createOpTime,
|
||||
createCollCatalogIdentifier,
|
||||
fromMigrate);
|
||||
fromMigrate,
|
||||
isTimeseries);
|
||||
}
|
||||
|
||||
void onCollMod(OperationContext* const opCtx,
|
||||
|
|
@ -368,7 +371,7 @@ public:
|
|||
bool isTimeseries) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers)
|
||||
o->onCollMod(opCtx, nss, uuid, collModCmd, oldCollOptions, indexInfo);
|
||||
o->onCollMod(opCtx, nss, uuid, collModCmd, oldCollOptions, indexInfo, isTimeseries);
|
||||
}
|
||||
|
||||
void onDropDatabase(OperationContext* const opCtx,
|
||||
|
|
@ -388,7 +391,7 @@ public:
|
|||
ReservedTimes times{opCtx};
|
||||
for (auto& observer : this->_observers) {
|
||||
auto time = observer->onDropCollection(
|
||||
opCtx, collectionName, uuid, numRecords, markFromMigrate);
|
||||
opCtx, collectionName, uuid, numRecords, markFromMigrate, isTimeseries);
|
||||
invariant(time.isNull());
|
||||
}
|
||||
return _getOpTimeToReturn(times.get().reservedOpTimes);
|
||||
|
|
@ -402,7 +405,7 @@ public:
|
|||
bool isTimeseries) override {
|
||||
ReservedTimes times{opCtx};
|
||||
for (auto& o : _observers)
|
||||
o->onDropIndex(opCtx, nss, uuid, indexName, idxDescriptor);
|
||||
o->onDropIndex(opCtx, nss, uuid, indexName, idxDescriptor, isTimeseries);
|
||||
}
|
||||
|
||||
void onRenameCollection(OperationContext* const opCtx,
|
||||
|
|
@ -467,7 +470,8 @@ public:
|
|||
dropTargetUUID,
|
||||
numRecords,
|
||||
stayTemp,
|
||||
markFromMigrate);
|
||||
markFromMigrate,
|
||||
isTimeseries);
|
||||
invariant(time.isNull());
|
||||
}
|
||||
return _getOpTimeToReturn(times.get().reservedOpTimes);
|
||||
|
|
|
|||
|
|
@ -145,6 +145,14 @@ feature_flags:
|
|||
cpp_varname: gFeatureFlagCreateViewlessTimeseriesCollections
|
||||
default: false
|
||||
fcv_gated: true
|
||||
featureFlagMarkTimeseriesEventsInOplog:
|
||||
description: >-
|
||||
Enables marking oplog entries writing to viewless timeseries collections.
|
||||
Viewless timeseries collections have only one associated namespace,
|
||||
instead of a view namespace and a system.buckets namespace.
|
||||
cpp_varname: gFeatureFlagMarkTimeseriesEventsInOplog
|
||||
default: false
|
||||
fcv_gated: true
|
||||
featureFlagMongodProxyProtocolSupport:
|
||||
description: "Enables non-OCS proxy protocol connections on Mongos and Mongod"
|
||||
cpp_varname: gFeatureFlagMongodProxyProtocolSupport
|
||||
|
|
|
|||
Loading…
Reference in New Issue