mongo/jstests/change_streams/ddl_create_index_txn.js

139 lines
4.5 KiB
JavaScript

/**
* Tests that a change stream will correctly unwind createIndexes operations from applyOps when
* createIndexes is done in a transaction.
*
* @tags: [
* uses_transactions,
* requires_majority_read_concern,
* requires_fcv_60,
* # In order to run this test with sharding we would have to create a transaction that creates
* # the collection, shards it, and then creates the index. however sharding a collection in a
* # transaction is not allowed and creating an index in a transaction on a collection that was
* # not created in that transaction is also not allowed. so this test only works with unsharded
* # collections.
* assumes_unsharded_collection
* ]
*/
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
const dbName = jsTestName() + "_db0";
const collName = jsTestName() + "_1";
const otherCollName = jsTestName() + "_2";
const otherDBName = jsTestName() + "_3";
const otherDBCollName = "someColl";
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(dbName);
const sessionOtherDB = session.getDatabase(otherDBName);
const sessionColl = sessionDB[collName];
const sessionOtherColl = sessionDB[otherCollName];
const sessionOtherDBColl = sessionOtherDB[otherDBCollName];
assertDropCollection(sessionDB, collName);
assertDropCollection(sessionDB, otherCollName);
assertDropCollection(sessionOtherDB, otherDBCollName);
let csOptions = {showExpandedEvents: true};
const pipeline = [
{$changeStream: csOptions},
{$project: {"lsid.uid": 0}},
{$match: {operationType: {$ne: "endOfTransaction"}}},
];
let cst = new ChangeStreamTest(sessionDB);
let changeStream = cst.startWatchingChanges({pipeline, collection: collName});
const testStartTime = changeStream.postBatchResumeToken;
assert.neq(testStartTime, undefined);
const txnOptions = {
readConcern: {level: "local"},
writeConcern: {w: "majority"},
};
withTxnAndAutoRetryOnMongos(
session,
() => {
assert.commandWorked(sessionColl.createIndex({unused: 1}));
},
txnOptions,
);
const lsid = session.getSessionId();
const txnNumberColl = session.getTxnNumber_forTesting();
withTxnAndAutoRetryOnMongos(
session,
() => {
assert.commandWorked(sessionOtherColl.createIndex({unused: 1}));
},
txnOptions,
);
const txnNumberOtherColl = session.getTxnNumber_forTesting();
withTxnAndAutoRetryOnMongos(
session,
() => {
assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1}));
},
txnOptions,
);
const txnNumberOtherDBColl = session.getTxnNumber_forTesting();
const expectedChanges = [
{operationType: "create", ns: {db: dbName, coll: collName}},
{
operationType: "createIndexes",
ns: {db: dbName, coll: collName},
"operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
lsid,
txnNumber: txnNumberColl,
},
];
// Test single coll changeStream.
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});
// Test whole db changeStream.
const otherCollEvents = [
{operationType: "create", ns: {db: dbName, coll: otherCollName}},
{
operationType: "createIndexes",
ns: {db: dbName, coll: otherCollName},
"operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
lsid,
txnNumber: txnNumberOtherColl,
},
];
expectedChanges.push(...otherCollEvents);
csOptions.startAfter = testStartTime;
changeStream = cst.startWatchingChanges({pipeline, collection: 1});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
cst.cleanUp();
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
// Test whole cluster changeStream.
const otherDBEvents = [
{operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}},
{
operationType: "createIndexes",
ns: {db: otherDBName, coll: otherDBCollName},
"operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
lsid,
txnNumber: txnNumberOtherDBColl,
},
];
expectedChanges.push(...otherDBEvents);
csOptions.allChangesForCluster = true;
changeStream = cst.startWatchingChanges({pipeline, collection: 1});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
cst.cleanUp();