SERVER-111406 Fix change stream results for top-level $v fields (#41830)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
GitOrigin-RevId: 06b4ff83b0e9ef0a0d79ec803396547d38065747
This commit is contained in:
Jan 2025-09-29 16:38:34 +02:00 committed by MongoDB Bot
parent fa84e1fd87
commit f8b9603ac1
4 changed files with 243 additions and 11 deletions

View File

@ -61,6 +61,8 @@ last-continuous:
ticket: SERVER-95726
- test_file: jstests/query_golden/distinct_index_eligibility_md.js
ticket: SERVER-111486
- test_file: jstests/change_streams/events_containing_version_literals.js
ticket: SERVER-111406
suites: null
last-lts:
all:
@ -624,4 +626,6 @@ last-lts:
ticket: SERVER-110058
- test_file: jstests/core/query/notablescan_sbe_consistency.js
ticket: SERVER-110058
- test_file: jstests/change_streams/events_containing_version_literals.js
ticket: SERVER-111406
suites: null

View File

@ -0,0 +1,157 @@
/**
* Tests that change stream events containg a '$v' field work as expected.
* @tags: [
* uses_change_streams,
* ]
*/
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
describe("change streams correctly return documents containing $v attributes", () => {
const kCollName = jsTestName();
let cst;
let cursor;
beforeEach(() => {
assertDropAndRecreateCollection(db, kCollName);
cst = new ChangeStreamTest(db);
cursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {}}],
collection: db[kCollName],
});
// Insert 5 documents that will be used in the following update tests.
[1, 2, "1", "2", "test"].forEach((v, i) => {
assert.commandWorked(db[kCollName].insert({_id: i, $v: v}));
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
afterEach(() => {
cst.cleanUp();
});
// Test update operations using $v field name, and check that they are either
// We update in different order here than update, so that we don't cause no-op updates.
it("tests that updates using $v inside an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, {$v: v}), ErrorCodes.FailedToParse);
});
});
it("tests that updates using $v inside an object literal succeed with upsert when the source documents do not exist", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i + 10}, {$v: v}, {upsert: true}),
ErrorCodes.FailedToParse,
);
});
});
it("tests that updates using $v inside $set and an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i}, {$set: {$v: v}}),
ErrorCodes.DollarPrefixedFieldName,
);
});
});
it("tests that updates using $v inside $set and an object literal succeed with upsert when the source documents do not exist", () => {
// Target documents do not yet exist, so the upsert creates them using inserts.
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(db[kCollName].update({_id: i + 10}, {$set: {$v: v}}, {upsert: true}));
let expected = {
documentKey: {_id: i + 10},
fullDocument: {_id: i + 10, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v inside $replaceWith and $literal succeed", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(db[kCollName].update({_id: i}, [{$replaceWith: {$literal: {_id: i, $v: v}}}]));
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v inside using $replaceWith and an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, [{$replaceWith: {_id: i, $v: v}}]), 16410);
});
});
it("tests that pipeline updates using $v inside using $replaceRoot and $literal succeed", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update({_id: i}, [{$replaceRoot: {newRoot: {$literal: {_id: i, $v: v}}}}]),
);
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
it("tests that pipeline updates using $v using $replaceRoot and an object literal fail", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(
db[kCollName].update({_id: i}, [{$replaceRoot: {newRoot: {_id: i, $v: v}}}]),
16410,
);
});
});
it("tests that update using $v inside $inc fails", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, {$inc: {$v: 1}}), [
ErrorCodes.DollarPrefixedFieldName,
ErrorCodes.TypeMismatch,
]);
});
});
it("tests that updates using $v inside $addFields fails", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandFailedWithCode(db[kCollName].update({_id: i}, [{$addFields: {$v: i}}]), 16410);
});
});
it("tests that updates using $v inside $replaceWith and $setField and an object literal succeed", () => {
["test", 1, 2, "1", "2"].forEach((v, i) => {
assert.commandWorked(
db[kCollName].update({_id: i}, [
{$replaceWith: {$setField: {field: {$literal: "$v"}, input: "$$ROOT", value: v}}},
]),
);
let expected = {
documentKey: {_id: i},
fullDocument: {_id: i, $v: v},
ns: {db: "test", coll: kCollName},
operationType: "replace",
};
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
});
});
});

View File

@ -344,7 +344,28 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// indicates the delta oplog entry.
Value oplogVersion =
input[repl::OplogEntry::kObjectFieldName][kUpdateOplogEntryVersionFieldName];
if (!oplogVersion.missing() && oplogVersion.getInt() == 2) {
// Check that the oplog entry format is as expected:
// - if there is an '_id' field, it is a replace.
// - if there is no '_id' field and the '$v' is 2, it is a delta (diff) update.
// - if there is no '_id' field and the '$v' is not 2, it is a modifier update.
uassert(6741200,
str::stream() << "Expected _id field, or $v field missing, or $v equal to "
<< static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1)
<< " (kUpdateNodeV1) or "
<< static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)
<< " (kDeltaV2), but got oplog version $v: "
<< oplogVersion.toString(),
!id.missing() || oplogVersion.missing() ||
oplogVersion.getInt() ==
static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1) ||
oplogVersion.getInt() ==
static_cast<int>(UpdateOplogEntryVersion::kDeltaV2));
// It is important to check for '_id' field first, because a replacement style update
// can still have a '$v' field in the object.
if (id.missing() && !oplogVersion.missing() &&
oplogVersion.getInt() == static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)) {
// Parsing the delta oplog entry.
operationType = DocumentSourceChangeStream::kUpdateOpType;
Value diffObj = input[repl::OplogEntry::kObjectFieldName]
@ -376,11 +397,6 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
{"truncatedArrays", std::move(deltaDesc.truncatedArrays)}});
}
}
} else if (!oplogVersion.missing() || id.missing()) {
// This is not a replacement op, and we did not see a valid update version number.
uasserted(6741200,
str::stream() << "Unsupported or missing oplog version, $v: "
<< oplogVersion.toString());
} else {
operationType = DocumentSourceChangeStream::kReplaceOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];

View File

@ -4507,15 +4507,27 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
}
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingNotSupported) {
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) {
// A missing $v field in the update oplog entry implies $v:1, which is no longer supported.
BSONObj o = BSON("$set" << BSON("y" << 1));
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
Document expectedReplace{
{DSChangeStream::kIdField,
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"$set", D{{"y", 1}}}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsNonV2NotSupported) {
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdateNonV2NotSupported) {
BSONObj diff = BSON("u" << BSON("y" << 1));
BSONObj o = BSON("diff" << diff << "$v" << 3);
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
@ -4523,7 +4535,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsNonV2NotSupported) {
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdate) {
BSONObj diff = BSON("u" << BSON("y" << 1));
BSONObj o = BSON("diff" << diff << "$v" << 2);
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
@ -4539,6 +4551,25 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
checkTransformation(updateField, expectedUpdateField);
}
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsModifierUpdate) {
BSONObj o = BSON("x" << 2 << "y" << 1);
BSONObj o2 = BSON("_id" << 1);
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
Document expectedReplace{
{DSChangeStream::kIdField,
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
BSONObj diff = BSON("d" << BSON("y" << false));
BSONObj o = BSON("diff" << diff << "$v" << 2);
@ -4584,6 +4615,30 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformReplaceWithVField) {
BSONObj o = BSON("_id" << 1 << "x" << 2 << "y" << 1 << "$v" << 2);
BSONObj o2 = BSON("_id" << 1);
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, // op type
nss, // namespace
o, // o
testUuid(), // uuid
boost::none, // fromMigrate
o2); // o2
// Replace
Document expectedReplace{
{DSChangeStream::kIdField,
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}, {"$v", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(replace, expectedReplace);
}
TEST_F(ChangeStreamStageDBTest, TransformDelete) {
BSONObj o = BSON("_id" << 1 << "x" << 2);
auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, // op type