SERVER-107439 Implement $changeStream stage building for v2 change stream reader (#39953)

Co-authored-by: Denis Grebennicov <denis.grebennicov@mongodb.com>
GitOrigin-RevId: 802f4d7b4334e9840d23e14c7e9e890a6c7bd05d
This commit is contained in:
Jan 2025-09-10 12:21:55 +02:00 committed by MongoDB Bot
parent 192a5aa54e
commit dab4e2b53a
31 changed files with 1607 additions and 156 deletions

View File

@ -143,28 +143,29 @@ checkQueryShapeAndHash(
"8BFCDD5DA40E82A947514CD30503795B72B3B61DD2F5455133F1D7120F9394B2",
);
// The following tests rely on fields that were added to the $changeStreams stage in v8.2, so we
// only execute them if the FCV version is high enough.
// The following tests rely on fields that were added to the $changeStreams stage in v8.2, and for
// which query shape hash computation was changed in v8.3, so we only execute them if the FCV
// version is high enough.
const fcvDoc = db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
if (MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, "8.2") >= 0) {
if (MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, "8.3") >= 0) {
// Check shape and hash when setting the change stream reader version.
checkQueryShapeAndHash(
db,
{aggregate: 1, pipeline: [{$changeStream: {version: "v1"}}], $db: db.getName()},
"EC5DB9EA352364BD4061C6CBC605C887347ED0B30B9CE8D5B3BFFB6BE5F81AF5",
"ABD13DBF8CFAE39BF22941780FFCF8BB111582FB30C6CDC832933EA0E29F1C3D",
);
checkQueryShapeAndHash(
db,
{aggregate: 1, pipeline: [{$changeStream: {version: "v2"}}], $db: db.getName()},
"CA169C644BFED846C3782FF04DB2AA18CA0EBC141AF71654D77B245AD9C5B1F0",
"ABD13DBF8CFAE39BF22941780FFCF8BB111582FB30C6CDC832933EA0E29F1C3D",
);
// Check shape and hash when setting the 'supportedEvents' field.
checkQueryShapeAndHash(
db,
{aggregate: 1, pipeline: [{$changeStream: {supportedEvents: ["foo"]}}], $db: db.getName()},
"B953C7FD7733C29EEEFB27EC963A2C9D123AB94311959A9B2749DD745BCBC838",
"ABD13DBF8CFAE39BF22941780FFCF8BB111582FB30C6CDC832933EA0E29F1C3D",
);
// Check shape and hash for a change stream pipeline using the 'ignoreRemovedShards' flag.

View File

@ -2,30 +2,45 @@
* This test makes sure that change streams can be opened with different 'version' field values.
* @tags: [
* # "version" parameter for change streams is only available from v8.2 onwards.
* requires_fcv_82
* requires_fcv_82,
* uses_change_streams,
* ]
*/
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
const dbName = jsTestName();
const collName = jsTestName();
const validVersions = ["v1", "v2", undefined];
// TODO SERVER-106575 / SERVER-107442: reenable testing of "v2" reader version here.
const validVersions = ["v1", undefined];
const isPreciseShardTargetingEnabled = FeatureFlagUtil.isEnabled(db, "ChangeStreamPreciseShardTargeting");
function testChangeStreamWithVersionAttributeSet(version = undefined) {
// Specifying the "version" field does nothing when opening a change stream on a replica set or
// on a mongod, but it is still permitted to specify it.
let changeStreamParams = {};
if (version !== undefined) {
changeStreamParams.version = version;
}
// Specifying the "version" field does nothing when opening a change stream on a replica set or
// on a mongod, but it is still permitted to specify it.
const tests = [
{}, // Whole-cluster change stream
{collection: 1}, // Whole-DB change stream
let tests = [
{collection: collName}, // Collection change stream
];
// The change stream reader version of 'v2' can be requested even if the feature flag for precise shard
// targeting is disabled. In this case the change stream reader version will silently fall back to 'v1'.
// If the feature flag is enabled, we currently only support collection-level change streams for v2 readers.
// Database-level and all database-change streams are currently not implemented for v2 readers, so we
// only add them to the test when it is safe to do so (non-v2 change stream reader and/or feature flag is
// disabled).
if (version !== "v2" || !isPreciseShardTargetingEnabled) {
tests = tests.concat([
{collection: 1}, // Whole-DB change stream
{}, // Whole-cluster change stream
]);
}
tests.forEach((nss) => {
db.getSiblingDB(dbName).dropDatabase();
@ -48,7 +63,7 @@ function testChangeStreamWithVersionAttributeSet(version = undefined) {
assert(cursor);
assert(
validVersions.includes(version),
`expecting change stream to succeed with version ${tojson(version)}`,
`expecting successful change stream for version ${tojson(version)} to be included in validVersions`,
);
assert.commandWorked(testDB.getCollection(collName).insert({_id: 1}));
@ -62,7 +77,12 @@ function testChangeStreamWithVersionAttributeSet(version = undefined) {
cst.assertNextChangesEqual({cursor, expectedChanges: [expected]});
} catch (err) {
assert(!validVersions.includes(version), `expecting change stream to fail with version ${tojson(version)}`);
assert(
!validVersions.includes(version),
`expecting unsuccessful change stream for version ${tojson(
version,
)} to be excluded from validVersions. Error: ${tojsononeline(err)}`,
);
} finally {
cst.cleanUp();
}

View File

@ -144,6 +144,7 @@ mongo_cc_library(
"//src/mongo/db/stats:top.h",
],
hdrs = [
"change_stream.h",
"change_stream_constants.h",
"change_stream_helpers.h",
"change_stream_read_mode.h",
@ -192,6 +193,7 @@ mongo_cc_library(
],
deps = [
"document_sources_idl",
"//src/mongo/db:shard_role_api",
"//src/mongo/db/repl:oplog_entry",
],
)
@ -1288,6 +1290,7 @@ mongo_cc_library(
"//src/mongo/db/pipeline:sharded_agg_helpers",
"//src/mongo/db/query/write_ops:write_ops_parsers",
"//src/mongo/db/update:update_driver",
"//src/mongo/s/change_streams:change_streams_control_events",
"//src/mongo/s/query/exec:router_exec_stage",
],
)

View File

@ -34,25 +34,41 @@
namespace mongo {
ChangeStream::ChangeStream(ChangeStreamReadMode mode,
ChangeStreamType type,
boost::optional<NamespaceString> nss)
: _mode(mode), _type(type), _nss(std::move(nss)) {
namespace {
boost::optional<NamespaceString> convertedNss(ChangeStreamType type,
const boost::optional<NamespaceString>& nss) {
if (type == ChangeStreamType::kAllDatabases) {
// Cluster-wide change streams have to be opened on the "admin" database.
// Convert this to a Namespace without a value.
tassert(10656200,
"NSS for all-databases change stream must be empty",
!nss.has_value() || nss->isAdminDB());
return {};
}
if (_type == ChangeStreamType::kAllDatabases) {
tassert(10656200, "NSS for all-databases change stream must be empty", !_nss.has_value());
} else {
tassert(10656201,
"NSS for collection- or database-level change stream must not be empty",
_nss.has_value());
nss.has_value());
tassert(
10656202,
if (type == ChangeStreamType::kDatabase && nss->isCollectionlessAggregateNS()) {
// Convert 'dbName.$cmd.aggregate' namespace to just 'dbName'.
return NamespaceString(nss->dbName());
}
tassert(10656202,
"NSS for collection- or database-level change stream must have the right granularity",
(_type == ChangeStreamType::kDatabase) == _nss->isDbOnly());
}
(type == ChangeStreamType::kDatabase) == nss->isDbOnly());
// Return original namespace as is.
return nss;
}
} // namespace
ChangeStream::ChangeStream(ChangeStreamReadMode mode,
ChangeStreamType type,
const boost::optional<NamespaceString>& nss)
: _mode(mode), _type(type), _nss(convertedNss(type, nss)) {}
ChangeStreamReadMode ChangeStream::getReadMode() const {
return _mode;
}
@ -74,6 +90,10 @@ ChangeStreamType ChangeStream::getChangeStreamType(const NamespaceString& nss) {
ChangeStream ChangeStream::buildFromExpressionContext(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
tassert(10743905,
"expecting changeStreamSpec to be present in ExpressionContext",
expCtx->getChangeStreamSpec().has_value());
const auto& nss = expCtx->getNamespaceString();
return ChangeStream(fromIgnoreRemovedShardsParameter(static_cast<bool>(

View File

@ -59,7 +59,7 @@ class ChangeStream {
public:
ChangeStream(ChangeStreamReadMode mode,
ChangeStreamType type,
boost::optional<NamespaceString> nss);
const boost::optional<NamespaceString>& nss);
/**
* Returns the robustness level of the change stream instance.

View File

@ -46,6 +46,7 @@
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/update/update_oplog_entry_version.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/namespace_string_util.h"
#include "mongo/util/str.h"
@ -59,6 +60,7 @@
#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo {
namespace {
@ -237,9 +239,9 @@ ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
_supportedEvents = buildSupportedEvents();
}
ChangeStreamDefaultEventTransformation::SupportedEvents
ChangeStreamEventTransformation::SupportedEvents
ChangeStreamDefaultEventTransformation::buildSupportedEvents() const {
SupportedEvents result;
ChangeStreamEventTransformation::SupportedEvents result;
// Check if field 'supportedEvents' is present, and handle it if so.
if (auto supportedEvents = _changeStreamSpec.getSupportedEvents()) {
@ -251,10 +253,14 @@ ChangeStreamDefaultEventTransformation::buildSupportedEvents() const {
}
}
// Add built-in sharding events to list of noop events we need to handle.
result.insert(kBuiltInNoopEvents.begin(), kBuiltInNoopEvents.end());
LOGV2_DEBUG(10743903,
3,
"default change stream transformation supports the following dynamic events",
"supportedEvents"_attr = result);
return result;
}
@ -291,20 +297,27 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
NamespaceString nss = createNamespaceStringFromOplogEntry(tenantId, ns.getStringData());
Value id = input.getNestedField("o._id");
// Non-replace updates have the _id in field "o2".
StringData operationType;
Value fullDocument;
Value updateDescription;
Value documentKey;
// Used to populate the 'operationDescription' output field and also to build the resumeToken
// for some events. Note that any change to the 'operationDescription' for existing events can
// break changestream resumability between different mongod versions and should thus be avoided!
Value operationDescription;
Value stateBeforeChange;
// Optional value containing the namespace type for changestream create events. This will be
// emitted as 'nsType' field.
Value nsType;
// By default, all events returned from here should populate their UUID field. This requirement
// can be overriden for specific event types below.
bool requireUUID = true;
MutableDocument doc;
switch (opType) {
@ -477,7 +490,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Check for dynamic events that were specified via the 'supportedEvents' change stream
// parameter.
// This also checks for some hard-code sharding-related events.
// This also checks for some hard-coded sharding-related events.
if (auto result = handleSupportedEvent(o2Field)) {
// Apply returned event name and operationDescription.
operationType = result->first;
@ -494,6 +507,9 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
if (operationType == DocumentSourceChangeStream::kEndOfTransactionOpType) {
addTransactionIdFieldsIfPresent(o2Field, doc);
}
// Configured events do not require the UUID field to be present.
requireUUID = false;
break;
}
@ -505,13 +521,10 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
}
}
// UUID should always be present except for a known set of types.
// All of the extra event types specified in the 'supportedEvents' change stream parameter
// require the UUID field to be set in their oplog entry. Otherwise the following tassert will
// trigger.
// UUID should always be present except for a known set of operation types.
tassert(7826901,
"Saw a CRUD op without a UUID",
!uuid.missing() || kOpsWithoutUUID.contains(operationType));
str::stream() << "Saw a '" << operationType << "' op without a UUID",
!requireUUID || !uuid.missing() || kOpsWithoutUUID.contains(operationType));
// Extract the 'txnOpIndex' field. This will be missing unless we are unwinding a transaction.
auto txnOpIndex = input[DocumentSourceChangeStream::kTxnOpIndexField];

View File

@ -49,6 +49,8 @@ namespace mongo {
*/
class ChangeStreamEventTransformation {
public:
using SupportedEvents = StringDataSet;
ChangeStreamEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
@ -89,14 +91,16 @@ protected:
*/
class ChangeStreamDefaultEventTransformation final : public ChangeStreamEventTransformation {
public:
using SupportedEvents = StringDataSet;
ChangeStreamDefaultEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
Document applyTransformation(const Document& fromDoc) const override;
std::set<std::string> getFieldNameDependencies() const override;
const ChangeStreamEventTransformation::SupportedEvents& getSupportedEvents_forTest() const {
return _supportedEvents;
}
private:
/**
* Checks the 'o2Field' value of an oplog entry has any field name that is contained in
@ -110,14 +114,14 @@ private:
* Build the '_supportedEvents' container from the 'supportedEvents' change stream parameter.
* Can throw exceptions if 'supportedEvents' contains invalid values.
*/
SupportedEvents buildSupportedEvents() const;
ChangeStreamEventTransformation::SupportedEvents buildSupportedEvents() const;
/**
* Additional supported events that this transformer can handle. These events can be created off
* of "noop" oplog entries which have any of the supported events as a field name inside their
* 'o2' field value.
*/
SupportedEvents _supportedEvents;
ChangeStreamEventTransformation::SupportedEvents _supportedEvents;
};
/**
@ -155,6 +159,10 @@ public:
return accessedFields;
}
const ChangeStreamEventTransformation::SupportedEvents& getSupportedEvents_forTest() const {
return _defaultEventBuilder->getSupportedEvents_forTest();
}
private:
ChangeStreamEventTransformation* getBuilder(const Document& oplog) const;

View File

@ -36,7 +36,9 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/change_stream.h"
#include "mongo/db/pipeline/change_stream_helpers.h"
#include "mongo/db/pipeline/change_stream_reader_builder.h"
#include "mongo/db/pipeline/change_stream_rewrite_helpers.h"
#include "mongo/db/pipeline/data_to_shards_allocation_query_service.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/expression_context.h"
@ -271,21 +273,27 @@ std::unique_ptr<MatchExpression> buildInvalidationFilter(
SerializationContext::stateDefault())
<< "$or" << invalidatingCommands.arr()));
return MatchExpressionParser::parseAndNormalize(invalidatingFilter, expCtx);
} // namespace change_stream_filter
}
void appendBaseTransactionFilter(BSONObjBuilder& applyOpsBuilder) {
applyOpsBuilder.append("op", "c");
applyOpsBuilder.append("o.prepare", BSON("$ne" << true));
applyOpsBuilder.append("o.partialTxn", BSON("$ne" << true));
}
std::unique_ptr<MatchExpression> buildTransactionFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
std::vector<BSONObj>& backingBsonObjs) {
BSONObjBuilder applyOpsBuilder;
appendBaseTransactionFilter(applyOpsBuilder);
// "o.applyOps" stores the list of operations, so it must be an array.
applyOpsBuilder.append("o.applyOps", BSON("$type" << "array"));
BSONObj nsMatch = DocumentSourceChangeStream::getNsMatchObjForChangeStream(expCtx);
// "o.applyOps" stores the list of operations, so it must be an array.
applyOpsBuilder.append("op", "c");
applyOpsBuilder.append("o.applyOps", BSON("$type" << "array"));
applyOpsBuilder.append("o.prepare", BSON("$ne" << true));
applyOpsBuilder.append("o.partialTxn", BSON("$ne" << true));
{
// Include this 'applyOps' if it has an operation with a matching namespace _or_ if it has a
// 'prevOpTime' link to another 'applyOps' command, indicating a multi-entry transaction.
@ -335,12 +343,53 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
auto transactionFilterWithUserMatch = std::make_unique<AndMatchExpression>();
transactionFilterWithUserMatch->add(std::move(transactionFilter));
transactionFilterWithUserMatch->add(std::move(rewrittenMatch));
return transactionFilterWithUserMatch;
transactionFilter = std::move(transactionFilterWithUserMatch);
}
// For sharded clusters, additionally match control events for v2 change stream readers. The
// user's match filter is not used here and must not have impact on which control events are
// emitted.
const BSONObj controlEventsFilter = [&]() {
if (expCtx->getInRouter() &&
expCtx->getChangeStreamSpec()->getVersion() == ChangeStreamReaderVersionEnum::kV2) {
return buildControlEventsFilterForDataShard(expCtx);
}
return BSONObj();
}();
if (!controlEventsFilter.isEmpty()) {
BSONObjBuilder bob;
appendBaseTransactionFilter(bob);
// "o.applyOps" stores the list of operations, so it must be an array.
bob.append("o.applyOps", BSON("$type" << "array" << "$elemMatch" << controlEventsFilter));
auto transactionFilterWithControlEventsFilter = std::make_unique<OrMatchExpression>();
transactionFilterWithControlEventsFilter->add(std::move(transactionFilter));
transactionFilterWithControlEventsFilter->add(MatchExpressionParser::parseAndNormalize(
backingBsonObjs.emplace_back(bob.obj()), expCtx));
transactionFilter = std::move(transactionFilterWithControlEventsFilter);
}
return transactionFilter;
}
BSONObj buildControlEventsFilterForDataShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
tassert(10743901,
"expecting v2 change stream reader version to be set",
expCtx->getChangeStreamSpec()->getVersion() == ChangeStreamReaderVersionEnum::kV2);
ChangeStreamReaderBuilder* readerBuilder =
ChangeStreamReaderBuilder::get(expCtx->getOperationContext()->getServiceContext());
tassert(10743902, "expecting ChangeStreamReaderBuilder to be available", readerBuilder);
return readerBuilder->buildControlEventFilterForDataShard(
expCtx->getOperationContext(), ChangeStream::buildFromExpressionContext(expCtx));
}
std::unique_ptr<MatchExpression> buildInternalOpFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,

View File

@ -132,6 +132,11 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
const MatchExpression* userMatch,
std::vector<BSONObj>& backingBsonObjs);
/**
* Produce a filter matching control events on a data shard for v2 change stream readers.
*/
BSONObj buildControlEventsFilterForDataShard(const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Produce a filter matching any operations for internal change stream use. These events must not be
* filtered out by the change stream's oplog scan, even if they are not ultimately returned to the

View File

@ -31,13 +31,35 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/topology/sharding_state.h"
#include "mongo/util/assert_util.h"
#include <boost/optional/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo {
namespace change_stream {
namespace mongo::change_stream {
bool isRouterOrNonShardedReplicaSet(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
if (expCtx->getInRouter()) {
return true;
}
if (expCtx->getForPerShardCursor()) {
// Sharded cluster mongod.
return false;
}
if (const auto* shardingState = ShardingState::get(expCtx->getOperationContext());
shardingState) {
auto role = shardingState->pollClusterRole();
const bool isReplSet = !role.has_value();
return isReplSet;
} else {
// Sharding state is not initialized. This is the case in unit tests and also on standalone
// mongods. But on standalone mongods we do not support change streams, so we will never get
// here on standalone mongods.
return false;
}
}
ResumeTokenData resolveResumeTokenFromSpec(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
@ -93,5 +115,4 @@ repl::MutableOplogEntry createEndOfTransactionOplogEntry(
return oplogEntry;
}
} // namespace change_stream
} // namespace mongo
} // namespace mongo::change_stream

View File

@ -37,9 +37,13 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace mongo {
namespace mongo::change_stream {
/**
* Tests if we are currently running on a router or in a non-sharded replica set context.
*/
bool isRouterOrNonShardedReplicaSet(const boost::intrusive_ptr<ExpressionContext>& expCtx);
namespace change_stream {
/**
* Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified,
* returns the equivalent high-watermark token. This method should only ever be called on a spec
@ -75,6 +79,4 @@ static const std::set<StringData> kClassicOperationTypes =
DocumentSourceChangeStream::kReshardBlockingWritesOpType,
DocumentSourceChangeStream::kReshardDoneCatchUpOpType,
DocumentSourceChangeStream::kNewShardDetectedOpType};
} // namespace change_stream
} // namespace mongo
} // namespace mongo::change_stream

View File

@ -94,6 +94,20 @@ TEST(ChangeStreamTest, DatabaseLevelChangeStream) {
ASSERT_TRUE(actualNss->isDbOnly());
}
TEST(ChangeStreamTest, DatabaseLevelChangeStreamOnCollectionLessAggregateNS) {
boost::optional<NamespaceString> nss =
NamespaceString::createNamespaceString_forTest("testDB.$cmd.aggregate");
ChangeStream changeStream(ChangeStreamReadMode::kStrict, ChangeStreamType::kDatabase, nss);
ASSERT_EQ(ChangeStreamType::kDatabase, changeStream.getChangeStreamType());
auto actualNss = changeStream.getNamespace();
ASSERT_TRUE(actualNss.has_value());
ASSERT_EQ(NamespaceString::createNamespaceString_forTest("testDB"), *actualNss);
ASSERT_TRUE(actualNss->isDbOnly());
}
DEATH_TEST_REGEX(ChangeStreamTest,
DatabaseLevelChangeStreamWithoutNamespace,
"Tripwire assertion.*10656201") {
@ -125,6 +139,27 @@ TEST(ChangeStreamTest, AllDatabasesChangeStream) {
ASSERT_FALSE(changeStream.getNamespace().has_value());
}
TEST(ChangeStreamTest, AllDatabasesChangeStreamOnAdminDB) {
boost::optional<NamespaceString> nss = NamespaceString::createNamespaceString_forTest("admin");
ChangeStream changeStream(ChangeStreamReadMode::kStrict, ChangeStreamType::kAllDatabases, nss);
ASSERT_EQ(ChangeStreamType::kAllDatabases, changeStream.getChangeStreamType());
ASSERT_FALSE(changeStream.getNamespace().has_value());
}
DEATH_TEST_REGEX(ChangeStreamTest,
AllDatabasesChangeStreamWithNamespace,
"Tripwire assertion.*10656200") {
// Not allowed to create an all databases change stream with an NSS.
ASSERT_THROWS_CODE(ChangeStream(ChangeStreamReadMode::kStrict,
ChangeStreamType::kAllDatabases,
NamespaceString::createNamespaceString_forTest("testDB")),
AssertionException,
10656200);
}
TEST(ChangeStreamTest, ChangeStreamGetTypeCollection) {
auto nss = NamespaceString::createNamespaceString_forTest("unittest"_sd, "someCollection"_sd);
ASSERT_EQ(ChangeStreamType::kCollection, ChangeStream::getChangeStreamType(nss));
@ -143,15 +178,4 @@ TEST(ChangeStreamTest, ChangeStreamGetTypeAllDatabases) {
ASSERT_EQ(ChangeStreamType::kAllDatabases, ChangeStream::getChangeStreamType(nss));
}
DEATH_TEST_REGEX(ChangeStreamTest,
AllDatabasesChangeStreamWithNamespace,
"Tripwire assertion.*10656200") {
// Not allowed to create an all databases change stream with an NSS.
ASSERT_THROWS_CODE(ChangeStream(ChangeStreamReadMode::kStrict,
ChangeStreamType::kAllDatabases,
NamespaceString::createNamespaceString_forTest("testDB")),
AssertionException,
10656200);
}
} // namespace mongo

View File

@ -31,7 +31,6 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
@ -45,8 +44,6 @@
#include "mongo/util/uuid.h"
#include <cstddef>
#include <string>
#include <vector>
#include <boost/move/utility_core.hpp>
#include <boost/none.hpp>

View File

@ -31,6 +31,7 @@
#include "mongo/db/pipeline/data_to_shards_allocation_query_service.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
#include <algorithm>
#include <deque>
@ -55,6 +56,10 @@ public:
return _responses.empty();
}
void allowAnyClusterTime() {
_allowAnyClusterTime = true;
}
/**
* Check that the provided cluster time is equal to next buffered response's cluster time, and
* then hand out the buffered status for it.
@ -64,14 +69,19 @@ public:
uassert(10612400, "queue should not be empty", !empty());
auto response = _responses.front();
uassert(10612401,
"cluster time should be equal to expected cluster time",
clusterTime == response.first);
str::stream() << "cluster time should be equal to expected cluster time. expected: "
<< response.first.toStringPretty()
<< ", actual: " << clusterTime.toStringPretty(),
_allowAnyClusterTime || clusterTime == response.first);
_responses.pop_front();
return response.second;
}
private:
std::deque<Response> _responses;
// If set to true, allows any cluster time for the 'getAllocationToShardsStatus' calls.
bool _allowAnyClusterTime = false;
};
} // namespace mongo

View File

@ -31,9 +31,10 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/database_name.h"
#include "mongo/db/pipeline/change_stream.h"
#include "mongo/db/pipeline/change_stream_filter_helpers.h"
#include "mongo/db/pipeline/change_stream_helpers.h"
#include "mongo/db/pipeline/change_stream_reader_builder.h"
#include "mongo/db/pipeline/data_to_shards_allocation_query_service.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_add_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
@ -41,6 +42,7 @@
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2.h"
#include "mongo/db/pipeline/document_source_change_stream_inject_control_events.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
@ -50,7 +52,9 @@
#include "mongo/db/query/allowed_contexts.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_options.h"
#include "mongo/db/vector_clock/vector_clock.h"
#include "mongo/db/version_context.h"
#include "mongo/idl/idl_parser.h"
#include <string>
@ -60,6 +64,8 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <fmt/format.h>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo {
using boost::intrusive_ptr;
@ -298,19 +304,123 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromB
spec.setStartAfter(boost::none);
}
// Obtain the resume token from the spec. This will be used when building the pipeline.
auto resumeToken = change_stream::resolveResumeTokenFromSpec(expCtx, spec);
const auto& nss = expCtx->getNamespaceString();
ChangeStream changeStream(
fromIgnoreRemovedShardsParameter(static_cast<bool>(spec.getIgnoreRemovedShards())),
ChangeStream::getChangeStreamType(nss),
nss);
ChangeStreamReaderVersionEnum changeStreamVersion =
_determineChangeStreamReaderVersion(expCtx, resumeToken.clusterTime, spec, changeStream);
// Override global change stream reader version with the version just determined.
spec.setVersion(changeStreamVersion);
const bool useV2ChangeStreamReader = changeStreamVersion == ChangeStreamReaderVersionEnum::kV2;
if (useV2ChangeStreamReader) {
OperationContext* opCtx = expCtx->getOperationContext();
ChangeStreamReaderBuilder* readerBuilder =
ChangeStreamReaderBuilder::get(opCtx->getServiceContext());
tassert(10743908, "expecting ChangeStreamReaderBuilder to be available", readerBuilder);
// Set supported control events for the 'DocumentSourceChangeStreamTransform' stage. That
// stage will pick up the supported events from the change stream definition in the
// ExpressionContext later.
auto controlEventTypes =
readerBuilder->getControlEventTypesOnDataShard(opCtx, changeStream);
std::vector<std::string> supportedEvents(controlEventTypes.begin(),
controlEventTypes.end());
spec.setSupportedEvents(std::move(supportedEvents));
}
// Save a copy of the spec on the expression context. Used when building the oplog filter.
expCtx->setChangeStreamSpec(spec);
return _buildPipeline(expCtx, spec);
return _buildPipeline(expCtx, spec, resumeToken, useV2ChangeStreamReader);
}
ChangeStreamReaderVersionEnum DocumentSourceChangeStream::_determineChangeStreamReaderVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp atClusterTime,
const DocumentSourceChangeStreamSpec& spec,
const ChangeStream& changeStream) try {
if (!expCtx->getInRouter()) {
// If we are not on a router, we always set reader version v1.
return ChangeStreamReaderVersionEnum::kV1;
}
OperationContext* opCtx = expCtx->getOperationContext();
// Check feature flag 'featureFlagChangeStreamPreciseShardTargeting' that is required to enable
// v2 change stream readers.
if (!feature_flags::gFeatureFlagChangeStreamPreciseShardTargeting.isEnabled(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
// Feature flag is disabled. Default to v1.
return ChangeStreamReaderVersionEnum::kV1;
}
// Check If the user explicitly requested a specific change stream reader version.
const auto& version = spec.getVersion();
// If no change stream reader version was explicitly selected, or if it was set explicitly to
// v1, change stream reader version v1 will be used.
if (!version.has_value() || *version == ChangeStreamReaderVersionEnum::kV1) {
return ChangeStreamReaderVersionEnum::kV1;
}
// The user has explicitly selected the v2 change stream reader version.
// v2 change stream readers are currently only supported for collection-level change streams.
if (changeStream.getChangeStreamType() != ChangeStreamType::kCollection) {
return ChangeStreamReaderVersionEnum::kV1;
}
ChangeStreamReaderBuilder* readerBuilder =
ChangeStreamReaderBuilder::get(opCtx->getServiceContext());
tassert(10743904, "expecting ChangeStreamReaderBuilder to be available", readerBuilder);
DataToShardsAllocationQueryService* dataToShardsAllocationQueryService =
DataToShardsAllocationQueryService::get(opCtx);
tassert(10743906,
"expecting DataToShardsAllocationQueryService to be available",
dataToShardsAllocationQueryService);
if (dataToShardsAllocationQueryService->getAllocationToShardsStatus(opCtx, atClusterTime) ==
AllocationToShardsStatus::kNotAvailable) {
// No shard placement information is available. Use v1 change stream reader.
return ChangeStreamReaderVersionEnum::kV1;
}
// All requirements for using a v2 change stream reader are satisfied.
return ChangeStreamReaderVersionEnum::kV2;
} catch (const DBException& ex) {
// Log any error that we have caught while determining the change stream reader version.
LOGV2_DEBUG(10743907,
3,
"caught exception while determining change stream reader version",
"error"_attr = ex.toStatus());
throw;
}
std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_buildPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) {
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec,
const ResumeTokenData& resumeToken,
bool useV2ChangeStreamReader) {
// Build the actual change stream pipeline.
std::list<boost::intrusive_ptr<DocumentSource>> stages;
// Obtain the resume token from the spec. This will be used when building the pipeline.
auto resumeToken = change_stream::resolveResumeTokenFromSpec(expCtx, spec);
// Unfold the $changeStream into its constituent stages and add them to the pipeline.
stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
@ -332,10 +442,20 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
// change event is detected, this stage forwards the event directly to the executor via an
// exception (bypassing the rest of the pipeline). Router must see all topology change events,
// so it's important that this stage occurs before any filtering is performed.
if (expCtx->getInRouter()) {
if (expCtx->getInRouter() && !useV2ChangeStreamReader) {
// Only add this stage for V1 change stream readers. V2 change stream readers handle all
// topology changes via the HandleTopologyChangeV2 stage.
stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx));
}
if (expCtx->getInRouter() && useV2ChangeStreamReader) {
// For V2 change stream readers in sharded clusters, add the DSCSInjectControlEvents stage
// on the shards. The control events filter will always be for a data-shard when we are
// here, as the control events stage for a config server is built elsewhere.
stages.push_back(
DocumentSourceChangeStreamInjectControlEvents::createForDataShard(expCtx, spec));
}
// If 'fullDocumentBeforeChange' is not set to 'off', add the DSCSAddPreImage stage into the
// pipeline. We place this stage here so that any $match stages which follow the $changeStream
// pipeline may be able to skip ahead of the DSCSAddPreImage stage. This allows a whole-db or
@ -350,12 +470,20 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec));
}
// If the pipeline is built on router, then the DSCSHandleTopologyChange stage acts as the
// split point for the pipline. All stages before this stages will run on shards and all
// stages after and inclusive of this stage will run on the router.
// If the pipeline is built on router, inject a DSCSHandleTopologyChange stage for v1 change
// stream readers or a DSCSHandleTopologyChangeV2 stage for v2 change stream readers. The
// DSCSHandleTopologyChange(V2) stage acts as the split point for the pipeline. All stages
// before this stage will run on shards and all stages after and inclusive of this stage will
// run on the router.
if (expCtx->getInRouter()) {
if (useV2ChangeStreamReader) {
// V2 change stream reader, using the HandleTopologyChangeV2 stage.
stages.push_back(DocumentSourceChangeStreamHandleTopologyChangeV2::create(expCtx));
} else {
// V1 change stream reader, using the HandleTopologyChange stage.
stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx));
}
}
// If the resume point is an event, we must include a DSCSEnsureResumeTokenPresent stage.
if (!ResumeToken::isHighWaterMarkToken(resumeToken)) {

View File

@ -42,6 +42,7 @@
#include "mongo/db/auth/resource_pattern.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/change_stream.h"
#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
@ -389,9 +390,24 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
// Determines the change stream reader version (v1 or v2) from the user's change stream request
// ('spec') parameter. The v2 reader version will only be selected if the feature flag for
// precise change stream shard-targeting ('featureFlagChangeStreamPreciseShardTargeting') is
// enabled. In addition, the change stream must have been opened on a collection, and the user
// must have explicitly selected the v2 version in the request. For all other combinations, the
// v1 change stream reader version will be selected.
static ChangeStreamReaderVersionEnum _determineChangeStreamReaderVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp atClusterTime,
const DocumentSourceChangeStreamSpec& spec,
const ChangeStream& changeStream);
// Constructs and returns a series of stages representing the full change stream pipeline.
static std::list<boost::intrusive_ptr<DocumentSource>> _buildPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec);
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec,
const ResumeTokenData& resumeToken,
bool useV2ChangeStreamReader);
// Helper function which throws if the $changeStream fails any of a series of semantic checks.
// For instance, whether it is permitted to run given the current FCV, whether the namespace is

View File

@ -228,6 +228,8 @@ structs:
type: ChangeStreamReaderVersion
description: Selects the change stream reader version.
optional: true
# This field is for internal purposes only and not supposed to be set by end users.
# The change stream reader version is not serialized as part of the query shape.
query_shape: parameter
ignoreRemovedShards:
@ -251,6 +253,8 @@ structs:
supported events as a field name inside their "o2" field value.
All of these oplog entries must have the "uuid" field set as well.
optional: true
# This field is for internal purposes only and not supposed to be set by end users.
# The field is not serialized as part of the query shape.
query_shape: literal
DocumentSourceChangeStreamOplogMatchSpec:

View File

@ -32,8 +32,11 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/change_stream_reader_builder.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/s/change_streams/control_events.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
@ -109,6 +112,29 @@ BSONObj DocumentSourceChangeStreamInjectControlEvents::ActionsHelper::serializeT
return bob.obj();
}
DocumentSourceChangeStreamInjectControlEvents::ActionsMap
DocumentSourceChangeStreamInjectControlEvents::ActionsHelper::buildMapForDataShard(
    const boost::intrusive_ptr<ExpressionContext>& expCtx,
    const DocumentSourceChangeStreamSpec& spec) {
    auto* opCtx = expCtx->getOperationContext();
    auto* readerBuilder = ChangeStreamReaderBuilder::get(opCtx->getServiceContext());
    tassert(10743900, "expecting ChangeStreamReaderBuilder to be available", readerBuilder);
    // Map every control event type that is relevant on a data shard to the action the
    // InjectControlEvents stage should take for it.
    BSONObjBuilder actionsBuilder;
    for (const auto& eventType : readerBuilder->getControlEventTypesOnDataShard(
             opCtx, ChangeStream::buildFromExpressionContext(expCtx))) {
        // A 'moveChunk' event gets the "inject" action when the user requested system events;
        // every other combination gets the "transform to control event" action.
        const bool injectEvent =
            eventType == MoveChunkControlEvent::opType && spec.getShowSystemEvents();
        actionsBuilder.append(eventType,
                              injectEvent ? kActionNameInjectControlEvent
                                          : kActionNameTransformToControlEvent);
    }
    return parseFromBSON(actionsBuilder.obj());
}
DocumentSourceChangeStreamInjectControlEvents::DocumentSourceChangeStreamInjectControlEvents(
const intrusive_ptr<ExpressionContext>& expCtx,
DocumentSourceChangeStreamInjectControlEvents::ActionsMap actions)
@ -124,6 +150,14 @@ DocumentSourceChangeStreamInjectControlEvents::create(
ActionsHelper::parseFromBSON(actions));
}
intrusive_ptr<DocumentSourceChangeStreamInjectControlEvents>
DocumentSourceChangeStreamInjectControlEvents::createForDataShard(
    const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) {
    // Derive the data-shard action map from the change stream spec, then build the stage with it.
    ActionsMap actions = ActionsHelper::buildMapForDataShard(expCtx, spec);
    return new DocumentSourceChangeStreamInjectControlEvents(expCtx, std::move(actions));
}
intrusive_ptr<DocumentSourceChangeStreamInjectControlEvents>
DocumentSourceChangeStreamInjectControlEvents::createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {

View File

@ -89,6 +89,13 @@ public:
* Convert the internal C++ actions type into a BSONObj.
*/
static BSONObj serializeToBSON(const ActionsMap& actions);
/**
* Build the action map for a data shard from the current change stream specification.
*/
static ActionsMap buildMapForDataShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
};
const char* getSourceName() const override;
@ -123,12 +130,20 @@ public:
const DocumentSourceChangeStreamSpec& spec,
const BSONObj& actions = {});
static boost::intrusive_ptr<DocumentSourceChangeStreamInjectControlEvents> createForDataShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
static const Id& id;
Id getId() const override {
return id;
}
const ActionsMap& getActionsMap_forTest() const {
return _actions;
}
private:
friend boost::intrusive_ptr<exec::agg::Stage>
documentSourceChangeStreamInjectControlEventsToStageFn(

View File

@ -39,11 +39,11 @@
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/query/compiler/rewrites/matcher/expression_optimizer.h"
#include "mongo/db/server_options.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/util/assert_util.h"
#include <algorithm>
#include <iterator>
#include <list>
#include <memory>
#include <boost/move/utility_core.hpp>
@ -106,6 +106,13 @@ std::unique_ptr<MatchExpression> buildOplogMatchFilter(
eventFilter->add(buildViewDefinitionEventFilter(expCtx, userMatch, backingBsonObjs));
}
// For sharded clusters, add an oplog filter for control events for v2 change stream readers.
if (expCtx->getInRouter() &&
expCtx->getChangeStreamSpec()->getVersion() == ChangeStreamReaderVersionEnum::kV2) {
eventFilter->add(MatchExpressionParser::parseAndNormalize(
backingBsonObjs.emplace_back(buildControlEventsFilterForDataShard(expCtx)), expCtx));
}
// Build the final $match filter to be applied to the oplog.
oplogFilter->add(std::move(eventFilter));

File diff suppressed because it is too large Load Diff

View File

@ -34,20 +34,21 @@
#include "mongo/db/pipeline/change_stream_helpers.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/topology/sharding_state.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/util/assert_util.h"
#include <utility>
#include <boost/move/utility_core.hpp>
#include <boost/optional/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
namespace mongo {
namespace {
// Internal-only '$changeStream' fields that are stripped from the stage's serialization whenever
// literals are redacted, so that they have no influence on the query shape (and its hash).
const StringDataSet kFieldsToRemoveForQueryShapeSerialization = {"version", "supportedEvents"};
} // namespace
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform,
LiteParsedDocumentSourceChangeStreamInternal::parse,
DocumentSourceChangeStreamTransform::createFromBson,
@ -69,32 +70,13 @@ DocumentSourceChangeStreamTransform::createFromBson(
auto spec =
DocumentSourceChangeStreamSpec::parse(rawSpec.Obj(), IDLParserContext("$changeStream"));
// Set the change stream spec on the expression context.
expCtx->setChangeStreamSpec(spec);
auto canUseSupportedEvents = [&]() {
if (expCtx->getInRouter()) {
// 'supportedEvents' are not supposed to be used on a router.
return false;
}
const auto* shardingState = ShardingState::get(expCtx->getOperationContext());
if (!shardingState) {
// Sharding state is not initialized. This is the case in unit tests and also on
// standalone mongods. But on standalone mongods we do not support change streams, so we
// will never get here.
return true;
}
// Also 'supportedEvents' cannot be used in a non-shard replica set.
auto role = shardingState->pollClusterRole();
const bool isReplSet = !role.has_value();
return !isReplSet;
};
uassert(10498501,
"Expecting 'supportedEvents' to be set only on a shard mongod, not on router or "
"replica set member",
canUseSupportedEvents());
!spec.getSupportedEvents() || !change_stream::isRouterOrNonShardedReplicaSet(expCtx));
// Set the change stream spec on the expression context.
expCtx->setChangeStreamSpec(spec);
return new DocumentSourceChangeStreamTransform(expCtx, std::move(spec));
}
@ -134,10 +116,22 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints(
}
Value DocumentSourceChangeStreamTransform::serialize(const SerializationOptions& opts) const {
BSONObj serializedOptions = [&]() -> BSONObj {
BSONObj serialized = _changeStreamSpec.toBSON(opts);
if (opts.literalPolicy != LiteralSerializationPolicy::kUnchanged) {
// Explicitly remove specific fields from the '$changeStream' stage serialization for
// query shapes that should not have any influence on the query shape hash computation.
serialized = serialized.removeFields(kFieldsToRemoveForQueryShapeSerialization);
}
return serialized;
}();
if (opts.isSerializingForExplain()) {
return Value(Document{{DocumentSourceChangeStream::kStageName,
Document{{"stage"_sd, "internalTransform"_sd},
{"options"_sd, _changeStreamSpec.toBSON(opts)}}}});
return Value(Document{
{DocumentSourceChangeStream::kStageName,
Document{{"stage"_sd, "internalTransform"_sd}, {"options"_sd, serializedOptions}}}});
}
// Internal change stream stages are not serialized for query stats. Query stats uses this stage
@ -146,7 +140,7 @@ Value DocumentSourceChangeStreamTransform::serialize(const SerializationOptions&
auto stageName = (opts.isSerializingForQueryStats())
? DocumentSourceChangeStream::kStageName
: DocumentSourceChangeStreamTransform::kStageName;
return Value(Document{{stageName, _changeStreamSpec.toBSON(opts)}});
return Value(Document{{stageName, serializedOptions}});
}
DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTracker* deps) const {

View File

@ -99,6 +99,10 @@ public:
return id;
}
const ChangeStreamEventTransformation::SupportedEvents& getSupportedEvents_forTest() const {
return _transformer->getSupportedEvents_forTest();
}
private:
friend boost::intrusive_ptr<exec::agg::Stage> documentSourceChangeStreamTransformToStageFn(
const boost::intrusive_ptr<DocumentSource>& documentSource);

View File

@ -86,7 +86,7 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
// The transaction unwind filter is the same as the operation filter applied to the oplog. This
// includes a namespace filter, which ensures that it will discard all documents that would be
// filtered out by the default 'ns' filter this stage gets initialized with.
auto unwindFilter =
std::unique_ptr<ListOfMatchExpression> unwindFilter =
std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr, bsonObj));
// To correctly handle filtering out entries of direct write operations on orphaned documents,
@ -104,6 +104,20 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
expCtx, userMatch, bsonObj, {}, kUnwindExcludedFields)) {
unwindFilter->add(std::move(rewrittenMatch));
}
// For sharded clusters, additionally match control events for v2 change stream readers. The
// user's match filter is not used here and must not have impact on which control events are
// emitted.
if (expCtx->getInRouter() &&
expCtx->getChangeStreamSpec()->getVersion() == ChangeStreamReaderVersionEnum::kV2) {
if (BSONObj controlEventsFilter = buildControlEventsFilterForDataShard(expCtx);
!controlEventsFilter.isEmpty()) {
unwindFilter = std::make_unique<OrMatchExpression>(std::move(unwindFilter), nullptr);
unwindFilter->add(MatchExpressionParser::parseAndNormalize(
bsonObj.emplace_back(controlEventsFilter), expCtx));
}
}
return optimizeMatchExpression(std::move(unwindFilter));
}
} // namespace change_stream_filter
@ -111,7 +125,7 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction>
DocumentSourceChangeStreamUnwindTransaction::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
std::vector<BSONObj> bsonObj = std::vector<BSONObj>{};
std::vector<BSONObj> bsonObj;
std::unique_ptr<MatchExpression> matchExpr =
change_stream_filter::buildUnwindTransactionFilter(expCtx, nullptr, bsonObj);
return new DocumentSourceChangeStreamUnwindTransaction(matchExpr->serialize(), expCtx);

View File

@ -69,6 +69,9 @@ class DocumentSourceChangeStreamUnwindTransaction final
public:
static constexpr StringData kStageName = "$_internalChangeStreamUnwindTransaction"_sd;
DocumentSourceChangeStreamUnwindTransaction(
BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
static boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
@ -93,6 +96,10 @@ public:
return DocumentSourceChangeStreamUnwindTransaction::kStageName.data();
}
MatchExpression* getMatchExpression() const {
return _expression.get();
}
static const Id& id;
Id getId() const override {
@ -107,9 +114,6 @@ private:
DocumentSourceContainer::iterator doOptimizeAt(DocumentSourceContainer::iterator itr,
DocumentSourceContainer* container) final;
DocumentSourceChangeStreamUnwindTransaction(
BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Resets the transaction entry filter saved in the '_filter' and '_expression' fields.
*/

View File

@ -35,9 +35,9 @@
#include "mongo/db/exec/agg/pipeline_builder.h"
#include "mongo/db/exec/document_value/document_metadata_fields.h"
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/pipeline/change_stream_helpers.h"
#include "mongo/db/pipeline/change_stream_start_after_invalidate_info.h"
#include "mongo/db/pipeline/change_stream_topology_change_info.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/plan_explainer_pipeline.h"
#include "mongo/db/pipeline/resume_token.h"
@ -59,16 +59,6 @@ namespace mongo {
namespace {
auto& changeStreamsLargeEventsFailedCounter =
*MetricBuilder<Counter64>{"changeStreams.largeEventsFailed"};
// Tests if we are currently running in a router or in a replica set context.
bool isRouterOrReplicaSet(ExpressionContext* expCtx) {
    if (!expCtx->getInRouter()) {
        // Not a router: a non-null replication coordinator configured with replica set
        // settings means this node is a replica set member.
        auto* coordinator = repl::ReplicationCoordinator::get(expCtx->getOperationContext());
        return coordinator != nullptr && coordinator->getSettings().isReplSet();
    }
    // Router contexts qualify unconditionally.
    return true;
}
} // namespace
PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
@ -141,9 +131,10 @@ boost::optional<Document> PlanExecutorPipeline::_getNext() {
// No change stream control events should ever escape an aggregation pipeline on the router
// or the replica set.
tassert(10358906,
"No control events should escape this aggregation pipeline",
"No control events should escape this aggregation pipeline on a router or "
"non-sharded replica set",
!nextDoc->metadata().isChangeStreamControlEvent() ||
!isRouterOrReplicaSet(_expCtx.get()));
!change_stream::isRouterOrNonShardedReplicaSet(_expCtx));
} else {
_pipelineIsEof = true;
}

View File

@ -19,7 +19,6 @@ mongo_cc_library(
"collection_change_stream_db_absent_state_event_handler.cpp",
"collection_change_stream_db_present_state_event_handler.cpp",
"collection_change_stream_shard_targeter_impl.cpp",
"control_events.cpp",
"data_to_shards_allocation_query_service_impl.cpp",
"database_change_stream_shard_targeter_impl.cpp",
"historical_placement_fetcher_impl.cpp",
@ -33,12 +32,12 @@ mongo_cc_library(
"collection_change_stream_db_absent_state_event_handler.h",
"collection_change_stream_db_present_state_event_handler.h",
"collection_change_stream_shard_targeter_impl.h",
"control_events.h",
"data_to_shards_allocation_query_service_impl.h",
"database_change_stream_shard_targeter_impl.h",
"historical_placement_fetcher_impl.h",
],
deps = [
":change_streams_control_events",
"//src/mongo/db:namespace_spec",
"//src/mongo/db:server_base",
"//src/mongo/db/exec/document_value",
@ -46,6 +45,21 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "change_streams_control_events",
srcs = [
"control_events.cpp",
],
hdrs = [
"control_events.h",
],
deps = [
"//src/mongo/db:namespace_spec",
"//src/mongo/db:server_base",
"//src/mongo/db/exec/document_value",
],
)
mongo_cc_unit_test(
name = "change_streams_test",
srcs = [

View File

@ -33,6 +33,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/service_context.h"
#include "mongo/s/change_streams/all_databases_change_stream_shard_targeter_impl.h"
#include "mongo/s/change_streams/collection_change_stream_shard_targeter_impl.h"
#include "mongo/s/change_streams/control_events.h"

View File

@ -30,6 +30,9 @@
#include "mongo/s/change_streams/data_to_shards_allocation_query_service_impl.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/service_context.h"
#include "mongo/s/change_streams/data_to_shards_allocation_query_service_impl.h"
#include "mongo/s/change_streams/historical_placement_fetcher_impl.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@ -46,5 +49,22 @@ AllocationToShardsStatus DataToShardsAllocationQueryServiceImpl::getAllocationTo
}
MONGO_UNREACHABLE_TASSERT(10718905);
} // namespace mongo
}
namespace {
// At ServiceContext construction time, installs a 'DataToShardsAllocationQueryServiceImpl'
// (backed by a 'HistoricalPlacementFetcherImpl') as the service-context decoration that
// 'DataToShardsAllocationQueryService::get()' returns.
ServiceContext::ConstructorActionRegisterer dataToShardsAllocationQueryServiceRegisterer(
    "DataToShardsAllocationQueryServiceRegisterer",
    {},
    [](ServiceContext* serviceContext) {
        invariant(serviceContext);
        auto fetcher = std::make_unique<HistoricalPlacementFetcherImpl>();
        DataToShardsAllocationQueryService::set(
            serviceContext,
            std::make_unique<DataToShardsAllocationQueryServiceImpl>(std::move(fetcher)));
    },
    {});
} // namespace
} // namespace mongo

View File

@ -38,7 +38,12 @@ namespace mongo {
HistoricalPlacement HistoricalPlacementFetcherImpl::fetch(
OperationContext* opCtx, const boost::optional<NamespaceString>& nss, Timestamp atClusterTime) {
ConfigsvrGetHistoricalPlacement request(nss.value_or(NamespaceString::kEmpty), atClusterTime);
// The config server request must always have a namespace string, even if it is the empty
// string. When no concrete namespace was supplied (or it is empty), we target the whole
// cluster and send the empty namespace; otherwise we send the requested namespace.
// BUG FIX: the previous ternary was inverted — it called 'nss.value()' exactly in the
// whole-cluster case, where the optional may be disengaged (throws bad_optional_access),
// and sent 'NamespaceString::kEmpty' for concrete namespaces.
const auto targetWholeCluster = !nss.has_value() || nss->isEmpty();
ConfigsvrGetHistoricalPlacement request(nss.value_or(NamespaceString::kEmpty), atClusterTime);
request.setTargetWholeCluster(targetWholeCluster);
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto remoteResponse = uassertStatusOK(

View File

@ -27,11 +27,11 @@
* it in the license file.
*/
#include "src/mongo/s/query/exec/merge_cursors_stage.h"
#include "mongo/s/query/exec/merge_cursors_stage.h"
#include "mongo/db/exec/agg/document_source_to_stage_registry.h"
#include "src/mongo/s/query/exec/document_source_merge_cursors.h"
#include "mongo/platform/compiler.h"
#include "mongo/s/query/exec/document_source_merge_cursors.h"
namespace mongo::exec::agg {
@ -88,7 +88,13 @@ GetNextResult MergeCursorsStage::doGetNext() {
if (next.isEOF()) {
return GetNextResult::makeEOF();
}
return Document::fromBsonWithMetaData(*next.getResult());
Document doc = Document::fromBsonWithMetaData(*next.getResult());
if (MONGO_unlikely(doc.metadata().isChangeStreamControlEvent())) {
// Special handling needed for control events here to avoid an assertion failure in the
// 'GetNextResult' constructor.
return GetNextResult::makeAdvancedControlDocument(std::move(doc));
}
return GetNextResult(std::move(doc));
}
void MergeCursorsStage::doDispose() {