SERVER-85453: Make ExternalDataSourceScopeGuard only compatible with one plan executor

GitOrigin-RevId: 76ffc31436ea6dc3ed16a504621c59434548106c
Will Buerger 2024-02-06 18:05:10 +00:00 committed by MongoDB Bot
parent f7d530b603
commit ca28a130ba
7 changed files with 195 additions and 61 deletions

View File

@@ -21,13 +21,11 @@ function.
![fsm.png](../images/testing/fsm.png)
The runner provides three modes of execution for workloads: serial, parallel,
and composed. Serial mode runs the provided workloads one after the other,
The runner provides two modes of execution for workloads: serial and parallel.
Serial mode runs the provided workloads one after the other,
waiting for all threads of a workload to complete before moving on to the next
workload. Parallel mode runs subsets of the provided workloads in separate
threads simultaneously. Composed mode runs subsets of the provided workloads in
the same threads simultaneously, with each thread moving between the states of
multiple workloads.
threads simultaneously.
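
For example, using the same placeholder notation as the runner sections below,

`runWorkloadsSerially([<workload1>, <workload2>, ...])`

runs the listed workloads back to back, while

`runWorkloadsInParallel([<workload1>, <workload2>, ...])`

runs subsets of them at the same time.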
New methods were added to allow for finer-grained assertions under different
situations. For example, a test that inserts a document into a collection, and
@@ -103,9 +101,8 @@ It's best to also declare states within its own closure so as not to interfere
with the scope of $config. Each state takes two arguments, the db object and the
collection name. For later, note that this db and collection are the only ones
you can be guaranteed to "own" when asserting. Try to make each state a
discrete unit of work that can stand alone without the other states (in fact, an
FSM that requires another state to run before it will probably not work in
Composed mode). Additionally, try to define each function that makes up a state
discrete unit of work that can stand alone without the other states.
Additionally, try to define each function that makes up a state
with a name as opposed to anonymously - this makes backtraces easier to read
when things go wrong.
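
A minimal sketch of the difference, with illustrative state and document contents
(not taken from any real workload):

```js
var states = (function() {
    // Named function: a failure here shows "insertDoc" in the backtrace.
    function insertDoc(db, collName) {
        assert.commandWorked(db[collName].insert({x: 1}));
    }

    return {
        insertDoc: insertDoc,
        // Anonymous function: the same failure reports an unnamed frame,
        // which is harder to trace back to a state.
        removeDoc: function(db, collName) {
            assert.commandWorked(db[collName].remove({x: 1}));
        },
    };
})();
```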
@@ -362,11 +359,9 @@ runWorkloads functions, the third argument, can contain the following options
(some depend on the run mode):
* `numSubsets` - Not available in serial mode, determines how many subsets of
workloads to execute in parallel or composed mode
workloads to execute in parallel mode
* `subsetSize` - Not available in serial mode, determines how large each subset of
workloads executed is
* `iterations` - Only available in composed mode, determines how many transitions
to perform between states in a single thread of composition.
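
For instance, a parallel run that splits the given workloads into two subsets of
three workloads each might look like the following (placeholder notation as
elsewhere in this document; how subsets are chosen from the list is up to the
runner):

`runWorkloadsInParallel([<workload1>, ..., <workload6>], { numSubsets: 2, subsetSize: 3 })`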
#### fsm_all.js
@@ -385,8 +380,8 @@ settings in `$config`.
#### fsm_all_replication.js
Sets up a replica set (with 3 mongods by default) and runs workloads serially,
in parallel, or in composed mode. For example,
Sets up a replica set (with 3 mongods by default) and runs workloads serially or
in parallel. For example,
`runWorkloadsSerially([<workload1>, <workload2>, ...], { replication: true } )`
@@ -396,7 +391,7 @@ primary.
#### fsm_all_sharded.js
Sets up a sharded cluster (with 2 shards and 1 mongos by default) and runs
workloads serially, in parallel, or in composed mode. For example,
workloads serially or in parallel. For example,
`runWorkloadsInParallel([<workload1>, <workload2>, ...], { sharded: true } )`
@@ -405,8 +400,7 @@ creates a sharded cluster and runs workloads in parallel.
#### fsm_all_sharded_replication.js
Sets up a sharded cluster (with 2 shards, each having 3 replica set members, and
1 mongos by default) and runs workloads serially, in parallel, or in composed
mode.
1 mongos by default) and runs workloads serially or in parallel.
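
Assuming the options compose the same way as in the two runners above (this
combination is inferred, not shown in this excerpt), an invocation would look
like

`runWorkloadsSerially([<workload1>, <workload2>, ...], { sharded: true, replication: true } )`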
### Excluding a workload

View File

@@ -0,0 +1,122 @@
/**
* Runs stress tests of exchange producers, where multiple threads run concurrent getNexts on the
* consumer cursors.
*
* @tags: [assumes_against_mongod_not_mongos]
*/
// The buffer needs to be big enough that no one consumer buffer gets filled near the final
// iteration, or it may hang if the final getMore is blocked waiting on a different consumer to make
// space in its buffer.
export const $config = (function() {
var data = {
numDocs: 10000,
numConsumers: 5,
bufferSize: 100 * 1024,
batchSize: 5,
};
// Runs a getMore on the cursor with id at cursorIndex. We have enough documents and few enough
// iterations that the cursors should never be exhausted.
function runGetMoreOnCursor(db, collName, cursorIndex, batchSize, cursorIds, sessionId) {
// See comment at the end of setup() for why we need eval().
const cursorId = eval(cursorIds[cursorIndex]);
const res = db.runCommand(
{getMore: cursorId, collection: collName, batchSize, lsid: {id: eval(sessionId)}});
// If the getMore was successful, assert we have enough results returned; otherwise, it
// should have failed because another worker thread has that cursor in use.
if (res.ok) {
// All batches before the final batch must have batchSize results. The final batch will
// have 0 results.
assert.eq(batchSize, res.cursor.nextBatch.length);
} else {
assert.commandFailedWithCode(res, ErrorCodes.CursorInUse);
}
}
// One state per consumer, with equal probability, so we exhaust each cursor in
// approximately the same timeline. See runGetMoreOnCursor for details.
var states = function() {
function makeConsumerCallback(consumerId) {
return function consumerCallback(db, collName) {
return runGetMoreOnCursor(
db, collName, consumerId, this.batchSize, this.cursorIds, this.sessionId);
}
}
return {
// A no-op starting state so the worker threads don't all start on the same cursors.
init: function init(db, collName) {}, consumer0: makeConsumerCallback(0),
consumer1: makeConsumerCallback(1), consumer2: makeConsumerCallback(2),
consumer3: makeConsumerCallback(3), consumer4: makeConsumerCallback(4),
}
}();
var allStatesEqual =
{init: 0, consumer0: 0.2, consumer1: 0.2, consumer2: 0.2, consumer3: 0.2, consumer4: 0.2};
var transitions = {
init: allStatesEqual,
consumer0: allStatesEqual,
consumer1: allStatesEqual,
consumer2: allStatesEqual,
consumer3: allStatesEqual,
consumer4: allStatesEqual,
};
function setup(db, collName, cluster) {
// Start a session so we can pass the sessionId from when we retrieved the cursors to the
// getMores where we want to iterate the cursors.
const session = db.getMongo().startSession();
// Load data.
const bulk = db[collName].initializeUnorderedBulkOp();
for (let i = 0; i < this.numDocs; ++i) {
bulk.insert({_id: i, a: i % 5, b: "foo"});
}
let res = bulk.execute();
assert.commandWorked(res);
assert.eq(this.numDocs, res.nInserted);
assert.eq(this.numDocs, db[collName].find().itcount());
// Run an exchange to get a list of cursors.
res = assert.commandWorked(session.getDatabase(db.getName()).runCommand({
aggregate: collName,
pipeline: [],
exchange: {
policy: "roundrobin",
consumers: NumberInt(this.numConsumers),
bufferSize: NumberInt(this.bufferSize)
},
cursor: {batchSize: 0},
}));
// Save the cursor ids to $config.data so each of the worker threads has access to the
// cursors, as well as the sessionId.
assert.eq(this.numConsumers, res.cursors.length);
this.sessionId = tojson(session.getSessionId()["id"]);
this.cursorIds = [];
for (const cursor of res.cursors) {
// We have to stringify the cursorId with tojson() since serializing data between
// threads in the mongo shell doesn't work. When we use it later, we rehydrate it with
// eval().
this.cursorIds.push(tojson(cursor.cursor.id));
}
}
function teardown(db, collName) {
// Kill all the open cursors.
let cursors = [];
for (const cursorId of this.cursorIds) {
cursors.push(eval(cursorId));
}
assert.commandWorked(db.runCommand({killCursors: collName, cursors}));
}
// threadCount must be equal to numConsumers. We need as many worker threads as consumers to
// avoid a deadlock where all threads are waiting for one particular cursor to run a getMore.
return {
threadCount: data.numConsumers, iterations: 20, startState: 'init', states: states,
transitions: transitions, setup: setup, teardown: teardown, data: data
}
})();
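
A side note on the `tojson()`/`eval()` round-trip used in `setup()` and the
consumer states above: cursor ids are `NumberLong` values, which do not survive
being copied between worker threads in the shell, so the workload stores their
string forms and rehydrates them on use. A standalone sketch of the idea (the id
value here is made up):

```js
const id = NumberLong("8278656818006983868");  // hypothetical cursor id
const asString = tojson(id);                   // 'NumberLong("8278656818006983868")'
const roundTripped = eval(asString);           // a NumberLong again
assert.eq(id, roundTripped);
```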

View File

@@ -52,6 +52,11 @@ ExternalDataSourceScopeGuard::ExternalDataSourceScopeGuard(
const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&
usedExternalDataSources)
: _opCtx(opCtx) {
tassert(8545300,
"ExternalDataSourceScopeGuard should only be created when there is at least one "
"external data source.",
usedExternalDataSources.size() > 0);
// In case any virtual collection could not be created and the destructor does not get a
// chance to run, clean up any collections that have already been created by that
// point.

View File

@@ -58,8 +58,6 @@ public:
}
}
ExternalDataSourceScopeGuard() : _opCtx(nullptr), _toBeDroppedVirtualCollections() {}
ExternalDataSourceScopeGuard(
OperationContext* opCtx,
const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&

View File

@@ -48,7 +48,6 @@
#include "mongo/db/auth/authorization_checks.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/catalog/external_data_source_scope_guard.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/database_name.h"
@@ -269,19 +268,13 @@ public:
CommandHelpers::handleMarkKillOnClientDisconnect(
opCtx, !Pipeline::aggHasWriteStage(_request.body));
// Create virtual collections and drop them when aggregate command is done. Conceptually
// ownership of virtual collections is moved to the runAggregate() function together with
// 'dropVcollGuard' so that it can clean up virtual collections when it's done with
// them. ExternalDataSourceScopeGuard will take care of the situation when any
// collection could not be created.
ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources);
uassertStatusOK(runAggregate(opCtx,
_aggregationRequest,
_liteParsedPipeline,
_request.body,
_privileges,
reply,
std::move(dropVcollGuard)));
_usedExternalDataSources));
// The aggregate command's response is unstable when 'explain' or 'exchange' fields are
// set.
@@ -302,14 +295,13 @@ public:
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
// See run() method for details.
ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources);
uassertStatusOK(runAggregate(opCtx,
_aggregationRequest,
_liteParsedPipeline,
_request.body,
_privileges,
result,
std::move(dropVcollGuard)));
_usedExternalDataSources));
}
void doCheckAuthorization(OperationContext* opCtx) const override {

View File

@@ -227,7 +227,9 @@ ClientCursorPin registerCursor(OperationContext* opCtx,
auto pin = CursorManager::get(opCtx)->registerCursor(opCtx, std::move(cursorParams));
pin->incNBatches();
ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard;
if (extDataSrcGuard) {
ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard;
}
return pin;
}
@@ -496,13 +498,20 @@ boost::optional<ClientCursorPin> executeUntilFirstBatch(
opCtx, expCtx, request, cmdObj, privileges, origNss, extDataSrcGuard, execs, result);
}
// We disallowed external data sources in queries with multiple plan executors due to a data
// race (see SERVER-85453 for more details).
tassert(8545301,
"External data sources are not currently compatible with queries that use multiple "
"plan executors.",
extDataSrcGuard == nullptr);
// If there is more than one executor, that means this query will be running on multiple
// shards via exchange and merge. Such queries always require a cursor to be registered for
// each PlanExecutor.
std::vector<ClientCursorPin> pinnedCursors;
for (auto&& exec : execs) {
auto pinnedCursor = registerCursor(
opCtx, expCtx, origNss, *cmdObj, privileges, std::move(exec), extDataSrcGuard);
auto pinnedCursor =
registerCursor(opCtx, expCtx, origNss, *cmdObj, privileges, std::move(exec), nullptr);
pinnedCursors.emplace_back(std::move(pinnedCursor));
}
handleMultipleCursorsForExchange(opCtx, origNss, pinnedCursors, request, result);
@@ -952,7 +961,7 @@ Status _runAggregate(OperationContext* opCtx,
const DeferredCmd& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
ExternalDataSourceScopeGuard externalDataSourceGuard,
std::shared_ptr<ExternalDataSourceScopeGuard> externalDataSourceGuard,
boost::optional<const ResolvedView&> resolvedView,
boost::optional<const AggregateCommandRequest&> origRequest);
@@ -1017,7 +1026,7 @@ Status runAggregateOnView(OperationContext* opCtx,
std::move(deferredCmd),
privileges,
result,
{},
nullptr,
resolvedView,
request);
} else {
@@ -1055,7 +1064,7 @@ Status runAggregateOnView(OperationContext* opCtx,
std::move(deferredCmd),
privileges,
result,
{},
nullptr,
resolvedView,
request);
} else {
@@ -1232,7 +1241,7 @@ Status _runAggregate(OperationContext* opCtx,
const DeferredCmd& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
ExternalDataSourceScopeGuard externalDataSourceGuard,
std::shared_ptr<ExternalDataSourceScopeGuard> externalDataSourceGuard,
boost::optional<const ResolvedView&> resolvedView,
boost::optional<const AggregateCommandRequest&> origRequest) {
auto origNss = origRequest.has_value() ? origRequest->getNamespace() : request.getNamespace();
@@ -1283,14 +1292,6 @@ Status _runAggregate(OperationContext* opCtx,
// The UUID of the collection for the execution namespace of this aggregation.
boost::optional<UUID> uuid;
// All cursors should share ownership of 'extDataSrcGuard' if any cursors are created. Once
// all cursors are destroyed later, 'extDataSrcGuard' will also be destroyed and any virtual
// collections will be dropped by the destructor of ExternalDataSourceScopeGuard. We obtain a
// reference before taking locks so that the virtual collections will be dropped after releasing
// our read locks, avoiding a lock upgrade.
std::shared_ptr<ExternalDataSourceScopeGuard> extDataSrcGuard =
std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard));
// If emplaced, AutoGetCollectionForReadCommand will throw if the sharding version for this
// connection is out of date. If the namespace is a view, the lock will be released before
// re-running the expanded aggregation.
@@ -1718,8 +1719,15 @@ Status _runAggregate(OperationContext* opCtx,
}
collectQueryStatsMongod(opCtx, std::move(curOp->debug().queryStatsInfo.key));
} else {
auto maybePinnedCursor = executeUntilFirstBatch(
opCtx, expCtx, request, cmdObj, privileges, origNss, extDataSrcGuard, execs, result);
auto maybePinnedCursor = executeUntilFirstBatch(opCtx,
expCtx,
request,
cmdObj,
privileges,
origNss,
externalDataSourceGuard,
execs,
result);
// For an optimized away pipeline, signal the cache that a query operation has completed.
// For normal pipelines this is done in DocumentSourceCursor.
@@ -1775,25 +1783,38 @@ Status runAggregate(OperationContext* opCtx,
DeferredCmd(cmdObj),
privileges,
result,
{},
nullptr,
boost::none,
boost::none);
}
Status runAggregate(OperationContext* opCtx,
AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
ExternalDataSourceScopeGuard externalDataSourceGuard) {
Status runAggregate(
OperationContext* opCtx,
AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&
usedExternalDataSources) {
// Create virtual collections and drop them when aggregate command is done.
// If a cursor is registered, the ExternalDataSourceScopeGuard will be stored in the cursor;
// when the cursor is later destroyed, the scope guard will also be destroyed, and any virtual
// collections will be dropped by the destructor of ExternalDataSourceScopeGuard.
// We create this scope guard prior to taking locks in _runAggregate so that, if no cursor is
// registered, the virtual collections will be dropped after releasing our read locks, avoiding
// a lock upgrade.
auto extDataSrcGuard = usedExternalDataSources.size() > 0
? std::make_shared<ExternalDataSourceScopeGuard>(opCtx, usedExternalDataSources)
: nullptr;
return _runAggregate(opCtx,
request,
liteParsedPipeline,
DeferredCmd(cmdObj),
privileges,
result,
std::move(externalDataSourceGuard),
extDataSrcGuard,
boost::none,
boost::none);
}
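
For context, a hedged sketch of what a caller-side aggregate with external data
sources looks like; the `$_externalDataSources` field shape below is recalled
from the server's integration tests and should be treated as an assumption:

```js
// Aggregate over a virtual collection backed by a named pipe. Only when
// external data sources are supplied does runAggregate() create the
// ExternalDataSourceScopeGuard that later drops the virtual collection.
const res = db.runCommand({
    aggregate: "virtualColl",
    pipeline: [{$match: {}}],
    cursor: {},
    $_externalDataSources: [{
        collName: "virtualColl",
        dataSources: [{url: "file://pipe1", storageType: "pipe", fileType: "bson"}]
    }]
});
assert.commandWorked(res);
```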

View File

@@ -55,13 +55,15 @@ namespace mongo {
*
* On success, fills out 'result' with the command response.
*/
Status runAggregate(OperationContext* opCtx,
AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
ExternalDataSourceScopeGuard externalDataSourceGuard);
Status runAggregate(
OperationContext* opCtx,
AggregateCommandRequest& request,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result,
const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&
usedExternalDataSources = {});
/**
* Convenience version that internally constructs the LiteParsedPipeline.