/**
* Helper functions that are used in change streams rewrite test cases.
*/
import {
assertCreateCollection,
assertDropAndRecreateCollection,
assertDropCollection,
} from "jstests/libs/collection_drop_recreate.js";
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
import {runWithRetries} from "jstests/libs/run_with_retries.js";
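// Returns true if the given error is transient and the command that raised it can safely be
// retried. Used below with 'runWithRetries' to harden explain commands against network, timeout,
// stale-version, and snapshot errors.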
const isResumableError = (e) => {
return (
ErrorCodes.isNetworkError(e.code) ||
ErrorCodes.isExceededTimeLimitError(e.code) ||
ErrorCodes.isStaleShardVersionError(e.code) ||
ErrorCodes.isSnapshotError(e.code)
);
};
// Function which generates a write workload on the specified collection, including all events that
// a change stream may consume. Assumes that the specified collection does not already exist.
export function generateChangeStreamWriteWorkload(db, collName, numDocs, includeInvalidatingEvents = true) {
// If this is a sharded passthrough, make sure we shard on something other than _id so that a
// non-id field appears in the documentKey. This will generate 'create' and 'shardCollection'.
if (FixtureHelpers.isMongos(db)) {
assert.commandWorked(db.adminCommand({enableSharding: db.getName()}));
assert.commandWorked(
db.adminCommand({shardCollection: `${db.getName()}.${collName}`, key: {shardKey: "hashed"}}),
);
}
// If the collection hasn't already been created, do so here.
let testColl = assertCreateCollection(db, collName);
// Build an index, collMod it, then drop it.
assert.commandWorked(testColl.createIndex({a: 1}));
assert.commandWorked(
db.runCommand({
collMod: testColl.getName(),
index: {keyPattern: {a: 1}, hidden: true, expireAfterSeconds: 500},
}),
);
assert.commandWorked(testColl.dropIndex({a: 1}));
// Modify the collection's validation options.
assert.commandWorked(
testColl.runCommand({
collMod: collName,
validator: {},
validationLevel: "off",
validationAction: "warn",
}),
);
// Change the validation options back.
assert.commandWorked(
testColl.runCommand({
collMod: collName,
validator: {},
validationLevel: "strict",
validationAction: "error",
}),
);
// Insert some documents.
for (let i = 0; i < numDocs; ++i) {
assert.commandWorked(
testColl.insert({_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false}),
);
}
// Update half of them. We generate these updates individually so that they generate different
// values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields.
const updateSpecs = [
[{$set: {f2: true}}], // only populates 'updatedFields'
[{$unset: ["f1"]}], // only populates 'removedFields'
[{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays'
[{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}], // populates all fields
];
for (let i = 0; i < numDocs / 2; ++i) {
assert.commandWorked(testColl.update({_id: i, shardKey: i}, updateSpecs[i % updateSpecs.length]));
}
// Replace the other half.
for (let i = numDocs / 2; i < numDocs; ++i) {
assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i}));
}
// Delete half of the updated documents.
for (let i = 0; i < numDocs / 4; ++i) {
assert.commandWorked(testColl.remove({_id: i, shardKey: i}));
}
// Create, modify, and drop a view on the collection.
assert.commandWorked(db.createView("view", collName, []));
assert.commandWorked(db.runCommand({collMod: "view", viewOn: "viewOnView", pipeline: []}));
assertDropCollection(db, "view");
// If the caller is prepared to handle potential invalidations, include the following events.
if (includeInvalidatingEvents) {
// Rename the collection.
const collNameAfterRename = `${testColl.getName()}_renamed`;
assert.commandWorked(testColl.renameCollection(collNameAfterRename));
testColl = db[collNameAfterRename];
// Rename it back.
assert.commandWorked(testColl.renameCollection(collName));
testColl = db[collName];
// Drop the collection.
assert(testColl.drop());
// Drop the database.
assert.commandWorked(db.dropDatabase());
}
return testColl;
}
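// Example usage (an illustrative sketch; the collection name and document count are arbitrary, and
// the resume token could equally be captured from any open stream):
//
//   const resumeToken = db.getMongo().watch().getResumeToken();
//   generateChangeStreamWriteWorkload(db, "cs_workload", 8 /* numDocs */);
//   const events = getAllChangeStreamEvents(db, [], {}, resumeToken);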
// Helper function to fully exhaust a change stream from the specified point and return all events.
// Assumes that all relevant events can fit into a single 16MB batch.
export function getAllChangeStreamEvents(db, extraPipelineStages = [], csOptions = {}, resumeToken) {
// Open a whole-cluster stream based on the supplied arguments.
const csCursor = db
.getMongo()
.watch(extraPipelineStages, Object.assign({resumeAfter: resumeToken, maxAwaitTimeMS: 1}, csOptions));
// Run getMore until the post-batch resume token advances. In a sharded passthrough, this will
// guarantee that all shards have returned results, and we expect all results to fit into a
// single batch, so we know we have exhausted the stream.
while (bsonWoCompare(csCursor._postBatchResumeToken, resumeToken) == 0) {
csCursor.hasNext(); // runs a getMore
}
// Close the cursor since we have already retrieved all results.
csCursor.close();
// Extract all events from the streams. Since the cursor is closed, it will not attempt to
// retrieve any more batches from the server.
return csCursor.toArray();
}
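// Example usage (illustrative; assumes 'resumeToken' was captured from an earlier stream, e.g. via
// 'cursor.getResumeToken()'):
//
//   const inserts = getAllChangeStreamEvents(db, [{$match: {operationType: "insert"}}], {}, resumeToken);
//   inserts.forEach((event) => jsTestLog(tojson(event)));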
// Helper function to check whether this value is a plain old JavaScript object.
export function isPlainObject(value) {
return value && typeof value == "object" && value.constructor === Object;
}
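// For example, isPlainObject({a: 1}) returns true, while isPlainObject([1, 2]),
// isPlainObject(null), and isPlainObject(new Date()) all evaluate to a falsy value, since none of
// those values has Object as its direct constructor.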
// Verifies the number of change stream events returned from a particular shard.
export function assertNumChangeStreamDocsReturnedFromShard(stats, shardName, expectedTotalReturned) {
assert(stats.shards.hasOwnProperty(shardName), stats);
const stages = stats.shards[shardName].stages;
const lastStage = stages[stages.length - 1];
assert.eq(lastStage.nReturned, expectedTotalReturned, stages);
}
export function getExecutionStatsForShard(stats, shardName) {
assert(stats.shards.hasOwnProperty(shardName), stats);
assert.eq(Object.keys(stats.shards[shardName].stages[0])[0], "$cursor", stats);
return stats.shards[shardName].stages[0].$cursor.executionStats;
}
export function getUnwindStageForShard(stats, shardName) {
assert(stats.shards.hasOwnProperty(shardName), stats);
const stages = stats.shards[shardName].stages;
assert.eq(stages[1].$changeStream.stage, "internalUnwindTransaction");
return stages[1];
}
// Verifies the number of oplog events read by a particular shard.
export function assertNumMatchingOplogEventsForShard(stats, shardName, expectedTotalReturned) {
const executionStats = getExecutionStatsForShard(stats, shardName);
// We verify the number of documents from the unwind stage instead of the oplog cursor, so we
// are testing that the filter is applied to the output of batched oplog entries as well.
const unwindStage = getUnwindStageForShard(stats, shardName);
assert.eq(
unwindStage.nReturned,
expectedTotalReturned,
() =>
`Expected ${expectedTotalReturned} events on shard ${shardName} but got ` +
`${unwindStage.nReturned}. Execution stats:\n${tojson(executionStats)}\n` +
`Unwind stage:\n${tojson(unwindStage)}`,
);
}
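// Example (illustrative; 'stats' is the output of an "executionStats" explain of a change stream
// pipeline, and 'st' is a ShardingTest fixture):
//
//   assertNumMatchingOplogEventsForShard(stats, st.shard0.shardName, 5 /* expectedTotalReturned */);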
// Returns a newly created collection, sharded by the caller-provided shard key.
export function createShardedCollection(shardingTest, shardKey, dbName, collName, splitAt) {
assert.commandWorked(shardingTest.s.adminCommand({enableSharding: dbName, primaryShard: shardingTest.shard0.name}));
const db = shardingTest.s.getDB(dbName);
assertDropAndRecreateCollection(db, collName);
const coll = db.getCollection(collName);
assert.commandWorked(coll.createIndex({[shardKey]: 1}));
// Shard the test collection and split it into two chunks: one that contains all {shardKey: <lt
// splitAt>} documents and one that contains all {shardKey: <gte splitAt>} documents.
shardingTest.shardColl(
collName,
{[shardKey]: 1} /* shard key */,
{[shardKey]: splitAt} /* split at */,
{[shardKey]: splitAt} /* move the chunk containing {shardKey: splitAt} to its own shard */,
dbName,
true,
);
return coll;
}
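// Example usage (illustrative; the database and collection names are hypothetical):
//
//   const coll = createShardedCollection(st, "shardKey", "testDB", "testColl", 10 /* splitAt */);
//   // Documents with {shardKey: <10} now live in one chunk and {shardKey: >=10} in another,
//   // with the latter chunk moved to its own shard.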
// A helper that opens a change stream on the whole cluster with the user-supplied match expression
// 'userMatchExpr' and 'changeStreamSpec'. The helper validates that:
// 1. For each shard, the events are seen in the order specified by 'expectedResult', whose
// structure is {collOrDbName: {operationType: [eventIdentifier1, eventIdentifier2, ...], ...}, ...}.
// 2. No events are returned other than the ones in 'expectedResult'.
// 3. The filtering is done at the oplog level, and each shard reads only the
// 'expectedOplogNReturnedPerShard' documents.
export function verifyChangeStreamOnWholeCluster({
st,
changeStreamSpec,
userMatchExpr,
expectedResult,
expectedOplogNReturnedPerShard,
expectedChangeStreamDocsReturnedPerShard,
}) {
changeStreamSpec["allChangesForCluster"] = true;
const adminDB = st.s.getDB("admin");
const cursor = adminDB.aggregate([{$changeStream: changeStreamSpec}, userMatchExpr]);
for (const [collOrDb, opDict] of Object.entries(expectedResult)) {
for (const [op, eventIdentifierList] of Object.entries(opDict)) {
eventIdentifierList.forEach((eventIdentifier) => {
assert.soon(() => cursor.hasNext(), {op: op, eventIdentifier: eventIdentifier});
const event = cursor.next();
assert.eq(
event.operationType,
op,
() => `Expected "${op}" but got "${event.operationType}". Full event: ` + `${tojson(event)}`,
);
if (op == "dropDatabase") {
assert.eq(event.ns.db, eventIdentifier, event);
} else if (op == "insert" || op == "update" || op == "replace" || op == "delete") {
assert.eq(event.documentKey._id, eventIdentifier, event);
} else if (op == "rename") {
assert.eq(event.to.coll, eventIdentifier, event);
} else if (op == "drop") {
assert.eq(event.ns.coll, eventIdentifier);
} else if (op == "create") {
assert.eq(event.ns.coll, eventIdentifier);
} else if (op == "createIndexes") {
assert.eq(event.ns.coll, eventIdentifier);
} else if (op == "dropIndexes") {
assert.eq(event.ns.coll, eventIdentifier);
} else if (op == "shardCollection") {
assert.eq(event.ns.coll, eventIdentifier);
} else if (op == "modify") {
assert.eq(event.ns.coll, eventIdentifier);
} else {
assert(false, event);
}
if (op != "dropDatabase") {
assert.eq(event.ns.coll, collOrDb);
}
});
}
}
assert(!cursor.hasNext(), () => tojson(cursor.next()));
// BF-37671 has revealed that the following command often failed with transient errors, so we
// wrap it in a retry loop with backoff.
const stats = runWithRetries(() => {
return assert.commandWorked(
adminDB.runCommand({
explain: {
aggregate: 1,
pipeline: [{$changeStream: changeStreamSpec}, userMatchExpr],
cursor: {batchSize: 0},
},
verbosity: "executionStats",
}),
);
}, isResumableError);
assert.commandWorked(stats);
assert(
stats.hasOwnProperty("shards"),
() => `expecting stats to have 'shards' attribute, but got ${tojson(stats)}`,
);
assertNumMatchingOplogEventsForShard(stats, st.shard0.shardName, expectedOplogNReturnedPerShard[0]);
assertNumMatchingOplogEventsForShard(stats, st.shard1.shardName, expectedOplogNReturnedPerShard[1]);
if (expectedChangeStreamDocsReturnedPerShard !== undefined) {
assertNumChangeStreamDocsReturnedFromShard(
stats,
st.shard0.shardName,
expectedChangeStreamDocsReturnedPerShard[0],
);
assertNumChangeStreamDocsReturnedFromShard(
stats,
st.shard1.shardName,
expectedChangeStreamDocsReturnedPerShard[1],
);
}
}
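// Example invocation (an illustrative sketch; the expected event identifiers and per-shard counts
// are hypothetical and depend on the workload that was run):
//
//   verifyChangeStreamOnWholeCluster({
//       st: st,
//       changeStreamSpec: {showExpandedEvents: true},
//       userMatchExpr: {$match: {operationType: "insert"}},
//       expectedResult: {testColl: {insert: [0, 1, 2]}},
//       expectedOplogNReturnedPerShard: [2, 1],
//   });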