/**
 * Test to verify that all the CRUD operations get routed to the correct shard when the shard key is
 * compound hashed.
 *
 * @tags: [
 *   multiversion_incompatible,
 *   requires_majority_read_concern,
 * ]
 */
import {arrayEq} from "jstests/aggregation/extras/utils.js";
import {
    profilerHasAtLeastOneMatchingEntryOrThrow,
    profilerHasSingleMatchingEntryOrThrow,
    profilerHasZeroMatchingEntriesOrThrow,
} from "jstests/libs/profiler.js";
import {assertStagesForExplainOfCommand} from "jstests/libs/query/analyze_plan.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
import {isUweEnabled, mapUweShardCmdName} from "jstests/libs/query/uwe_utils.js";

const st = new ShardingTest({shards: 2});
const kDbName = jsTestName();
const ns = `${kDbName}.coll`;
assert.commandWorked(st.s.adminCommand({enableSharding: kDbName, primaryShard: st.shard0.shardName}));

// The session is opened with 'retryWrites' so that the shard key fields are updatable.
const session = st.s.startSession({retryWrites: true});
const sessionDB = session.getDatabase(kDbName);
const coll = sessionDB["coll"];

// Direct connections to each shard's copy of the test database, used to inspect the profiler.
const shard0DB = st.shard0.getDB(kDbName);
const shard1DB = st.shard1.getDB(kDbName);

const uweEnabled = isUweEnabled(st.s);

/**
 * Resets profiling on both shards so that targeting behaviour can be verified from a clean slate:
 * profiling is turned off, 'system.profile' is dropped, and profiling is turned back on at level 2.
 */
function restartProfiling() {
    [shard0DB, shard1DB].forEach((shardDB) => {
        shardDB.setProfilingLevel(0);
        shardDB.system.profile.drop();
        shardDB.setProfilingLevel(2);
    });
}

/**
 * Restarts profiling, runs a find command with 'filter' (projecting out '_id' and tagging the
 * command with 'testName' as its comment) and asserts that the returned documents match
 * 'expectedOutput' regardless of order. Then asserts, via explain of the same command, that every
 * stage in 'expectedStages' is present in the plan.
 */
function validateFindCmdOutputAndPlan({filter, expectedStages, expectedOutput, testName}) {
    restartProfiling();

    const cmdObj = {find: coll.getName(), filter, projection: {_id: 0}, comment: testName};
    const findReply = assert.commandWorked(coll.runCommand(cmdObj));
    const returnedDocs = new DBCommandCursor(coll.getDB(), findReply).toArray();

    // Hashed index order is not predictable, so the comparison ignores document order.
    assert(arrayEq(expectedOutput, returnedDocs), returnedDocs);

    assertStagesForExplainOfCommand({coll, cmdObj, expectedStages});
}

/**
 * Tests when range field is a prefix of compound hashed shard key.
 */
assert.commandWorked(st.s.getDB("config").adminCommand({shardCollection: ns, key: {a: 1, b: "hashed", c: 1}}));
assert.commandWorked(st.s.adminCommand({split: ns, middle: {a: 0, b: MinKey, c: MinKey}}));

// Positive values of 'a' should go to 'shard1DB' and negative values should go to 'shard0DB'.
assert.commandWorked(
    st.s.adminCommand({
        moveChunk: ns,
        bounds: [
            {a: 0, b: MinKey, c: MinKey},
            {a: MaxKey, b: MaxKey, c: MaxKey},
        ],
        to: st.shard1.shardName,
    }),
);

restartProfiling();

// Test to verify that insert operations are routed to correct shard and succeeds on the respective
// shards.
let shardCmdName = uweEnabled ? mapUweShardCmdName("insert") : "insert";

// Builds the profiler filter matching the insert of a document whose 'a' field equals 'aValue'.
const rangePrefixInsertFilter = (aValue, extraFields = {}) =>
    Object.assign(
        {ns, op: shardCmdName, "command.insert": {$exists: true}, "command.documents.a": aValue},
        extraFields,
    );

// Inserts 'doc', then checks that the insert was never profiled on 'otherShardDB' and was profiled
// exactly once, with one document inserted, on 'owningShardDB'.
const insertAndCheckRangePrefixTargeting = (doc, owningShardDB, otherShardDB) => {
    assert.commandWorked(coll.insert(doc));
    profilerHasZeroMatchingEntriesOrThrow({
        profileDB: otherShardDB,
        filter: rangePrefixInsertFilter(doc.a),
    });
    profilerHasSingleMatchingEntryOrThrow({
        profileDB: owningShardDB,
        filter: rangePrefixInsertFilter(doc.a, {"ninserted": 1}),
    });
};

for (let i = 20; i < 40; i++) {
    // Both documents derive 'b' and 'c' from the positive 'i'; only the sign of 'a' differs.
    const makeDoc = (aValue) => ({a: aValue, b: {subObj: `str_${i % 13}`}, c: NumberInt(i % 10)});

    insertAndCheckRangePrefixTargeting(makeDoc(i), shard1DB, shard0DB);
    insertAndCheckRangePrefixTargeting(makeDoc(-i), shard0DB, shard1DB);
}

// Verify that $in query with all predicates values present in a single shard, can be targeted
// correctly. Also verify that the command uses index scan on the individual shard.
let testName = "FindWithDollarIn";
validateFindCmdOutputAndPlan({
    testName,
    filter: {
        a: {$in: [-38, -37]},
        b: {$in: [{subObj: "str_12"}, {subObj: "str_11"}]},
        c: {$in: [7, 8]},
    },
    expectedStages: ["IXSCAN", "FETCH", "SINGLE_SHARD"],
    expectedOutput: [
        {a: -37, b: {subObj: "str_11"}, c: 7},
        {a: -38, b: {subObj: "str_12"}, c: 8},
    ],
});

// Both matching documents have negative 'a', so only 'shard0DB' should have seen the find.
const dollarInFindProfileFilter = {ns, "command.find": "coll", "command.comment": testName};
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard1DB, filter: dollarInFindProfileFilter});
profilerHasSingleMatchingEntryOrThrow({profileDB: shard0DB, filter: dollarInFindProfileFilter});

// Verify that a range query on a non-hashed prefix field can target a single shard if all values in
// the range are on that shard. Also verify that the command uses index scan on the individual
// shard.
testName = "Range_query";
validateFindCmdOutputAndPlan({
    testName,
    filter: {a: {$gt: 25, $lt: 29}, b: {subObj: "str_0"}},
    expectedStages: ["IXSCAN", "FETCH", "SINGLE_SHARD"],
    expectedOutput: [{a: 26, b: {subObj: "str_0"}, c: 6}],
});

// The whole queried range of 'a' is positive, so only 'shard1DB' should have seen the find.
const rangeFindProfileFilter = {ns, "command.find": "coll", "command.comment": testName};
profilerHasSingleMatchingEntryOrThrow({profileDB: shard1DB, filter: rangeFindProfileFilter});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: rangeFindProfileFilter});

// Test to verify that the update operation can use query to route the operation. Also verify that
// updating shard key value succeeds.
let updateObj = {a: 22, b: {subObj: "str_0"}, c: "update", p: 1};
let res = assert.commandWorked(coll.update({a: 26, b: {subObj: "str_0"}, c: 6}, updateObj));
assert.eq(res.nModified, 1, res);
assert.eq(coll.count(updateObj), 1);

shardCmdName = uweEnabled ? mapUweShardCmdName("update") : "update";
const rangePrefixUpdateProfileFilter = {ns, op: shardCmdName};
profilerHasSingleMatchingEntryOrThrow({profileDB: shard1DB, filter: rangePrefixUpdateProfileFilter});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: rangePrefixUpdateProfileFilter});

// Test to verify that the update operation can use query to route the operation. Also verify that
// updating shard key value succeeds when the document has to move shard.
testName = "updateShardKeyValueWrongShard";
updateObj = {$set: {a: -100, p: testName}};
restartProfiling();
res = assert.commandWorked(coll.update({a: 22, b: {subObj: "str_0"}, c: "update"}, updateObj));
assert.eq(res.nModified, 1, res);

// Verify that the 'update' command gets targeted to 'shard1DB'.
// TODO SERVER-104122: Handle WCOS error in UWE.
if (!uweEnabled) {
    profilerHasAtLeastOneMatchingEntryOrThrow({profileDB: shard1DB, filter: rangePrefixUpdateProfileFilter});
    profilerHasZeroMatchingEntriesOrThrow({profileDB: shard0DB, filter: rangePrefixUpdateProfileFilter});
}

// Verify that the 'count' command gets targeted to 'shard0DB' after the update.
// The '$set' document of the preceding update doubles as the query for the updated document.
assert.eq(coll.count(updateObj["$set"]), 1);
profilerHasSingleMatchingEntryOrThrow({profileDB: shard0DB, filter: {ns: ns, "command.count": "coll"}});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard1DB, filter: {ns: ns, "command.count": "coll"}});

// Test to verify that the 'delete' command with a range query predicate can target a single shard
// if all values in the range are on that shard.
restartProfiling();
res = assert.commandWorked(coll.remove({a: {$lte: -1}}));
assert.eq(res.nRemoved, 21, res);
assert.eq(coll.count({a: {$lte: -1}}), 0);
shardCmdName = uweEnabled ? mapUweShardCmdName("remove") : "remove";
profilerHasSingleMatchingEntryOrThrow({profileDB: shard0DB, filter: {ns: ns, "op": shardCmdName}});
profilerHasZeroMatchingEntriesOrThrow({profileDB: shard1DB, filter: {ns: ns, "op": shardCmdName}});

/**
 * Test when hashed field is a prefix.
 */
coll.drop();

// Since the prefix field of the shard key is hashed, we pre-split the collection using hashed field
// and distribute the resulting chunks equally among the shards.
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {a: "hashed", b: 1, c: 1}}));

/**
 * Finds the shard that owns the chunk containing the hashed value of 'fieldValue' and validates
 * that there exists a single profiler entry matching 'filter' on that shard. Also verifies that no
 * entry matching 'filter' is present on the other shard.
 *
 * @param fieldValue value of the hashed shard key field 'a'; it is hashed with
 *     convertShardKeyToHashed() before being compared against chunk bounds.
 * @param filter profiler query passed unchanged to the profiler assertions on both shards.
 */
function verifyProfilerEntryOnCorrectShard(fieldValue, filter) {
    const configDB = st.s.getDB("config");
    const hashedValue = convertShardKeyToHashed(fieldValue);

    // Named 'collEntry' (not 'coll') so that the module-level session collection is not shadowed.
    const collEntry = configDB.collections.findOne({_id: ns});
    assert(collEntry, "no config.collections entry found for " + ns);

    // When the collection entry carries a 'timestamp' field the chunk documents are matched on
    // 'uuid', otherwise they are matched on 'ns'.
    const nsOrUUID = collEntry.timestamp ? {$eq: ["$uuid", collEntry.uuid]} : {$eq: ["$ns", ns]};

    // Find the chunk to which 'hashedValue' belongs to. We use $expr here so that the $lte and $gt
    // comparisons occurs across data types.
    const chunk = configDB.chunks.findOne({
        $expr: {$and: [{$lte: ["$min.a", hashedValue]}, {$gt: ["$max.a", hashedValue]}, nsOrUUID]},
    });
    assert(chunk, findChunksUtil.findChunksByNs(configDB, ns).toArray());

    const [targetShard, otherShard] =
        chunk.shard === st.shard0.shardName ? [st.shard0, st.shard1] : [st.shard1, st.shard0];
    profilerHasSingleMatchingEntryOrThrow({profileDB: targetShard.getDB(kDbName), filter: filter});
    profilerHasZeroMatchingEntriesOrThrow({profileDB: otherShard.getDB(kDbName), filter: filter});
}

// Test to verify that insert operations are routed to a single shard and succeeds on the respective
// shards.
restartProfiling();
let profileFilter = {};
shardCmdName = uweEnabled ? mapUweShardCmdName("insert") : "insert";
for (let i = -10; i < 10; i++) {
    profileFilter = {
        ns: ns,
        op: shardCmdName,
        "command.insert": {$exists: true},
        "command.documents.a": i,
        "ninserted": 1,
    };
    assert.commandWorked(coll.insert({_id: i, a: i, b: {subObj: "str_" + (i % 5)}, c: NumberInt(i % 4)}));
    verifyProfilerEntryOnCorrectShard(i, profileFilter);
}

// Test to verify that an equality match on the hashed prefix can be routed to single shard.
testName = "FindWithEqualityOnHashedPrefix";
validateFindCmdOutputAndPlan({
    filter: {a: 0},
    expectedOutput: [{a: 0, b: {subObj: "str_0"}, c: 0}],
    expectedStages: ["IXSCAN", "FETCH", "SINGLE_SHARD"],
    testName: testName,
});
profileFilter = {
    ns: ns,
    "command.find": "coll",
    "command.comment": testName,
};
verifyProfilerEntryOnCorrectShard(0, profileFilter);

// Test to verify that a range query on hashed field will be routed to all nodes and the individual
// nodes cannot use index to answer the query.
testName = "FindWithRangeQueryOnHashedPrefix";
validateFindCmdOutputAndPlan({
    testName,
    filter: {a: {$gt: 8}},
    expectedStages: ["COLLSCAN", "SHARD_MERGE"],
    expectedOutput: [{a: 9, b: {subObj: "str_4"}, c: 1}],
});

// The find is expected to have been profiled exactly once on each of the two shards.
profileFilter = {ns, "command.find": "coll", "command.comment": testName};
for (const shardDB of [shard0DB, shard1DB]) {
    profilerHasSingleMatchingEntryOrThrow({profileDB: shardDB, filter: profileFilter});
}

// Test to verify that update with only a shard key prefix in the query can be routed correctly.
testName = "updateWithHashedPrefix";
updateObj = {$set: {p: testName}};
res = assert.commandWorked(coll.update({a: 0}, updateObj));
assert.eq(res.nModified, 1, res);

// Verify that update has modified the intended object.
assert.eq(coll.count({a: 0, p: testName}), 1);

// Verify that the update has been routed to the correct shard.
shardCmdName = uweEnabled ? mapUweShardCmdName("update") : "update";
profileFilter = {ns, op: shardCmdName};
verifyProfilerEntryOnCorrectShard(0, profileFilter);

// Sharded updateOnes that do not directly target a shard can now use the two phase write
// protocol to execute.
res = assert.commandWorked(coll.update({a: {$lt: 1}}, updateObj));
assert.eq(res.nMatched, 1, res);

// Test to verify that delete with full shard key in the query can be routed correctly.
restartProfiling();
const hashedPrefixDeleteQuery = {a: 1, b: {subObj: "str_1"}, c: 1};
res = assert.commandWorked(coll.deleteOne(hashedPrefixDeleteQuery));
assert.eq(res.deletedCount, 1, res);
assert.eq(coll.count(hashedPrefixDeleteQuery), 0);

shardCmdName = uweEnabled ? mapUweShardCmdName("remove") : "remove";
profileFilter = {ns, op: shardCmdName};
verifyProfilerEntryOnCorrectShard(1, profileFilter);

// Sharded deleteOnes that do not directly target a shard can now use the two phase write
// protocol to execute.
assert.commandWorked(
    coll.runCommand({
        delete: coll.getName(),
        deletes: [{q: {a: 1}, limit: 1}],
        ordered: false,
    }),
);

st.stop();