mirror of https://github.com/mongodb/mongo
SERVER-112036 Fix overly permissive check for $v:1 oplog entries in change stream event generation (#42262)
GitOrigin-RevId: c13e4b5c3e5063706b874c4beda43b804c14b16a
This commit is contained in:
parent
a4a462e03a
commit
ac1aac1b63
|
|
@ -77,7 +77,8 @@ _mongosh_, the "jstest shell" (_mongo_) and many drivers provide simpler "watch"
|
|||
|
||||
### Opening a Collection-Level Change Stream
|
||||
|
||||
To open a collection-level change stream on a specific collection (e.g., `testDB.testCollection`):
|
||||
To open a collection-level change stream on a specific collection (e.g., `testDB.testCollection`),
|
||||
the following _mongosh_ command can be used:
|
||||
|
||||
```js
|
||||
db.getSiblingDB("testDB").runCommand({
|
||||
|
|
@ -93,7 +94,8 @@ db.getSiblingDB("testDB").runCommand({
|
|||
|
||||
### Opening a Database-Level Change Stream
|
||||
|
||||
To open a database-level change stream on a specific database (e.g., `testDB`), use
|
||||
To open a database-level change stream on a specific database (e.g., `testDB`), use the following
|
||||
command in _mongosh_:
|
||||
|
||||
```js
|
||||
db.getSiblingDB("testDB").runCommand({
|
||||
|
|
@ -115,7 +117,8 @@ The internal namespace that is used by database-level change streams is `<dbName
|
|||
### Opening an All-Cluster Change Stream
|
||||
|
||||
All-cluster change streams can only be opened on the `admin` database and also need the
|
||||
`allChangesForCluster` flag to be set to `true` in order to work:
|
||||
`allChangesForCluster` flag to be set to `true` in order to work. The following _mongosh_ command
|
||||
can be used to open an all-cluster change stream:
|
||||
|
||||
```js
|
||||
db.adminCommand({
|
||||
|
|
@ -188,6 +191,10 @@ class. The resume token internal data is stored in [ResumeTokenData](https://git
|
|||
|
||||
Resume tokens are versioned. Currently only version 2 is supported.
|
||||
|
||||
Future versions may introduce new resume token versions. Client applications should treat resume
|
||||
tokens as opaque identifiers and should not make any assumptions about the format or internals
|
||||
or resume tokens, nor should they rely on the internal implementation details of resume tokens.
|
||||
|
||||
### Change Stream Cursors
|
||||
|
||||
When opening a change stream on a replica set, a cursor will be established on the targeted replica
|
||||
|
|
|
|||
|
|
@ -345,24 +345,22 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
|
|||
// Check that the oplog entry format is as expected:
|
||||
// - if there is an '_id' field, it is a replace.
|
||||
// - if there is no '_id' field and the '$v' is 2, it is a delta (diff) update.
|
||||
// - if there is no '_id' field and the '$v' is not 2, it is a modifier update.
|
||||
uassert(6741200,
|
||||
str::stream() << "Expected _id field, or $v field missing, or $v equal to "
|
||||
<< static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1)
|
||||
<< " (kUpdateNodeV1) or "
|
||||
<< static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)
|
||||
<< " (kDeltaV2), but got oplog version $v: "
|
||||
<< oplogVersion.toString(),
|
||||
!id.missing() || oplogVersion.missing() ||
|
||||
oplogVersion.getInt() ==
|
||||
static_cast<int>(UpdateOplogEntryVersion::kUpdateNodeV1) ||
|
||||
oplogVersion.getInt() ==
|
||||
static_cast<int>(UpdateOplogEntryVersion::kDeltaV2));
|
||||
|
||||
// If there is no '_id' field and the '$v' is not 2, it is an old-style modifier
|
||||
// update. This is unsupported.
|
||||
// It is important to check for '_id' field first, because a replacement style update
|
||||
// can still have a '$v' field in the object.
|
||||
if (id.missing() && !oplogVersion.missing() &&
|
||||
oplogVersion.getInt() == static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)) {
|
||||
const bool isUpdateEntry = id.missing();
|
||||
uassert(
|
||||
6741200,
|
||||
str::stream() << "Expected _id field, or $v field missing, or $v equal to "
|
||||
<< static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)
|
||||
<< " (kDeltaV2), but got oplog version $v: "
|
||||
<< oplogVersion.toString(),
|
||||
!isUpdateEntry ||
|
||||
(!oplogVersion.missing() && oplogVersion.getType() == BSONType::numberInt &&
|
||||
oplogVersion.getInt() == static_cast<int>(UpdateOplogEntryVersion::kDeltaV2)));
|
||||
|
||||
if (isUpdateEntry) {
|
||||
// Parsing the delta oplog entry.
|
||||
operationType = DocumentSourceChangeStream::kUpdateOpType;
|
||||
Value diffObj = input[repl::OplogEntry::kObjectFieldName]
|
||||
|
|
@ -395,6 +393,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// Replace.
|
||||
operationType = DocumentSourceChangeStream::kReplaceOpType;
|
||||
fullDocument = input[repl::OplogEntry::kObjectFieldName];
|
||||
}
|
||||
|
|
@ -630,7 +629,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
|
|||
boost::optional<std::pair<StringData, Value>>
|
||||
ChangeStreamDefaultEventTransformation::handleSupportedEvent(const Document& o2Field) const {
|
||||
for (auto&& supportedEvent : _supportedEvents) {
|
||||
if (auto lookup = o2Field[supportedEvent]; !o2Field[supportedEvent].missing()) {
|
||||
if (auto lookup = o2Field[supportedEvent]; !lookup.missing()) {
|
||||
// Known event.
|
||||
return std::make_pair(supportedEvent,
|
||||
Value{copyDocExceptFields(o2Field, {supportedEvent})});
|
||||
|
|
|
|||
|
|
@ -4507,9 +4507,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) {
|
||||
// A missing $v field in the update oplog entry implies $v:1, which is no longer supported.
|
||||
BSONObj o = BSON("$set" << BSON("y" << 1));
|
||||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithId) {
|
||||
// An _id field in the update oplog entry implies a replace.
|
||||
BSONObj o = BSON("_id" << 1 << "$set" << BSON("y" << 1));
|
||||
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
|
||||
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
|
||||
|
||||
|
|
@ -4519,7 +4519,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) {
|
|||
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kFullDocumentField, D{{"$set", D{{"y", 1}}}}},
|
||||
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"$set", D{{"y", 1}}}}},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
|
||||
};
|
||||
|
|
@ -4527,6 +4527,23 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissing) {
|
|||
checkTransformation(replace, expectedReplace);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsVMissingWithoutId) {
|
||||
// A missing _id field and a missing $v field in the update oplog entry implies a $v:1 update
|
||||
// oplog entry, which is no longer supported.
|
||||
BSONObj o = BSON("$set" << BSON("y" << 1));
|
||||
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
|
||||
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
|
||||
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsInvalidV) {
|
||||
// A $v field in the update oplog entry without an _id field requires '$v:2'.
|
||||
BSONObj o = BSON("$v" << "ABC" << "diff" << BSON("i" << BSON("v" << 5)));
|
||||
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
|
||||
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
|
||||
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdateNonV2NotSupported) {
|
||||
BSONObj diff = BSON("u" << BSON("y" << 1));
|
||||
BSONObj o = BSON("diff" << diff << "$v" << 3);
|
||||
|
|
@ -4554,20 +4571,8 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsDeltaUpdate) {
|
|||
TEST_F(ChangeStreamStageDBTest, TransformUpdateFieldsModifierUpdate) {
|
||||
BSONObj o = BSON("x" << 2 << "y" << 1);
|
||||
BSONObj o2 = BSON("_id" << 1);
|
||||
auto replace = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
|
||||
|
||||
Document expectedReplace{
|
||||
{DSChangeStream::kIdField,
|
||||
makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
|
||||
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
|
||||
{DSChangeStream::kClusterTimeField, kDefaultTs},
|
||||
{DSChangeStream::kWallTimeField, Date_t()},
|
||||
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"y", 1}}},
|
||||
{DSChangeStream::kNamespaceField, D{{"db", nss.db_forTest()}, {"coll", nss.coll()}}},
|
||||
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
|
||||
};
|
||||
|
||||
checkTransformation(replace, expectedReplace);
|
||||
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
|
||||
checkTransformation(updateField, boost::none, kDefaultSpec, {}, {}, {}, 6741200);
|
||||
}
|
||||
|
||||
TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue