mongo/jstests/sharding/resharding_cleanup_and_chan...

230 lines
8.9 KiB
JavaScript

/**
* Tests that change streams do not output an event for the _shardsvrDropCollectionIfUUIDNotMatching
* command performed by resharding cleanup.
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
function getCollectionUuid(conn, dbName, collName) {
const listCollectionRes = assert.commandWorked(
conn.getDB(dbName).runCommand({listCollections: 1, filter: {name: collName}}),
);
return listCollectionRes.cursor.firstBatch[0].info.uuid;
}
/**
* Returns events up until the given timestamp.
*/
function getEvents(cursor, minTimestamp) {
const events = [];
let lastEvent;
do {
assert.soon(() => cursor.hasNext(), "change stream timed out", 10 * 1000);
lastEvent = cursor.next();
events.push(lastEvent);
} while (timestampCmp(minTimestamp, lastEvent.clusterTime) > 0);
return events;
}
function assertReshardCollectionEvent(event, dbName, collName, collUuid) {
assert.eq(event.operationType, "reshardCollection", event);
assert.eq(event.hasOwnProperty("ns"), true, event);
assert.eq(event.ns.db, dbName, event);
assert.eq(event.ns.coll, collName, event);
assert.eq(event.collectionUUID, collUuid, event);
}
function assertInsertEvent(event, dbName, collName, collUuid, doc) {
assert.eq(event.operationType, "insert", event);
assert.eq(event.hasOwnProperty("ns"), true, event);
assert.eq(event.ns.db, dbName, event);
assert.eq(event.ns.coll, collName, event);
if (collUuid) {
assert.eq(event.collectionUUID, collUuid, event);
}
assert.eq(event.fullDocument, doc, event);
}
const rsOptions = {
setParameter: {
periodicNoopIntervalSecs: 1,
writePeriodicNoops: true,
},
};
const st = new ShardingTest({
shards: 2,
rs: rsOptions,
configOptions: rsOptions,
});
function runTest(testName, setUpFunc, runCmdFunc) {
const testDbName = "testDb-" + testName;
const testCollName = "testColl";
const testDB = st.s.getDB(testDbName);
const testColl = testDB.getCollection(testCollName);
const adminDB = st.s.getDB("admin");
setUpFunc(st, testDbName, testCollName);
const numDocs = 10;
for (let i = 0; i < numDocs / 2; i++) {
assert.commandWorked(
testColl.insert([
{_id: UUID(), x: -i, y: -i},
{_id: UUID(), x: i, y: i},
]),
);
}
const defaultOptions = {};
const expandedOptions = {showExpandedEvents: true};
const collectionChangeStreamCursorDefault = testColl.aggregate([{$changeStream: defaultOptions}]);
const collectionChangeStreamCursorExpanded = testColl.aggregate([{$changeStream: expandedOptions}]);
const databaseChangeStreamCursorDefault = testDB.aggregate([{$changeStream: defaultOptions}]);
const databaseChangeStreamCursorExpanded = testDB.aggregate([{$changeStream: expandedOptions}]);
const clusterChangeStreamCursorDefault = adminDB.aggregate([
{$changeStream: Object.assign({allChangesForCluster: true}, defaultOptions)},
]);
const clusterChangeStreamCursorExpanded = adminDB.aggregate([
{$changeStream: Object.assign({allChangesForCluster: true}, expandedOptions)},
]);
const collUuidBefore = getCollectionUuid(st.s, testDbName, testCollName);
runCmdFunc(st, testDbName, testCollName);
const collUuidAfter = getCollectionUuid(st.s, testDbName, testCollName);
const newDoc = {_id: UUID(), x: 100, y: 100};
const res = assert.commandWorked(testDB.runCommand({insert: testCollName, documents: [newDoc]}));
const insertTs = res.operationTime;
const validateEvents = (events, isExpanded) => {
if (isExpanded) {
assert.eq(events.length, 2, events);
assertReshardCollectionEvent(events[0], testDbName, testCollName, collUuidBefore);
assertInsertEvent(events[1], testDbName, testCollName, collUuidAfter, newDoc);
} else {
assert.eq(events.length, 1, events);
assertInsertEvent(events[0], testDbName, testCollName, null /* collUuid */, newDoc);
}
};
{
const events = getEvents(collectionChangeStreamCursorDefault, insertTs);
jsTest.log("The events in the collection change stream (default): " + tojson(events));
validateEvents(events, false /* isExpanded */);
}
{
const events = getEvents(collectionChangeStreamCursorExpanded, insertTs);
jsTest.log("The events in the collection change stream (expanded): " + tojson(events));
validateEvents(events, true /* isExpanded */);
}
{
const events = getEvents(databaseChangeStreamCursorDefault, insertTs);
jsTest.log("The events in the database change stream (default): " + tojson(events));
validateEvents(events, false /* isExpanded */);
}
{
const events = getEvents(databaseChangeStreamCursorExpanded, insertTs);
jsTest.log("The events in the database change stream (expanded): " + tojson(events));
validateEvents(events, true /* isExpanded */);
}
{
const events = getEvents(clusterChangeStreamCursorDefault, insertTs);
jsTest.log("The events in the cluster change stream (default): " + tojson(events));
validateEvents(events, false /* isExpanded */);
}
{
const events = getEvents(clusterChangeStreamCursorExpanded, insertTs);
jsTest.log("The events in the cluster change stream (expanded): " + tojson(events));
validateEvents(events, true /* isExpanded */);
}
}
// Set up the test collection such that there is a shard where the
// _shardsvrDropCollectionIfUUIDNotMatching command performed by resharding cleanup is not a no-op.
// That is, make the cluster have a shard (namely shard0) that satisfies the following:
// 1. The shard is not the primary shard for the collection.
// 2. Before resharding, the shard does not own any chunks for that collection but the
// collection exists locally on that shard (as an empty collection).
// 3. After resharding, the shard does not own any chunks for that collection.
// In other words, shard0 is neither a donor nor a recipient for the resharding operation.
// The collection must be sharded because:
// - Unsharded collections can only be moved using moveCollection (resharding) which always drops
// the original collection from all shards.
// - Only sharded collections can be moved incrementally using chunk migrations which do not drop
// the collection from the donor shard upon moving the last chunk.
// - To go from sharded to unsharded, a collection must go through unshardCollection (resharding)
// which again drops the original collection from all shards.
// For this reason, this test scenario is only applicable to the reshardCollection and
// unshardCollection commands.
const testCases = [];
// To achieve (1) and (2):
// - Make the test database have shard1 as the primary shard.
// - Shard the test collection and move a chunk from shard1 to shard0 and back to shard1 so the
// collection exists as an empty collection on shard0.
const setUp = (st, testDbName, testCollName) => {
const testNs = testDbName + "." + testCollName;
assert.commandWorked(st.s.adminCommand({enableSharding: testDbName, primaryShard: st.shard1.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: testNs, key: {x: 1}}));
assert.commandWorked(st.s.adminCommand({split: testNs, middle: {x: 0}}));
assert.commandWorked(
st.s.adminCommand({moveChunk: testNs, find: {x: MinKey}, to: st.shard0.shardName, _waitForDelete: true}),
);
assert.commandWorked(
st.s.adminCommand({moveChunk: testNs, find: {x: MinKey}, to: st.shard1.shardName, _waitForDelete: true}),
);
};
// To achieve (3):
// - For reshardCollection, specify "zones" to make shard1 own all chunks.
// - For unshardCollection, specify shard1 as the destination shard.
const runReshardCollection = (st, testDbName, testCollName) => {
const testNs = testDbName + "." + testCollName;
const zoneName = "zoneA";
assert.commandWorked(st.s.adminCommand({addShardToZone: st.shard1.shardName, zone: zoneName}));
assert.commandWorked(
st.s.adminCommand({
reshardCollection: testNs,
numInitialChunks: 1,
key: {x: 1, y: 1},
zones: [{min: {x: MinKey, y: MinKey}, max: {x: MaxKey, y: MaxKey}, zone: zoneName}],
}),
);
};
const runUnshardCollection = (st, testDbName, testCollName) => {
const testNs = testDbName + "." + testCollName;
assert.commandWorked(st.s.adminCommand({unshardCollection: testNs, toShard: st.shard1.shardName}));
};
testCases.push({
testName: "reshardCollection",
setUpFunc: setUp,
runCmdFunc: runReshardCollection,
});
testCases.push({
testName: "unshardCollection",
setUpFunc: setUp,
runCmdFunc: runUnshardCollection,
});
for (let testCase of testCases) {
jsTest.log("Testing case: " + testCase.testName);
runTest(testCase.testName, testCase.setUpFunc, testCase.runCmdFunc);
}
st.stop();