From ac1aac1b63befcc7d3932b019d847c2fd982eeb8 Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 9 Oct 2025 12:20:17 +0200 Subject: [PATCH] SERVER-112036 Fix overly permissive check for $v:1 oplog entries in change stream event generation (#42262) GitOrigin-RevId: c13e4b5c3e5063706b874c4beda43b804c14b16a --- docs/change_streams.md | 13 ++++-- .../change_stream_event_transform.cpp | 33 ++++++++------- .../document_source_change_stream_test.cpp | 41 +++++++++++-------- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/docs/change_streams.md b/docs/change_streams.md index 649193fdc15..8e451f86b2b 100644 --- a/docs/change_streams.md +++ b/docs/change_streams.md @@ -77,7 +77,8 @@ _mongosh_, the "jstest shell" (_mongo_) and many drivers provide simpler "watch" ### Opening a Collection-Level Change Stream -To open a collection-level change stream on a specific collection (e.g., `testDB.testCollection`): +To open a collection-level change stream on a specific collection (e.g., `testDB.testCollection`), +the following _mongosh_ command can be used: ```js db.getSiblingDB("testDB").runCommand({ @@ -93,7 +94,8 @@ db.getSiblingDB("testDB").runCommand({ ### Opening a Database-Level Change Stream -To open a database-level change stream on a specific database (e.g., `testDB`), use +To open a database-level change stream on a specific database (e.g., `testDB`), use the following +command in _mongosh_: ```js db.getSiblingDB("testDB").runCommand({ @@ -115,7 +117,8 @@ The internal namespace that is used by database-level change streams is `(UpdateOplogEntryVersion::kUpdateNodeV1) - << " (kUpdateNodeV1) or " - << static_cast(UpdateOplogEntryVersion::kDeltaV2) - << " (kDeltaV2), but got oplog version $v: " - << oplogVersion.toString(), - !id.missing() || oplogVersion.missing() || - oplogVersion.getInt() == - static_cast(UpdateOplogEntryVersion::kUpdateNodeV1) || - oplogVersion.getInt() == - static_cast(UpdateOplogEntryVersion::kDeltaV2)); - + // If there is no '_id' field and the '$v' is not 2, it is an old-style modifier + // update. This is unsupported. // It is important to check for '_id' field first, because a replacement style update // can still have a '$v' field in the object. - if (id.missing() && !oplogVersion.missing() && - oplogVersion.getInt() == static_cast(UpdateOplogEntryVersion::kDeltaV2)) { + const bool isUpdateEntry = id.missing(); + uassert( + 6741200, + str::stream() << "Expected _id field, or $v field missing, or $v equal to " + << static_cast(UpdateOplogEntryVersion::kDeltaV2) + << " (kDeltaV2), but got oplog version $v: " + << oplogVersion.toString(), + !isUpdateEntry || + (!oplogVersion.missing() && oplogVersion.getType() == BSONType::numberInt && + oplogVersion.getInt() == static_cast(UpdateOplogEntryVersion::kDeltaV2))); + + if (isUpdateEntry) { // Parsing the delta oplog entry. operationType = DocumentSourceChangeStream::kUpdateOpType; Value diffObj = input[repl::OplogEntry::kObjectFieldName] @@ -395,6 +393,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum } } } else { + // Replace. operationType = DocumentSourceChangeStream::kReplaceOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; } @@ -630,7 +629,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum boost::optional> ChangeStreamDefaultEventTransformation::handleSupportedEvent(const Document& o2Field) const { for (auto&& supportedEvent : _supportedEvents) { - if (auto lookup = o2Field[supportedEvent]; !o2Field[supportedEvent].missing()) { + if (auto lookup = o2Field[supportedEvent]; !lookup.missing()) { // Known event. return std::make_pair(supportedEvent, Value{copyDocExceptFields(o2Field, {supportedEvent})}); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index b8399132338..39a16c8217b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -4507,9 +4507,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy } } -TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) { - // A missing $v field in the update oplog entry implies $v:1, which is no longer supported. - BSONObj o = BSON("$set" << BSON("y" << 1)); +TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithId) { + // An _id field in the update oplog entry implies a replace. + BSONObj o = BSON("_id" << 1 << "$set" << BSON("y" << 1)); BSONObj o2 = BSON("_id" << 1 << "x" << 2); auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); @@ -4519,7 +4519,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) { {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kFullDocumentField, D{{"$set", D{{"y", 1}}}}}, + {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"$set", D{{"y", 1}}}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -4527,6 +4527,23 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) { checkTransformation(replace, expectedReplace); } +TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithoutId) { + // A missing _id field and a missing $v field in the update oplog entry implies a $v:1 update + // oplog entry, which is no longer supported. + BSONObj o = BSON("$set" << BSON("y" << 1)); + BSONObj o2 = BSON("_id" << 1 << "x" << 2); + auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); + checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200); +} + +TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsInvalidV) { + // A $v field in the update oplog entry without an _id field requires '$v:2'. + BSONObj o = BSON("$v" << "ABC" << "diff" << BSON("i" << BSON("v" << 5))); + BSONObj o2 = BSON("_id" << 1 << "x" << 2); + auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); + checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200); +} + TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdateNonV2NotSupported) { BSONObj diff = BSON("u" << BSON("y" << 1)); BSONObj o = BSON("diff" << diff << "$v" << 3); @@ -4554,20 +4571,8 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdate) { TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsModifierUpdate) { BSONObj o = BSON("x" << 2 << "y" << 1); BSONObj o2 = BSON("_id" << 1); - auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); - - Document expectedReplace{ - {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"y", 1}}}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, - }; - - checkTransformation(replace, expectedReplace); + auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); + checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200); } TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {