mongo/jstests/serverless/write_to_change_collection.js

196 lines
8.3 KiB
JavaScript

// Tests that entries are written to the change collection for collection create, drop and document
// modification operations.
// @tags: [
// requires_fcv_62,
// ]
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
import {
ChangeStreamMultitenantReplicaSetTest,
verifyChangeCollectionEntries
} from "jstests/serverless/libs/change_collection_util.js";
const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 2});
const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();
// Hard code tenants ids such that a particular tenant can be identified deterministically.
const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");
// Create tokens for each tenant
const firstToken = _createTenantToken({tenant: firstTenantId});
const secondToken = _createTenantToken({tenant: secondTenantId});
// Connections to the replica set primary that are stamped with their respective tenant ids.
const firstTenantConn =
ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
const secondTenantConn =
ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);
// Enable the change stream state such that change collections are created for both tenants.
replSetTest.setChangeStreamState(firstTenantConn, true);
replSetTest.setChangeStreamState(secondTenantConn, true);
// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are
// captured by the tenant's change collection.
function performWrites(coll, docIds) {
docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId})));
docIds.forEach(
docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}})));
}
// Retrieve the last timestamp from the oplog.
function getLatestTimestamp() {
const oplogColl = primary.getDB("local").oplog.rs;
const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts;
assert(oplogTimestamp !== undefined);
return oplogTimestamp;
}
// Clear token on primary and secondary connections
function clearTokens() {
primary._setSecurityToken(undefined);
secondary._setSecurityToken(undefined);
}
// Test that writes to two different change collections are isolated and that each change collection
// captures only the relevant oplog entries associated with the corresponding tenant.
(function testWritesWithMultipleTenants() {
jsTestLog("Testing writes on change collections with multiple tenants.");
// A helper shell function to perform write for the specified 'tenantId'.
async function shellFn(hostAddr, collName, tenantId, performWrites) {
const {ChangeStreamMultitenantReplicaSetTest} =
await import("jstests/serverless/libs/change_collection_util.js");
const tenantConn =
ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
const docIds = Array.from({length: 300}, (_, index) => index);
performWrites(tenantConn.getDB("test").getCollection(collName), docIds);
assert(tenantConn.getDB("test").getCollection(collName).drop());
}
const startOplogTimestamp = getLatestTimestamp();
// Perform writes for the first tenant in a different shell.
const firstTenantShellReturn =
startParallelShell(funWithArgs(shellFn,
primary.host,
"testWritesWithMultipleTenants_firstTenant",
firstTenantId,
performWrites),
primary.port);
// Perform writes to the second tenant parallely with the first tenant.
const secondTenantShellReturn =
startParallelShell(funWithArgs(shellFn,
primary.host,
"testWritesWithMultipleTenants_secondTenant",
secondTenantId,
performWrites),
primary.port);
// Wait for both shells to return.
firstTenantShellReturn();
secondTenantShellReturn();
const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
// Verify that both change collections captured their respective tenant's oplog entries in
// the primary.
verifyChangeCollectionEntries(
primary, startOplogTimestamp, endOplogTimestamp, firstTenantId, firstToken);
verifyChangeCollectionEntries(
primary, startOplogTimestamp, endOplogTimestamp, secondTenantId, secondToken);
clearTokens();
// Wait for the replication to finish.
replSetTest.awaitReplication();
// Verify that both change collections captured their respective tenant's oplog entries in
// the secondary.
verifyChangeCollectionEntries(
secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId, firstToken);
verifyChangeCollectionEntries(
secondary, startOplogTimestamp, endOplogTimestamp, secondTenantId, secondToken);
clearTokens();
})();
// Test that transactional writes to two different change collections are isolated and that each
// change collection captures only the relevant 'applyOps' oplog entries associated with the
// corresponding tenant.
(function testTransactionalWritesWithMultipleTenants() {
jsTestLog("Testing transactional writes on change collections with multiple tenants.");
// A helper shell function to perform transactional write for the specified 'tenantId'.
async function shellFn(hostAddr, collName, tenantId, performWrites) {
const {ChangeStreamMultitenantReplicaSetTest} =
await import("jstests/serverless/libs/change_collection_util.js");
const tenantConn =
ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
const session = tenantConn.getDB("test").getMongo().startSession();
const sessionDb = session.getDatabase("test");
session.startTransaction();
const docIds = Array.from({length: 300}, (_, index) => index);
performWrites(sessionDb.getCollection(collName), docIds);
session.commitTransaction_forTesting();
}
const startOplogTimestamp = getLatestTimestamp();
// Perform writes within a transaction for the first tenant.
const firstTenantShellReturn =
startParallelShell(funWithArgs(shellFn,
primary.host,
"testTransactionalWritesWithMultipleTenants_firstTenant",
firstTenantId,
performWrites),
primary.port);
// Perform parallel writes within a transaction for the second tenant.
const secondTenantShellReturn =
startParallelShell(funWithArgs(shellFn,
primary.host,
"testTransactionalWritesWithMultipleTenants_secondTenant",
secondTenantId,
performWrites),
primary.port);
// Wait for shells to return.
firstTenantShellReturn();
secondTenantShellReturn();
const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
// Verify that both change collections captured their respective tenant's 'applyOps' oplog
// entries in the primary.
verifyChangeCollectionEntries(
primary, startOplogTimestamp, endOplogTimestamp, firstTenantId, firstToken);
verifyChangeCollectionEntries(
primary, startOplogTimestamp, endOplogTimestamp, secondTenantId, secondToken);
clearTokens();
// Wait for the replication to finish.
replSetTest.awaitReplication();
// Verify that both change collections captured their respective tenant's 'applyOps' oplog
// entries in the secondary.
verifyChangeCollectionEntries(
secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId, firstToken);
verifyChangeCollectionEntries(
secondary, startOplogTimestamp, endOplogTimestamp, secondTenantId, secondToken);
clearTokens();
})();
replSetTest.stopSet();