/**
 * Tests that change streams do not output an event for the _shardsvrDropCollectionIfUUIDNotMatching
 * command performed by resharding cleanup.
 */

import {ShardingTest} from "jstests/libs/shardingtest.js";

function getCollectionUuid(conn, dbName, collName) {
    const listCollectionRes = assert.commandWorked(
        conn.getDB(dbName).runCommand({listCollections: 1, filter: {name: collName}}),
    );
    return listCollectionRes.cursor.firstBatch[0].info.uuid;
}

/**
 * Returns the events from the change stream cursor, reading until it has returned the first event
 * whose clusterTime is at or after the given timestamp (that event is included in the result).
 */
function getEvents(cursor, minTimestamp) {
    const events = [];
    let lastEvent;
    do {
        assert.soon(() => cursor.hasNext(), "change stream timed out", 10 * 1000);
        lastEvent = cursor.next();
        events.push(lastEvent);
    } while (timestampCmp(minTimestamp, lastEvent.clusterTime) > 0);
    return events;
}

function assertReshardCollectionEvent(event, dbName, collName, collUuid) {
    assert.eq(event.operationType, "reshardCollection", event);
    assert.eq(event.hasOwnProperty("ns"), true, event);
    assert.eq(event.ns.db, dbName, event);
    assert.eq(event.ns.coll, collName, event);
    assert.eq(event.collectionUUID, collUuid, event);
}

function assertInsertEvent(event, dbName, collName, collUuid, doc) {
    assert.eq(event.operationType, "insert", event);
    assert.eq(event.hasOwnProperty("ns"), true, event);
    assert.eq(event.ns.db, dbName, event);
    assert.eq(event.ns.coll, collName, event);
    if (collUuid) {
        assert.eq(event.collectionUUID, collUuid, event);
    }
    assert.eq(event.fullDocument, doc, event);
}

// Make every node write periodic no-op oplog entries (every second) so change stream cursors can
// advance their cluster time promptly even on shards that receive no writes.
const rsOptions = {
    setParameter: {
        periodicNoopIntervalSecs: 1,
        writePeriodicNoops: true,
    },
};
const st = new ShardingTest({
    shards: 2,
    rs: rsOptions,
    configOptions: rsOptions,
});

function runTest(testName, setUpFunc, runCmdFunc) {
    const testDbName = "testDb-" + testName;
    const testCollName = "testColl";

    const testDB = st.s.getDB(testDbName);
    const testColl = testDB.getCollection(testCollName);
    const adminDB = st.s.getDB("admin");

    setUpFunc(st, testDbName, testCollName);

    // Seed the collection with documents on both sides of x = 0.
    const numDocs = 10;
    for (let i = 0; i < numDocs / 2; i++) {
        assert.commandWorked(
            testColl.insert([
                {_id: UUID(), x: -i, y: -i},
                {_id: UUID(), x: i, y: i},
            ]),
        );
    }

    const defaultOptions = {};
    const expandedOptions = {showExpandedEvents: true};

    // Open change streams at the collection, database, and cluster level, both with and without
    // expanded events.
    const collectionChangeStreamCursorDefault = testColl.aggregate([{$changeStream: defaultOptions}]);
    const collectionChangeStreamCursorExpanded = testColl.aggregate([{$changeStream: expandedOptions}]);

    const databaseChangeStreamCursorDefault = testDB.aggregate([{$changeStream: defaultOptions}]);
    const databaseChangeStreamCursorExpanded = testDB.aggregate([{$changeStream: expandedOptions}]);

    const clusterChangeStreamCursorDefault = adminDB.aggregate([
        {$changeStream: Object.assign({allChangesForCluster: true}, defaultOptions)},
    ]);
    const clusterChangeStreamCursorExpanded = adminDB.aggregate([
        {$changeStream: Object.assign({allChangesForCluster: true}, expandedOptions)},
    ]);

    // Capture the collection UUID before and after the command under test, then insert a sentinel
    // document. Its operationTime tells getEvents() when to stop reading each change stream.
    const collUuidBefore = getCollectionUuid(st.s, testDbName, testCollName);
    runCmdFunc(st, testDbName, testCollName);
    const collUuidAfter = getCollectionUuid(st.s, testDbName, testCollName);

    const newDoc = {_id: UUID(), x: 100, y: 100};
    const res = assert.commandWorked(testDB.runCommand({insert: testCollName, documents: [newDoc]}));
    const insertTs = res.operationTime;

    // With expanded events, expect exactly the reshardCollection event followed by the sentinel
    // insert; otherwise, only the insert. In particular, there must be no event for the
    // _shardsvrDropCollectionIfUUIDNotMatching command run by resharding cleanup.
    const validateEvents = (events, isExpanded) => {
        if (isExpanded) {
            assert.eq(events.length, 2, events);
            assertReshardCollectionEvent(events[0], testDbName, testCollName, collUuidBefore);
            assertInsertEvent(events[1], testDbName, testCollName, collUuidAfter, newDoc);
        } else {
            assert.eq(events.length, 1, events);
            assertInsertEvent(events[0], testDbName, testCollName, null /* collUuid */, newDoc);
        }
    };

    {
        const events = getEvents(collectionChangeStreamCursorDefault, insertTs);
        jsTest.log("The events in the collection change stream (default): " + tojson(events));
        validateEvents(events, false /* isExpanded */);
    }

    {
        const events = getEvents(collectionChangeStreamCursorExpanded, insertTs);
        jsTest.log("The events in the collection change stream (expanded): " + tojson(events));
        validateEvents(events, true /* isExpanded */);
    }

    {
        const events = getEvents(databaseChangeStreamCursorDefault, insertTs);
        jsTest.log("The events in the database change stream (default): " + tojson(events));
        validateEvents(events, false /* isExpanded */);
    }

    {
        const events = getEvents(databaseChangeStreamCursorExpanded, insertTs);
        jsTest.log("The events in the database change stream (expanded): " + tojson(events));
        validateEvents(events, true /* isExpanded */);
    }

    {
        const events = getEvents(clusterChangeStreamCursorDefault, insertTs);
        jsTest.log("The events in the cluster change stream (default): " + tojson(events));
        validateEvents(events, false /* isExpanded */);
    }

    {
        const events = getEvents(clusterChangeStreamCursorExpanded, insertTs);
        jsTest.log("The events in the cluster change stream (expanded): " + tojson(events));
        validateEvents(events, true /* isExpanded */);
    }
}

// Set up the test collection such that there is a shard where the
// _shardsvrDropCollectionIfUUIDNotMatching command performed by resharding cleanup is not a no-op.
// That is, make the cluster have a shard (namely shard0) that satisfies the following:
// 1. The shard is not the primary shard for the collection.
// 2. Before resharding, the shard does not own any chunks for that collection, but the collection
//    exists locally on that shard (as an empty collection).
// 3. After resharding, the shard does not own any chunks for that collection.
// In other words, shard0 is neither a donor nor a recipient for the resharding operation.

// The collection must be sharded because:
// - Unsharded collections can only be moved using moveCollection (resharding), which always drops
//   the original collection from all shards.
// - Only sharded collections can be moved incrementally using chunk migrations, which do not drop
//   the collection from the donor shard upon moving the last chunk.
// - To go from sharded to unsharded, a collection must go through unshardCollection (resharding),
//   which again drops the original collection from all shards.
// For this reason, this test scenario is only applicable to the reshardCollection and
// unshardCollection commands.

const testCases = [];

// To achieve (1) and (2):
// - Make the test database have shard1 as the primary shard.
// - Shard the test collection and move a chunk from shard1 to shard0 and back to shard1 so the
//   collection exists as an empty collection on shard0.
// (An illustrative sanity check for these conditions is sketched right after setUp below.)
const setUp = (st, testDbName, testCollName) => {
    const testNs = testDbName + "." + testCollName;
    assert.commandWorked(st.s.adminCommand({enableSharding: testDbName, primaryShard: st.shard1.shardName}));
    assert.commandWorked(st.s.adminCommand({shardCollection: testNs, key: {x: 1}}));
    assert.commandWorked(st.s.adminCommand({split: testNs, middle: {x: 0}}));

    assert.commandWorked(
        st.s.adminCommand({moveChunk: testNs, find: {x: MinKey}, to: st.shard0.shardName, _waitForDelete: true}),
    );
    assert.commandWorked(
        st.s.adminCommand({moveChunk: testNs, find: {x: MinKey}, to: st.shard1.shardName, _waitForDelete: true}),
    );
};
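
// Illustrative sanity check (not part of the original test flow): after setUp() above, shard0
// should satisfy conditions (1) and (2), i.e. own no chunks for the collection while still having
// an empty local copy of it. This is a hedged sketch; the helper name is hypothetical, and it
// assumes the config schema where config.chunks documents reference the collection UUID. It could,
// for example, be called right after setUpFunc() in runTest().
function assertShard0HasStaleEmptyCopy(st, testDbName, testCollName) {
    const testNs = testDbName + "." + testCollName;
    const configDB = st.s.getDB("config");
    const collEntry = configDB.collections.findOne({_id: testNs});

    // Shard0 owns no chunks for the collection.
    assert.eq(0, configDB.chunks.find({uuid: collEntry.uuid, shard: st.shard0.shardName}).itcount());

    // The collection still exists locally on shard0, and it is empty.
    const localInfos = st.shard0.getDB(testDbName).getCollectionInfos({name: testCollName});
    assert.eq(1, localInfos.length, localInfos);
    assert.eq(0, st.shard0.getCollection(testNs).find().itcount());
}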

// To achieve (3):
// - For reshardCollection, specify "zones" to make shard1 own all chunks.
// - For unshardCollection, specify shard1 as the destination shard.
// (The resulting cleanup effect on shard0 is sketched, for illustration, after these helpers.)
const runReshardCollection = (st, testDbName, testCollName) => {
    const testNs = testDbName + "." + testCollName;

    const zoneName = "zoneA";
    assert.commandWorked(st.s.adminCommand({addShardToZone: st.shard1.shardName, zone: zoneName}));
    assert.commandWorked(
        st.s.adminCommand({
            reshardCollection: testNs,
            numInitialChunks: 1,
            key: {x: 1, y: 1},
            zones: [{min: {x: MinKey, y: MinKey}, max: {x: MaxKey, y: MaxKey}, zone: zoneName}],
        }),
    );
};
const runUnshardCollection = (st, testDbName, testCollName) => {
    const testNs = testDbName + "." + testCollName;
    assert.commandWorked(st.s.adminCommand({unshardCollection: testNs, toShard: st.shard1.shardName}));
};
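
// Illustrative only (not part of the original test): either command above makes shard0 neither a
// donor nor a recipient, so the _shardsvrDropCollectionIfUUIDNotMatching cleanup is what removes
// shard0's stale empty copy. This is a hedged sketch of how that effect could be observed
// directly; the original test instead asserts that no change stream event is emitted for it. The
// helper name is hypothetical, and assert.soon is used since the cleanup may complete
// asynchronously relative to the command returning.
function assertStaleCopyEventuallyDroppedFromShard0(st, testDbName, testCollName) {
    assert.soon(
        () => st.shard0.getDB(testDbName).getCollectionInfos({name: testCollName}).length === 0,
        "timed out waiting for shard0 to drop its stale copy of " + testDbName + "." + testCollName,
    );
}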

testCases.push({
    testName: "reshardCollection",
    setUpFunc: setUp,
    runCmdFunc: runReshardCollection,
});
testCases.push({
    testName: "unshardCollection",
    setUpFunc: setUp,
    runCmdFunc: runUnshardCollection,
});

for (let testCase of testCases) {
    jsTest.log("Testing case: " + testCase.testName);
    runTest(testCase.testName, testCase.setUpFunc, testCase.runCmdFunc);
}

st.stop();