mirror of https://github.com/mongodb/mongo
SERVER-114198 Create a Mongo proxy to manage a pool of connection (#44455)
GitOrigin-RevId: f047ef03b7eb0ce5e7336d272bc51b531324a254
This commit is contained in:
parent
116ea1c8bd
commit
8103eb4a01
|
|
@ -1412,6 +1412,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
|
||||
# The following patterns are parsed from ./jstests/noPassthrough/shell/OWNERS.yml
|
||||
/jstests/noPassthrough/shell/**/* @10gen/devprod-correctness @svc-auto-approve-bot
|
||||
/jstests/noPassthrough/shell/**/multi_router_basic.js @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot
|
||||
|
||||
# The following patterns are parsed from ./jstests/noPassthrough/shell/js/OWNERS.yml
|
||||
/jstests/noPassthrough/shell/js/**/* @10gen/devprod-correctness @svc-auto-approve-bot
|
||||
|
|
@ -3153,6 +3154,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
/src/mongo/shell/**/keyvault.js @10gen/server-security @svc-auto-approve-bot
|
||||
/src/mongo/shell/**/named_pipe_test_helper* @10gen/query-integration-features @svc-auto-approve-bot
|
||||
/src/mongo/shell/**/feature_compatibility_version.js @10gen/server-fcv @svc-auto-approve-bot
|
||||
/src/mongo/shell/**/*multi_router*.js @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot
|
||||
|
||||
# The following patterns are parsed from ./src/mongo/stdx/OWNERS.yml
|
||||
/src/mongo/stdx/**/* @10gen/server-programmability @svc-auto-approve-bot
|
||||
|
|
|
|||
|
|
@ -79,6 +79,10 @@ export default [
|
|||
removeFCVDocument: true,
|
||||
runFeatureFlagMultiversionTest: true,
|
||||
|
||||
// src/mongo/shell/mongo_router_global.js
|
||||
MultiRouterMongo: true,
|
||||
toConnectionsList: true,
|
||||
|
||||
// src/mongo/shell/query_global.js
|
||||
DBQuery: true,
|
||||
DBCommandCursor: true,
|
||||
|
|
|
|||
|
|
@ -3,3 +3,6 @@ filters:
|
|||
- "*":
|
||||
approvers:
|
||||
- 10gen/devprod-correctness
|
||||
- "multi_router_basic.js":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-routing-and-topology
|
||||
|
|
|
|||
|
|
@ -0,0 +1,959 @@
|
|||
/**
|
||||
* Unit test the MultiRouterMongo class provided in the shell
|
||||
* @tags: [requires_sharding]
|
||||
*/
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
import {withRetryOnTransientTxnError} from "jstests/libs/auto_retry_transaction_in_sharding.js";
|
||||
|
||||
const st = new ShardingTest({shards: 1, mongos: 3});
|
||||
const getMongosesURI = function () {
|
||||
const mongosHosts = st._mongos.map((m) => m.host).join(",");
|
||||
const uri = `mongodb://${mongosHosts}/test`;
|
||||
return uri;
|
||||
};
|
||||
|
||||
function testCase(description, fn) {
|
||||
chatty(`[MULTI-ROUTER-TEST] ${description}`);
|
||||
fn();
|
||||
}
|
||||
|
||||
// Given a Multi-Router Mongo, extend _getNextMongo to count calls.
|
||||
// _getNextMongo is responsible for iterating over multiple mongos.
|
||||
// Counting _getNextMongo is useful to verify how many times the multi-router had to switch connections.
|
||||
function overrideGetNextMongoCountingCalls(multiRouterMongo) {
|
||||
let getNextMongoCount = 0;
|
||||
const originalGetNextMongo = multiRouterMongo._getNextMongo.bind(multiRouterMongo);
|
||||
multiRouterMongo._getNextMongo = function () {
|
||||
const mongo = originalGetNextMongo();
|
||||
getNextMongoCount++;
|
||||
return mongo;
|
||||
};
|
||||
return {
|
||||
count: () => getNextMongoCount,
|
||||
reset: () => {
|
||||
getNextMongoCount = 0;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Test toConnectionList
|
||||
// ============================================================================
|
||||
|
||||
testCase("Testing toConnectionList with single server", () => {
|
||||
const uri = `mongodb://${st.s0.host}/test`;
|
||||
const uriList = toConnectionsList(new MongoURI(uri));
|
||||
|
||||
assert.eq(uriList.length, 1, "Should have 1 URI");
|
||||
assert(uriList[0].includes(st.s0.host), "URI should contain correct host");
|
||||
assert(uriList[0].includes("/test"), "URI should contain database");
|
||||
});
|
||||
|
||||
testCase("Testing toConnectionList with multiple servers", () => {
|
||||
const uri = getMongosesURI();
|
||||
const uriList = toConnectionsList(new MongoURI(uri));
|
||||
|
||||
assert.eq(uriList.length, 3, "Should have 3 URIs");
|
||||
|
||||
// Verify each URI is unique and contains correct parts
|
||||
uriList.forEach((uri) => {
|
||||
assert(uri.startsWith("mongodb://"), "URI should start with mongodb://");
|
||||
assert(uri.includes("/test"), "URI should contain database");
|
||||
});
|
||||
});
|
||||
|
||||
testCase("Testing toConnectionList preserves options", () => {
|
||||
const mongosHosts = st._mongos.map((m) => m.host).join(",");
|
||||
const uri = `mongodb://${mongosHosts}/test?readPreference=secondary&retryWrites=true`;
|
||||
const uriList = toConnectionsList(new MongoURI(uri));
|
||||
|
||||
assert.eq(uriList.length, 3, "Should have 3 URIs");
|
||||
|
||||
uriList.forEach((uri) => {
|
||||
assert(uri.includes("readPreference=secondary"), "URI should contain readPreference option");
|
||||
assert(uri.includes("retryWrites=true"), "URI should contain retryWrites option");
|
||||
});
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test connection type
|
||||
// ============================================================================
|
||||
|
||||
testCase("Standalone mongod", () => {
|
||||
const standalone = MongoRunner.runMongod({});
|
||||
const uri = `mongodb://${standalone.host}/test`;
|
||||
|
||||
const db = connect(uri);
|
||||
|
||||
// Should be regular Mongo, not MultiRouterMongo
|
||||
assert.eq(db.getMongo().isMultiRouter, undefined, "Should not be MultiRouterMongo");
|
||||
MongoRunner.stopMongod(standalone);
|
||||
});
|
||||
|
||||
testCase("Single mongos", () => {
|
||||
const uri = `mongodb://${st.s1.host}/test`;
|
||||
|
||||
const db = connect(uri);
|
||||
|
||||
// Should be regular Mongo, not MultiRouterMongo
|
||||
assert.eq(db.getMongo().isMultiRouter, undefined, "Should not be MultiRouterMongo");
|
||||
});
|
||||
|
||||
testCase("Replica set", () => {
|
||||
const rst = new ReplSetTest({nodes: 3});
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
rst.getPrimary();
|
||||
|
||||
// Note we can't use the "getURL" method provided by the ReplSetTest.
|
||||
// That method provides a valid URL for establishing a connection via Mongo.
|
||||
// The 'connect' function we are testing expects a valid fixture url,
|
||||
// which is composed by a protocol followed by a comma-separated list of host:port pairs and options.
|
||||
const mongodHosts = rst.nodes.map((m) => m.host).join(",");
|
||||
const uri = `mongodb://${mongodHosts}/test?replicaSet=${rst.name}`;
|
||||
const mongo = connect(uri).getMongo();
|
||||
|
||||
// Should be regular Mongo, not MultiRouterMongo
|
||||
assert.eq(mongo.isMultiRouter, undefined, "Should not be MultiRouterMongo");
|
||||
|
||||
//Every node of the replica-set is also a regular Mongo
|
||||
rst.nodes.forEach((node) => {
|
||||
// Assert is a mongo but not a multi-router type
|
||||
assert.neq(node._getDefaultSession, undefined);
|
||||
assert.eq(node.isMultiRouter, undefined, `Replica-Set node ${node} should not be MultiRouterMongo`);
|
||||
});
|
||||
|
||||
rst.stopSet();
|
||||
});
|
||||
|
||||
testCase("Individual shard connections are regular Mongo", () => {
|
||||
// Check each shard connection
|
||||
st._connections.forEach((shardConn, idx) => {
|
||||
// Assert is a mongo but not a multi-router type
|
||||
assert.neq(shardConn._getDefaultSession, undefined);
|
||||
assert.eq(shardConn.isMultiRouter, undefined, `Shard ${idx} should not be MultiRouterMongo`);
|
||||
});
|
||||
|
||||
// Check individual mongos connections (when accessed directly)
|
||||
st._mongos.forEach((mongos, idx) => {
|
||||
// Assert is a mongo but not a multi-router type
|
||||
assert.neq(mongos._getDefaultSession, undefined);
|
||||
assert.eq(mongos.isMultiRouter, undefined, `Individual mongos ${idx} should not be MultiRouterMongo`);
|
||||
});
|
||||
});
|
||||
|
||||
testCase("Multiple mongos creates MultiRouterMongo", () => {
|
||||
const uri = getMongosesURI();
|
||||
const db = connect(uri);
|
||||
|
||||
// Should be MultiRouterMongo
|
||||
assert.eq(db.getMongo().isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(db.getMongo().isConnectedToMongos(), true, "MultiRouterMongo must be connected to a mongos");
|
||||
assert.eq(db.getMongo()._mongoConnections.length, 3, "Should have 3 mongo connections");
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test basic properties
|
||||
// ============================================================================
|
||||
|
||||
testCase("getSiblingDB returns a proxy", () => {
|
||||
const uri = getMongosesURI();
|
||||
const db = connect(uri);
|
||||
|
||||
assert.eq(db.getMongo().isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
|
||||
let newDb = db.getSiblingDB("admin");
|
||||
|
||||
// Should be MultiRouterMongo
|
||||
assert.eq(newDb.getMongo().isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(newDb.getMongo().isConnectedToMongos(), true, "MultiRouterMongo must be connected to a mongos");
|
||||
assert.eq(newDb.getMongo()._mongoConnections.length, 3, "Should have 3 mongo connections");
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test mongos selections via basic commands
|
||||
// ============================================================================
|
||||
|
||||
testCase("adminCommand is intercepted by proxy's runCommand", () => {
|
||||
const uri = getMongosesURI();
|
||||
const db = connect(uri);
|
||||
const conn = db.getMongo();
|
||||
|
||||
// Track runCommand calls
|
||||
let countCalls = 0;
|
||||
const originalRunCommand = conn.runCommand.bind(conn);
|
||||
conn.runCommand = function (dbname, cmd, options) {
|
||||
assert.eq(dbname, "admin");
|
||||
countCalls++;
|
||||
return originalRunCommand(dbname, cmd, options);
|
||||
};
|
||||
|
||||
db.adminCommand({ping: 1});
|
||||
assert.eq(db.getMongo().isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(countCalls, 1, "Should call run command at least once!");
|
||||
});
|
||||
|
||||
testCase("Testing basic call distribution among the mongoses pool via ping", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const kOperations = 10;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Run commands
|
||||
let adminDb = conn.getDB("admin");
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
adminDb.runCommand({ping: 1});
|
||||
}
|
||||
|
||||
// The first runCommand will start a session which also calls _getNextMongo. We check it's called at least kTotalCount times.
|
||||
assert.gte(getNextMongoTracker.count(), kOperations);
|
||||
});
|
||||
|
||||
testCase("Testing call distribution via insert", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.test;
|
||||
const kOperations = 10;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
coll.insert({x: i, type: "insert"});
|
||||
}
|
||||
|
||||
// Should route randomly for all operations
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly for insert operations");
|
||||
|
||||
// Verify all documents were inserted
|
||||
assert.eq(kOperations, coll.count({type: "insert"}), "All inserts should succeed");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing call distribution via update", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.test;
|
||||
const kOperations = 10;
|
||||
|
||||
// Insert test data
|
||||
const docs = [];
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
docs.push({x: i, updated: false});
|
||||
}
|
||||
coll.insertMany(docs);
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
coll.updateOne({x: i}, {$set: {updated: true}});
|
||||
}
|
||||
|
||||
// Should route randomly for all operations
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly for update operations");
|
||||
|
||||
// Verify all documents were updated
|
||||
assert.eq(kOperations, coll.count({updated: true}), "All updates should succeed");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing call distribution via delete", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.test;
|
||||
const kOperations = 10;
|
||||
|
||||
// Insert test data
|
||||
const docs = [];
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
docs.push({x: i, toDelete: true});
|
||||
}
|
||||
coll.insertMany(docs);
|
||||
assert.eq(kOperations, coll.count({}), "Should have documents to delete");
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
coll.deleteOne({x: i});
|
||||
}
|
||||
|
||||
// Should route randomly for all operations
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly for delete operations");
|
||||
|
||||
// Verify correct number of documents remain
|
||||
assert.eq(0, coll.count({}), "Should have no documents remaining");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test mongos selections via aggregate and find
|
||||
// ============================================================================
|
||||
|
||||
function insertSampleData(coll, totalDocs) {
|
||||
// Insert test data
|
||||
const docs = [];
|
||||
for (let i = 0; i < totalDocs; i++) {
|
||||
docs.push({_id: i, value: i});
|
||||
}
|
||||
coll.insertMany(docs);
|
||||
}
|
||||
|
||||
function overrideRunCommandCountingGetMoreCalls(conn) {
|
||||
// Track getMore calls
|
||||
let getMoreCount = 0;
|
||||
const originalRunCommand = conn.runCommand.bind(conn);
|
||||
conn.runCommand = function (dbname, cmd, options) {
|
||||
if (cmd.getMore) {
|
||||
getMoreCount++;
|
||||
}
|
||||
return originalRunCommand(dbname, cmd, options);
|
||||
};
|
||||
return {
|
||||
count: () => getMoreCount,
|
||||
};
|
||||
}
|
||||
|
||||
testCase("Testing basic find + getMore will stick to the same connection", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const coll = conn.getDB("test").cursorTest;
|
||||
const kTotalDocs = 120;
|
||||
|
||||
// Insert sample data
|
||||
insertSampleData(coll, kTotalDocs);
|
||||
|
||||
// Track getMore calls
|
||||
const getMoreTracker = overrideRunCommandCountingGetMoreCalls(conn);
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Create cursor with small batch size to force getMore
|
||||
const cursor = coll.find({value: {$gte: 0}}).batchSize(2);
|
||||
let countDocs = 0;
|
||||
while (cursor.hasNext()) {
|
||||
countDocs++;
|
||||
cursor.next();
|
||||
}
|
||||
|
||||
assert.eq(countDocs, kTotalDocs, "GetMore never run!");
|
||||
// GetMore will never hit the proxy unless explicitly run via runCommand. next() self handle this case.
|
||||
assert.eq(getMoreTracker.count(), 0, "Must not have tracked getMore commands");
|
||||
// Only the find command must have run _getNextMongo
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Must run getNextMongoCount only once when executing the find");
|
||||
assert.eq(conn._cursorTracker.count(), 1, "Should have exactly 1 tracked cursor");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing basic aggregate + getMore will stick to the same connection", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const coll = conn.getDB("test").cursorTest;
|
||||
const kTotalDocs = 120;
|
||||
|
||||
// Insert sample data
|
||||
insertSampleData(coll, kTotalDocs);
|
||||
|
||||
// Track getMore calls
|
||||
const getMoreTracker = overrideRunCommandCountingGetMoreCalls(conn);
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Create cursor with small batch size to force getMore
|
||||
const cursor = coll.aggregate([{$match: {value: {$gte: 0}}}], {cursor: {batchSize: 2}});
|
||||
let countDocs = 0;
|
||||
while (cursor.hasNext()) {
|
||||
countDocs++;
|
||||
cursor.next();
|
||||
}
|
||||
|
||||
assert.eq(countDocs, kTotalDocs, "GetMore never run!");
|
||||
// GetMore will never hit the proxy unless explicitly run via runCommand. next() self handle this case.
|
||||
assert.eq(getMoreTracker.count(), 0, "Must have executed getMore commands");
|
||||
// Only the aggregate command must have run _getNextMongo
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Must run getNextMongoCount only once when executing the aggregate");
|
||||
assert.eq(conn._cursorTracker.count(), 1, "Should have exactly 1 tracked cursor");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing basic aggregate + getMore run explicitly will stick to the same connection", () => {
|
||||
const uri = getMongosesURI();
|
||||
const db = connect(uri);
|
||||
const conn = db.getMongo();
|
||||
const coll = db.cursorTest;
|
||||
const kTotalDocs = 120;
|
||||
|
||||
// Insert sample data
|
||||
insertSampleData(coll, kTotalDocs);
|
||||
|
||||
// Track getMore calls
|
||||
const getMoreTracker = overrideRunCommandCountingGetMoreCalls(conn);
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Create cursor with small batch size and force a getMore.
|
||||
const cursor = coll.aggregate([{$match: {value: {$gte: 0}}}], {cursor: {batchSize: 2}});
|
||||
assert.commandWorked(db.runCommand({getMore: cursor.getId(), collection: "cursorTest"}));
|
||||
// We must have called getMore via proxy
|
||||
assert.eq(getMoreTracker.count(), 1, "Must have executed getMore commands");
|
||||
// Only the aggregate command must have run _getNextMongo
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Must run getNextMongoCount only once when executing the aggregate");
|
||||
assert.eq(conn._cursorTracker.count(), 1, "Should have exactly 1 tracked cursor");
|
||||
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test disable Multi-Routing
|
||||
// ============================================================================
|
||||
|
||||
testCase("Testing isMultiRoutingDisabled will make the proxy run like a single router connection", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const kTotalCount = 30;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
TestData.skipMultiRouterRotation = true;
|
||||
// Run commands
|
||||
let adminDb = conn.getDB("admin");
|
||||
for (let i = 0; i < kTotalCount; i++) {
|
||||
adminDb.runCommand({ping: 1});
|
||||
}
|
||||
|
||||
// We should never call _getNextMongo if multi routing is disabled
|
||||
assert.eq(0, getNextMongoTracker.count(), "Should never call _getNextMongo when multi routing is disabled");
|
||||
|
||||
// Set back to false for other tests
|
||||
TestData.skipMultiRouterRotation = false;
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test session mapping with transactions
|
||||
// ============================================================================
|
||||
|
||||
testCase("Testing explicit session without transaction routes randomly", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
|
||||
const session = conn.startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
const kOperations = 10;
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
sessionColl.insert({x: i, type: "no-txn"}, {session});
|
||||
}
|
||||
|
||||
// Should have called _getNextMongo for each operation (random routing)
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly for non-transactional operations");
|
||||
|
||||
// Session should NOT be tracked in the map
|
||||
assert.eq(conn._sessionToMongoMap.size(), 0, "Session should not be tracked without transaction");
|
||||
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing lazy mapping only occurs when txnNumber is present", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
|
||||
const session = conn.startSession();
|
||||
|
||||
// After creating session, map should be empty
|
||||
assert.eq(conn._sessionToMongoMap.size(), 0, "Map should be empty after session creation");
|
||||
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Perform non-transactional operation
|
||||
sessionColl.insert({x: 1, phase: "no-txn"}, {session});
|
||||
|
||||
// Map should still be empty (no txnNumber)
|
||||
assert.eq(conn._sessionToMongoMap.size(), 0, "Map should remain empty for non-txn operations");
|
||||
|
||||
// Start transaction (first operation with txnNumber)
|
||||
session.startTransaction();
|
||||
sessionColl.insert({x: 2, phase: "txn"}, {session});
|
||||
|
||||
// Now map should contain the session
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Map should contain session after txn starts");
|
||||
|
||||
session.commitTransaction_forTesting();
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing transaction operations pin to same mongos", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
|
||||
const session = conn.startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
withRetryOnTransientTxnError(() => {
|
||||
session.startTransaction();
|
||||
|
||||
// First operation in transaction
|
||||
sessionColl.insert({x: 1, type: "txn"}, {session});
|
||||
|
||||
// Should have called _getNextMongo once for first operation
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should call _getNextMongo once for first txn operation");
|
||||
// Session should now be tracked
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
// Subsequent operations should not call _getNextMongo (pinned to same mongos)
|
||||
for (let i = 2; i <= 5; i++) {
|
||||
sessionColl.insert({x: i, type: "txn"}, {session});
|
||||
}
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should not call _getNextMongo for subsequent txn operations");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
|
||||
// Verify all documents were inserted
|
||||
assert.eq(5, coll.count({type: "txn"}), "All transaction inserts should succeed");
|
||||
});
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Test 3 transactional operations for the same session use different mongos across transactions", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
const kNumInserts = 5;
|
||||
|
||||
const session = conn.startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Transaction 1
|
||||
withRetryOnTransientTxnError(() => {
|
||||
getNextMongoTracker.reset();
|
||||
|
||||
session.startTransaction();
|
||||
|
||||
// First operation in transaction
|
||||
sessionColl.insert({x: 1, type: "txn"}, {session});
|
||||
|
||||
// Should _getNextMongo once for first operation
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should call _getNextMongo once for first txn operation");
|
||||
// Session should now be tracked
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
// Subsequent operations should not call _getNextMongo (pinned to same mongos)
|
||||
for (let i = 2; i <= kNumInserts; i++) {
|
||||
sessionColl.insert({x: i, type: "txn"}, {session});
|
||||
}
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should not call _getNextMongo for subsequent txn operations");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
});
|
||||
|
||||
// Transaction 2
|
||||
withRetryOnTransientTxnError(() => {
|
||||
getNextMongoTracker.reset();
|
||||
|
||||
session.startTransaction();
|
||||
|
||||
// First operation in transaction
|
||||
sessionColl.insert({x: 1, type: "txn"}, {session});
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should call _getNextMongo once for first txn operation");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
// Subsequent operations should not call _getNextMongo (pinned to same mongos)
|
||||
for (let i = 2; i <= kNumInserts; i++) {
|
||||
sessionColl.insert({y: i, type: "txn"}, {session});
|
||||
}
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should not call _getNextMongo for subsequent txn operations");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
});
|
||||
|
||||
// Transaction 3
|
||||
withRetryOnTransientTxnError(() => {
|
||||
getNextMongoTracker.reset();
|
||||
|
||||
session.startTransaction();
|
||||
|
||||
// First operation in transaction
|
||||
sessionColl.insert({z: 1, type: "txn"}, {session});
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should call _getNextMongo once for first txn operation");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
// Subsequent operations should not call _getNextMongo (pinned to same mongos)
|
||||
for (let i = 2; i <= kNumInserts; i++) {
|
||||
sessionColl.insert({y: i, type: "txn"}, {session});
|
||||
}
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 1, "Should not call _getNextMongo for subsequent txn operations");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after first txn operation");
|
||||
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
});
|
||||
|
||||
assert.eq(kNumInserts * 3, coll.count({type: "txn"}), "All transaction inserts should succeed");
|
||||
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing session reuse after transaction routes randomly", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
|
||||
const session = conn.startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Start and complete a transaction
|
||||
withRetryOnTransientTxnError(() => {
|
||||
session.startTransaction();
|
||||
sessionColl.insert({x: 1, phase: "txn"}, {session});
|
||||
assert.commandWorked(session.commitTransaction_forTesting());
|
||||
});
|
||||
// Session should be tracked
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked after transaction");
|
||||
|
||||
// Track _getNextMongo calls for post-transaction operations
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Perform non-transactional operations on same session
|
||||
const kOperations = 10;
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
sessionColl.insert({x: i, phase: "post-txn"}, {session});
|
||||
}
|
||||
|
||||
// Should route randomly (no txnNumber in these operations)
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly after transaction completes");
|
||||
|
||||
// Session remains tracked but operations without txnNumber don't use the mapping
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session tracking persists but doesn't affect non-txn ops");
|
||||
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing multiple concurrent transactions can use different mongos", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const kCollName = "sessionTest";
|
||||
assert.commandWorked(db.createCollection(kCollName));
|
||||
const coll = db.getCollection(kCollName);
|
||||
|
||||
const session1 = conn.startSession();
|
||||
const session2 = conn.startSession();
|
||||
const sessionDB1 = session1.getDatabase("test");
|
||||
const sessionDB2 = session2.getDatabase("test");
|
||||
withRetryOnTransientTxnError(() => {
|
||||
// Start transactions on both sessions
|
||||
session1.startTransaction();
|
||||
session2.startTransaction();
|
||||
|
||||
// Perform operations in both transactions
|
||||
sessionDB1.sessionTest.insert({session: 1, value: "s1"}, {session: session1});
|
||||
sessionDB2.sessionTest.insert({session: 2, value: "s2"}, {session: session2});
|
||||
|
||||
sessionDB1.sessionTest.insert({session: 1, value: "s1-2"}, {session: session1});
|
||||
sessionDB2.sessionTest.insert({session: 2, value: "s2-2"}, {session: session2});
|
||||
|
||||
// Both sessions should be tracked
|
||||
assert.eq(conn._sessionToMongoMap.size(), 2, "Both sessions should be tracked");
|
||||
|
||||
// Commit both transactions
|
||||
assert.commandWorked(session1.commitTransaction_forTesting());
|
||||
assert.commandWorked(session2.commitTransaction_forTesting());
|
||||
});
|
||||
// Verify both transactions succeeded
|
||||
assert.eq(2, coll.count({session: 1}), "Session 1 transaction should succeed");
|
||||
assert.eq(2, coll.count({session: 2}), "Session 2 transaction should succeed");
|
||||
|
||||
session1.endSession();
|
||||
session2.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing retryable writes pin to same mongos", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
const kOperations = 5;
|
||||
|
||||
const session = conn.startSession({retryWrites: true});
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Subsequent operations should not call _getNextMongo (pinned to same mongos)
|
||||
for (let i = 1; i <= kOperations; i++) {
|
||||
sessionColl.insert({x: i, type: "retryable"}, {session});
|
||||
}
|
||||
|
||||
assert.eq(getNextMongoTracker.count(), 5, "Should not call _getNextMongo for subsequent retryable writes");
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "We should have exactly 1 tracked session");
|
||||
|
||||
// Verify all writes succeeded
|
||||
assert.eq(5, coll.count({type: "retryable"}), "All retryable writes should succeed");
|
||||
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
testCase("Testing aborted transaction maintains session tracking", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.sessionTest;
|
||||
|
||||
const session = conn.startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.sessionTest;
|
||||
|
||||
// Start transaction
|
||||
session.startTransaction();
|
||||
sessionColl.insert({x: 1, type: "aborted"}, {session});
|
||||
|
||||
// Session should be tracked
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session should be tracked in transaction");
|
||||
|
||||
// Abort transaction
|
||||
session.abortTransaction_forTesting();
|
||||
|
||||
// Session remains tracked (mapping persists)
|
||||
assert.eq(conn._sessionToMongoMap.size(), 1, "Session tracking persists after abort");
|
||||
|
||||
// Verify document was not inserted (transaction aborted)
|
||||
assert.eq(0, coll.count({type: "aborted"}), "Aborted transaction should not insert documents");
|
||||
|
||||
session.endSession();
|
||||
coll.drop();
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Test auto encryption methods
|
||||
// ============================================================================
|
||||
|
||||
testCase("Testing setAutoEncryption propagates to all mongos connections", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
|
||||
assert.eq(conn.isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(conn._mongoConnections.length, 3, "Should have 3 mongo connections");
|
||||
|
||||
const localKMS = {
|
||||
key: BinData(
|
||||
0,
|
||||
"/tu9jUCBqZdwCelwE/EAm/4WqdxrSMi04B8e9uAV+m30rI1J2nhKZZtQjdvsSCwuI4erR6IEcEK+5eGUAODv43NDNIR9QheT2edWFewUfHKsl9cnzTc86meIzOmYl6dr",
|
||||
),
|
||||
};
|
||||
|
||||
const clientSideFLEOptions = {
|
||||
kmsProviders: {local: localKMS},
|
||||
keyVaultNamespace: "test.keystore",
|
||||
schemaMap: {},
|
||||
};
|
||||
|
||||
// Set auto encryption on the multi-router connection
|
||||
const result = conn.setAutoEncryption(clientSideFLEOptions);
|
||||
assert(result, "setAutoEncryption should succeed");
|
||||
|
||||
// Verify all underlying mongos connections have auto encryption options set
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.neq(options, undefined, `Mongos ${idx} should have auto encryption options`);
|
||||
assert.eq(options.keyVaultNamespace, "test.keystore", `Mongos ${idx} should have correct keyVaultNamespace`);
|
||||
});
|
||||
|
||||
// Clean up
|
||||
conn.unsetAutoEncryption();
|
||||
// Verify all connections are cleaned up
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.eq(options, undefined, `Mongos ${idx} should not have auto encryption options after cleanup`);
|
||||
});
|
||||
});
|
||||
|
||||
testCase("Testing toggleAutoEncryption propagates to all mongos connections", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
|
||||
assert.eq(conn.isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(conn._mongoConnections.length, 3, "Should have 3 mongo connections");
|
||||
|
||||
const localKMS = {
|
||||
key: BinData(
|
||||
0,
|
||||
"/tu9jUCBqZdwCelwE/EAm/4WqdxrSMi04B8e9uAV+m30rI1J2nhKZZtQjdvsSCwuI4erR6IEcEK+5eGUAODv43NDNIR9QheT2edWFewUfHKsl9cnzTc86meIzOmYl6dr",
|
||||
),
|
||||
};
|
||||
|
||||
const clientSideFLEOptions = {
|
||||
kmsProviders: {local: localKMS},
|
||||
keyVaultNamespace: "test.keystore",
|
||||
schemaMap: {},
|
||||
};
|
||||
|
||||
// Set auto encryption first
|
||||
assert(conn.setAutoEncryption(clientSideFLEOptions), "setAutoEncryption should succeed");
|
||||
|
||||
// Toggle auto encryption on
|
||||
const toggleOnResult = conn.toggleAutoEncryption(true);
|
||||
assert(toggleOnResult, "toggleAutoEncryption(true) should succeed");
|
||||
|
||||
// Verify all underlying mongos connections have auto encryption toggled on
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
// We can't directly check the toggle state, but we can verify options exist
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.neq(options, undefined, `Mongos ${idx} should have auto encryption options after toggle on`);
|
||||
});
|
||||
|
||||
// Toggle auto encryption off
|
||||
const toggleOffResult = conn.toggleAutoEncryption(false);
|
||||
assert(toggleOffResult, "toggleAutoEncryption(false) should succeed");
|
||||
|
||||
// Verify all underlying mongos connections have auto encryption toggled off
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.neq(options, undefined, `Mongos ${idx} should still have auto encryption options after toggle off`);
|
||||
});
|
||||
|
||||
// Clean up by unsetting auto encryption
|
||||
conn.unsetAutoEncryption();
|
||||
});
|
||||
|
||||
testCase("Testing encryption state is independently maintained per mongos", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
|
||||
assert.eq(conn.isMultiRouter, true, "Should be MultiRouterMongo");
|
||||
assert.eq(conn._mongoConnections.length, 3, "Should have 3 mongo connections");
|
||||
|
||||
const localKMS = {
|
||||
key: BinData(
|
||||
0,
|
||||
"/tu9jUCBqZdwCelwE/EAm/4WqdxrSMi04B8e9uAV+m30rI1J2nhKZZtQjdvsSCwuI4erR6IEcEK+5eGUAODv43NDNIR9QheT2edWFewUfHKsl9cnzTc86meIzOmYl6dr",
|
||||
),
|
||||
};
|
||||
|
||||
const clientSideFLEOptions = {
|
||||
kmsProviders: {local: localKMS},
|
||||
keyVaultNamespace: "test.keystore",
|
||||
schemaMap: {},
|
||||
};
|
||||
|
||||
// Set and toggle auto encryption multiple times
|
||||
assert(conn.setAutoEncryption(clientSideFLEOptions), "setAutoEncryption should succeed");
|
||||
assert(conn.toggleAutoEncryption(true), "toggleAutoEncryption(true) should succeed");
|
||||
assert(conn.toggleAutoEncryption(false), "toggleAutoEncryption(false) should succeed");
|
||||
assert(conn.toggleAutoEncryption(true), "toggleAutoEncryption(true) should succeed again");
|
||||
|
||||
// Verify all connections maintain consistent state
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.neq(options, undefined, `Mongos ${idx} should have auto encryption options`);
|
||||
assert.eq(options.keyVaultNamespace, "test.keystore", `Mongos ${idx} should have correct keyVaultNamespace`);
|
||||
});
|
||||
|
||||
// Clean up
|
||||
conn.unsetAutoEncryption();
|
||||
|
||||
// Verify all connections are cleaned up
|
||||
conn._mongoConnections.forEach((mongo, idx) => {
|
||||
const options = mongo.getAutoEncryptionOptions();
|
||||
assert.eq(options, undefined, `Mongos ${idx} should not have auto encryption options after cleanup`);
|
||||
});
|
||||
});
|
||||
|
||||
testCase("Testing encrypted inserts are routed randomly", () => {
|
||||
const uri = getMongosesURI();
|
||||
const conn = connect(uri).getMongo();
|
||||
const db = conn.getDB("test");
|
||||
const coll = db.encryptionTest;
|
||||
const kOperations = 10;
|
||||
|
||||
const localKMS = {
|
||||
key: BinData(
|
||||
0,
|
||||
"/tu9jUCBqZdwCelwE/EAm/4WqdxrSMi04B8e9uAV+m30rI1J2nhKZZtQjdvsSCwuI4erR6IEcEK+5eGUAODv43NDNIR9QheT2edWFewUfHKsl9cnzTc86meIzOmYl6dr",
|
||||
),
|
||||
};
|
||||
|
||||
const clientSideFLEOptions = {
|
||||
kmsProviders: {local: localKMS},
|
||||
keyVaultNamespace: "test.keystore",
|
||||
schemaMap: {},
|
||||
};
|
||||
|
||||
// Set auto encryption
|
||||
assert(conn.setAutoEncryption(clientSideFLEOptions), "setAutoEncryption should succeed");
|
||||
assert(conn.toggleAutoEncryption(true), "toggleAutoEncryption(true) should succeed");
|
||||
|
||||
// Track _getNextMongo calls
|
||||
const getNextMongoTracker = overrideGetNextMongoCountingCalls(conn);
|
||||
|
||||
// Perform encrypted inserts
|
||||
for (let i = 0; i < kOperations; i++) {
|
||||
coll.insert({x: i, type: "encrypted-insert"});
|
||||
}
|
||||
|
||||
// Should route randomly for all encrypted insert operations
|
||||
assert.gte(getNextMongoTracker.count(), kOperations, "Should route randomly for encrypted insert operations");
|
||||
|
||||
// Verify all documents were inserted
|
||||
assert.eq(kOperations, coll.count({type: "encrypted-insert"}), "All encrypted inserts should succeed");
|
||||
|
||||
// Clean up
|
||||
coll.drop();
|
||||
conn.unsetAutoEncryption();
|
||||
});
|
||||
|
||||
st.stop();
|
||||
|
|
@ -359,6 +359,8 @@ extern const JSFile collection;
|
|||
extern const JSFile crud_api;
|
||||
extern const JSFile db;
|
||||
extern const JSFile db_global;
|
||||
extern const JSFile multi_router;
|
||||
extern const JSFile multi_router_global;
|
||||
extern const JSFile error_codes;
|
||||
extern const JSFile explain_query;
|
||||
extern const JSFile explain_query_global;
|
||||
|
|
@ -390,6 +392,7 @@ void Scope::execCoreFiles() {
|
|||
execSetup(JSFiles::utils);
|
||||
execSetup(JSFiles::utils_auth);
|
||||
execSetup(JSFiles::utils_sh);
|
||||
execSetup(JSFiles::multi_router);
|
||||
|
||||
// globals
|
||||
execSetup(JSFiles::bulk_api_global);
|
||||
|
|
@ -402,6 +405,7 @@ void Scope::execCoreFiles() {
|
|||
execSetup(JSFiles::utils_global);
|
||||
execSetup(JSFiles::utils_auth_global);
|
||||
execSetup(JSFiles::utils_sh_global);
|
||||
execSetup(JSFiles::multi_router_global);
|
||||
|
||||
// scripts
|
||||
execSetup(JSFiles::mongo);
|
||||
|
|
|
|||
|
|
@ -255,6 +255,8 @@ MONGOJS_CPP_JSFILES = [
|
|||
"explainable.js",
|
||||
"explainable_global.js",
|
||||
"mongo.js",
|
||||
"multi_router.js",
|
||||
"multi_router_global.js",
|
||||
"prelude.js",
|
||||
"query.js",
|
||||
"query_global.js",
|
||||
|
|
|
|||
|
|
@ -18,3 +18,6 @@ filters:
|
|||
- "feature_compatibility_version.js":
|
||||
approvers:
|
||||
- 10gen/server-fcv
|
||||
- "*multi_router*.js":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-routing-and-topology
|
||||
|
|
|
|||
|
|
@ -322,6 +322,35 @@ Mongo.prototype.getReadConcern = function () {
|
|||
return this._readConcernLevel;
|
||||
};
|
||||
|
||||
// Selects whether to use a Mongo or a MultiRouterMongo based on the deployment
|
||||
// The connection can either be:
|
||||
// - A list of mongos for sharded cluster deployment
|
||||
// - A list of primary/secondary nodes for replica sets
|
||||
// - 1 node as a standalone deployment
|
||||
// - A list of end points where some might not have a listening server.
|
||||
// This factory attempts to catch cluster fixtures and define the right connections type.
|
||||
function connectionFactory(uri, apiParameters) {
|
||||
const mongouri = new MongoURI(uri);
|
||||
// For one end point we always fall back to Mongo.
|
||||
if (mongouri.servers.length < 2) {
|
||||
return new Mongo(uri, undefined /* encryptedDBClientCallback */, apiParameters);
|
||||
}
|
||||
// In case of multiple connection, check if setName is not "". This implies a replica-set connection.
|
||||
if (mongouri.setName.length > 0) {
|
||||
return new Mongo(uri, undefined /* encryptedDBClientCallback */, apiParameters);
|
||||
} else {
|
||||
// The Multi-Router Mongo doesn't accept that any of the listed url can't connect.
|
||||
// On the other hand, the Mongo object connects with the first available end point.
|
||||
// Some tests provide some fake uris to tests the Mongo object.
|
||||
// This try-catch attempts to catch this case and falls back to the original implementation.
|
||||
try {
|
||||
return new MultiRouterMongo(uri, undefined /* encryptedDBClientCallback */, apiParameters);
|
||||
} catch (e) {
|
||||
return new Mongo(uri, undefined /* encryptedDBClientCallback */, apiParameters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
globalThis.connect = function (url, user, pass, apiParameters) {
|
||||
if (url instanceof MongoURI) {
|
||||
user = url.user;
|
||||
|
|
@ -375,7 +404,7 @@ globalThis.connect = function (url, user, pass, apiParameters) {
|
|||
chatty("connecting to: " + safeURL);
|
||||
let m;
|
||||
try {
|
||||
m = new Mongo(url, undefined /* encryptedDBClientCallback */, apiParameters);
|
||||
m = connectionFactory(url, apiParameters);
|
||||
} catch (e) {
|
||||
let dest;
|
||||
if (url.indexOf(".query.mongodb.net") != -1) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,460 @@
|
|||
/**
|
||||
* SessionMongoMap - Maps session IDs to Mongo connections.
|
||||
* Handles UUID-to-string conversion for proper Map lookup
|
||||
*
|
||||
* The class ensures we have only one pinned mongo per session ID.
|
||||
* The class is meant to pin a mongo for a given multi document transaction.
|
||||
* The logic is optimized so that a new txnNumber will erase the previous entry
|
||||
* as we can assume the previous transaction has terminated.
|
||||
*/
|
||||
class SessionMongoMap {
|
||||
constructor() {
|
||||
this._map = new Map();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a session ID to a string key for the Map
|
||||
* @param {Object} sessionId - Session ID object with structure {id: UUID(...)}
|
||||
* @returns {string} - Hex string representation of the UUID
|
||||
*/
|
||||
_toKey(sessionId) {
|
||||
if (!sessionId || !sessionId.id) {
|
||||
throw new Error("Invalid sessionId: " + tojson(sessionId));
|
||||
}
|
||||
return sessionId.id.hex();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a session ID -> Mongo connection mapping.
|
||||
* If session ID already exists it will erase it first.
|
||||
* @param {Object} sessionId - Session ID object with structure {id: UUID(...)}
|
||||
* @param {Mongo} mongo - Mongo connection instance
|
||||
*/
|
||||
set(sessionId, txnNumber, mongo) {
|
||||
const key = this._toKey(sessionId);
|
||||
if (this.has(sessionId)) {
|
||||
this.delete(sessionId);
|
||||
}
|
||||
this._map.set(key, {txnNumber, mongo});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Mongo connection for a session ID and txnNumber
|
||||
* @param {Object} sessionId - Session ID object with structure {id: UUID(...)}
|
||||
* @returns {Mongo|undefined} - Mongo connection or undefined if not found
|
||||
*/
|
||||
get(sessionId, txnNumber) {
|
||||
const key = this._toKey(sessionId);
|
||||
const value = this._map.get(key);
|
||||
// If the txnNumber changed we are in a new transaction and we should return a new mongo.
|
||||
if (value && Number(value.txnNumber) === Number(txnNumber)) {
|
||||
return value.mongo;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a session ID mapping
|
||||
* @param {Object} sessionId - Session ID object with structure {id: UUID(...)}
|
||||
*/
|
||||
delete(sessionId) {
|
||||
const key = this._toKey(sessionId);
|
||||
return this._map.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session ID is tracked
|
||||
* @param {Object} sessionId - Session ID object with structure {id: UUID(...)}
|
||||
* @returns {boolean} - True if the session is tracked
|
||||
*/
|
||||
has(sessionId) {
|
||||
const key = this._toKey(sessionId);
|
||||
return this._map.has(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of tracked sessions
|
||||
* @returns {number} - Number of sessions in the map
|
||||
*/
|
||||
size() {
|
||||
return this._map.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all session mappings
|
||||
*/
|
||||
clear() {
|
||||
this._map.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get string representation for debugging
|
||||
* @returns {string}
|
||||
*/
|
||||
toString() {
|
||||
const entries = Array.from(this._map.entries())
|
||||
.map(([key, mongo]) => `${key}: ${mongo.host}`)
|
||||
.join(", ");
|
||||
return `SessionMongoMap(${this._map.size} entries) { ${entries} }`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* CursorTracker - Maps cursor IDs to Mongo connections.
|
||||
*
|
||||
* This ensures that getMore commands are routed to the same mongos that initiated the cursor.
|
||||
*
|
||||
* Uses a plain JavaScript object instead of a Map. When a NumberLong is used as a
|
||||
* property key on a plain object, JavaScript automatically converts it to a string via toString().
|
||||
* This gives us value-based comparison (e.g., "NumberLong(123)") rather than reference-based
|
||||
* comparison that Map would use.
|
||||
*/
|
||||
class CursorTracker {
|
||||
kNoCursor = new NumberLong(0);
|
||||
|
||||
connectionsByCursorId = {};
|
||||
|
||||
getConnectionUsedForCursor(cursorId) {
|
||||
return cursorId instanceof NumberLong ? this.connectionsByCursorId[cursorId] : undefined;
|
||||
}
|
||||
|
||||
setConnectionUsedForCursor(cursorId, cursorConn) {
|
||||
// Skip if it's the "no cursor" sentinel value
|
||||
if (cursorId instanceof NumberLong && !bsonBinaryEqual({_: cursorId}, {_: this.kNoCursor})) {
|
||||
this.connectionsByCursorId[cursorId] = cursorConn;
|
||||
}
|
||||
}
|
||||
|
||||
count() {
|
||||
return Object.keys(this.connectionsByCursorId).length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a MongoDB URI object into an array of individual URIs, one for each server.
|
||||
*
|
||||
* @param {Object} mongouri - MongoDB URI object with servers, database, and options
|
||||
* @returns {Array<string>} Array of MongoDB connection URIs
|
||||
* [
|
||||
* "mongodb://localhost:20044/test?compressors=disabled",
|
||||
* "mongodb://localhost:20045/test?compressors=disabled"
|
||||
* ...
|
||||
* ]
|
||||
*/
|
||||
function toConnectionsList(mongouri) {
|
||||
const optionsString =
|
||||
Object.keys(mongouri.options).length > 0
|
||||
? "?" +
|
||||
Object.entries(mongouri.options)
|
||||
.map(([key, val]) => `${key}=${val}`)
|
||||
.join("&")
|
||||
: "";
|
||||
|
||||
return mongouri.servers.map((s) => `mongodb://${s.server}/${mongouri.database}${optionsString}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* MultiRouterMongo
|
||||
*
|
||||
* This class holds a connection pool of mongoses and dispatches commands randomly against it.
|
||||
* It ensures that stateful operations (transactions, cursors) are pinned to the same mongos
|
||||
* while allowing stateless operations to be distributed randomly.
|
||||
*
|
||||
* Signature matches the Mongo constructor: (url, encryptedDBClientCallback, apiParameters)
|
||||
*/
|
||||
function MultiRouterMongo(uri, encryptedDBClientCallback, apiParameters) {
|
||||
const mongoURI = new MongoURI(uri);
|
||||
|
||||
if (mongoURI.servers.length === 0) {
|
||||
throw Error("No mongos hosts found in connection string");
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Logging
|
||||
// ============================================================================
|
||||
|
||||
this._name = "multi_router_number_" + Math.floor(Math.random() * 100000);
|
||||
|
||||
this.log = function (text) {
|
||||
chatty("[" + this._name + "] " + text);
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Connection pool and primary connection
|
||||
// ============================================================================
|
||||
|
||||
const individualURIs = toConnectionsList(mongoURI);
|
||||
this._mongoConnections = individualURIs.map((uri) => {
|
||||
return new Mongo(uri, encryptedDBClientCallback, apiParameters);
|
||||
});
|
||||
|
||||
// The primary mongo is a pinned connection that the proxy falls back on when
|
||||
// the workload should not be distributed. This is useful for:
|
||||
// - Tests that require dispatching to be disabled
|
||||
// - Executing getters or setters on options
|
||||
// - Executing getters or setters on the cluster logicalTime
|
||||
// The primary mongo acts as the holder of shared state among the connection pool.
|
||||
this.primaryMongo = this._mongoConnections[0];
|
||||
this.defaultDB = this.primaryMongo.defaultDB;
|
||||
this.isMultiRouter = true;
|
||||
this.host = mongoURI.servers.map((s) => s.server).join(",");
|
||||
|
||||
this.log("Established a Multi-Router Mongo connector. Mongos connections list: " + individualURIs);
|
||||
|
||||
// ============================================================================
|
||||
// State tracking (cursors and sessions)
|
||||
// ============================================================================
|
||||
|
||||
this._cursorTracker = new CursorTracker();
|
||||
this._sessionToMongoMap = new SessionMongoMap(this._name);
|
||||
|
||||
// ============================================================================
|
||||
// Field Level Encryption (FLE) methods
|
||||
// ============================================================================
|
||||
|
||||
this.setAutoEncryption = function (fleOptions) {
|
||||
let res;
|
||||
this._mongoConnections.forEach((mongo) => {
|
||||
res = mongo.setAutoEncryption(fleOptions);
|
||||
if (!res) {
|
||||
return res;
|
||||
}
|
||||
});
|
||||
return res;
|
||||
};
|
||||
|
||||
this.toggleAutoEncryption = function (flag) {
|
||||
let res;
|
||||
this._mongoConnections.forEach((mongo) => {
|
||||
res = mongo.toggleAutoEncryption(flag);
|
||||
if (!res) {
|
||||
return res;
|
||||
}
|
||||
});
|
||||
return res;
|
||||
};
|
||||
|
||||
this.unsetAutoEncryption = function () {
|
||||
let res;
|
||||
this._mongoConnections.forEach((mongo) => {
|
||||
res = mongo.unsetAutoEncryption();
|
||||
if (!res) {
|
||||
return res;
|
||||
}
|
||||
});
|
||||
return res;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Command routing (core routing logic)
|
||||
// ============================================================================
|
||||
|
||||
// Selects a random mongo from the connection pool
|
||||
this._getNextMongo = function () {
|
||||
let normalizedRandValue = Math.random();
|
||||
let randomIndex = Math.floor(normalizedRandValue * this._mongoConnections.length);
|
||||
return this._mongoConnections[randomIndex];
|
||||
};
|
||||
|
||||
this.runCommand = function (dbname, cmd, options) {
|
||||
let mongo;
|
||||
|
||||
// Multi-document transactions must use the same mongos
|
||||
// Extract the first connection randomly and pin it for subsequent use
|
||||
if (cmd && cmd.lsid && cmd.txnNumber) {
|
||||
mongo = this._sessionToMongoMap.get(cmd.lsid, cmd.txnNumber);
|
||||
if (!mongo) {
|
||||
let sessionInfo = {sessionId: cmd.lsid, txnNuber: cmd.txnNumber};
|
||||
this.log("Found no mongo for the multi-document transaction: " + tojson(sessionInfo));
|
||||
mongo = this._getNextMongo();
|
||||
// This will erase the previous entry for the same session id but different txnNumber
|
||||
this._sessionToMongoMap.set(cmd.lsid, cmd.txnNumber, mongo);
|
||||
}
|
||||
}
|
||||
|
||||
// getMore commands must use the same mongos that initiated the cursor
|
||||
if (cmd && cmd.getMore) {
|
||||
mongo = this._cursorTracker.getConnectionUsedForCursor(cmd.getMore);
|
||||
if (!mongo) {
|
||||
throw new Error("Found no mongo for getMore, but we should");
|
||||
}
|
||||
}
|
||||
|
||||
// If no mongo has been selected yet, pick one randomly
|
||||
if (!mongo) {
|
||||
mongo = this._getNextMongo();
|
||||
}
|
||||
|
||||
const result = mongo.runCommand(dbname, cmd, options);
|
||||
|
||||
// Track cursor-to-mongos mapping for aggregations and finds
|
||||
// After extracting the first connection randomly, we pin it for subsequent getMore commands
|
||||
if (result && result.cursor && result.cursor.id && !cmd.getMore) {
|
||||
this._cursorTracker.setConnectionUsedForCursor(result.cursor.id, mongo);
|
||||
}
|
||||
|
||||
return result;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Session management
|
||||
// ============================================================================
|
||||
|
||||
this.startSession = function (options = {}, proxy) {
|
||||
if (!options.hasOwnProperty("retryWrites") && this.primaryMongo.hasOwnProperty("_retryWrites")) {
|
||||
options.retryWrites = this.primaryMongo._retryWrites;
|
||||
}
|
||||
|
||||
const newDriverSession = new DriverSession(proxy, options);
|
||||
|
||||
if (typeof TestData === "object" && TestData.testName) {
|
||||
print(
|
||||
"New session started with sessionID: " +
|
||||
tojsononeline(newDriverSession.getSessionId()) +
|
||||
" and options: " +
|
||||
tojsononeline(options),
|
||||
);
|
||||
}
|
||||
|
||||
return newDriverSession;
|
||||
};
|
||||
|
||||
this._getDefaultSession = function (proxy) {
|
||||
if (!this.hasOwnProperty("_defaultSession")) {
|
||||
if (_shouldUseImplicitSessions()) {
|
||||
try {
|
||||
this._defaultSession = this.startSession({causalConsistency: false}, proxy);
|
||||
} catch (e) {
|
||||
if (e instanceof DriverSession.UnsupportedError) {
|
||||
chatty("WARNING: No implicit session: " + e.message);
|
||||
this._defaultSession = new _DummyDriverSession(proxy);
|
||||
} else {
|
||||
print("ERROR: Implicit session failed: " + e.message);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this._defaultSession = new _DummyDriverSession(proxy);
|
||||
}
|
||||
this._defaultSession._isExplicit = false;
|
||||
}
|
||||
return this._defaultSession;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Database and admin operations
|
||||
// ============================================================================
|
||||
|
||||
this.getDB = function (name, proxy) {
|
||||
return new DB(proxy, name);
|
||||
};
|
||||
|
||||
this.adminCommand = function (cmd, proxy) {
|
||||
return new DB(proxy, "admin").runCommand(cmd);
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Authentication methods
|
||||
// ============================================================================
|
||||
|
||||
this.logout = function (dbname) {
|
||||
let result;
|
||||
for (const mongo of this._mongoConnections) {
|
||||
result = mongo.logout(dbname);
|
||||
if (!result.ok) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
this.auth = function (params) {
|
||||
let result;
|
||||
for (const mongo of this._mongoConnections) {
|
||||
result = mongo.auth(params);
|
||||
if (!result) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Utility methods
|
||||
// ============================================================================
|
||||
|
||||
this.isConnectedToMongos = function () {
|
||||
// Assert the primary Mongo is connected to a mongos
|
||||
const res = assert.commandWorked(this.primaryMongo._getDefaultSession().getClient().adminCommand("ismaster"));
|
||||
return "isdbgrid" === res.msg;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Proxy handler
|
||||
// ============================================================================
|
||||
|
||||
return new Proxy(this, {
|
||||
get(target, prop, proxy) {
|
||||
// If the proxy is disabled by the test, always run the command on the pinned mongos (primary mongo)
|
||||
if (jsTest.options().skipMultiRouterRotation) {
|
||||
const value = target.primaryMongo[prop];
|
||||
if (typeof value === "function") {
|
||||
return value.bind(target.primaryMongo);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
if (prop === "_runCommandImpl") {
|
||||
throw new Error("You should never run _runCommandImpl against the proxy but always against a Mongo!");
|
||||
}
|
||||
|
||||
if (prop === "adminCommand") {
|
||||
return function (cmd) {
|
||||
return target.adminCommand(cmd, proxy);
|
||||
};
|
||||
}
|
||||
|
||||
if (prop === "getMongo") {
|
||||
return function () {
|
||||
return proxy;
|
||||
};
|
||||
}
|
||||
|
||||
if (prop === "getDB") {
|
||||
return function (name) {
|
||||
return target.getDB(name, proxy);
|
||||
};
|
||||
}
|
||||
|
||||
if (prop === "_getDefaultSession") {
|
||||
return function () {
|
||||
return target._getDefaultSession(proxy);
|
||||
};
|
||||
}
|
||||
|
||||
if (prop === "startSession") {
|
||||
return function (options) {
|
||||
return target.startSession(options, proxy);
|
||||
};
|
||||
}
|
||||
|
||||
if (target.hasOwnProperty(prop)) {
|
||||
return target[prop];
|
||||
}
|
||||
|
||||
// For every un-implemented property, run on primary mongo
|
||||
const value = target.primaryMongo[prop];
|
||||
if (typeof value === "function") {
|
||||
return value.bind(target.primaryMongo);
|
||||
}
|
||||
return value;
|
||||
},
|
||||
|
||||
set(target, prop, value, receiver) {
|
||||
target[prop] = value;
|
||||
return true;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export {MultiRouterMongo, toConnectionsList};
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
import {MultiRouterMongo, toConnectionsList} from "src/mongo/shell/multi_router.js";
|
||||
globalThis.MultiRouterMongo = MultiRouterMongo;
|
||||
globalThis.toConnectionsList = toConnectionsList;
|
||||
|
|
@ -457,6 +457,7 @@ function jsTestOptions() {
|
|||
skipCheckOrphans: TestData.skipCheckOrphans || false,
|
||||
skipCheckRoutingTableConsistency: TestData.skipCheckRoutingTableConsistency || false,
|
||||
skipCheckShardFilteringMetadata: TestData.skipCheckShardFilteringMetadata || false,
|
||||
skipMultiRouterRotation: TestData.skipMultiRouterRotation || false,
|
||||
inEvergreen: TestData.inEvergreen || false,
|
||||
defaultReadPreference: TestData.defaultReadPreference,
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue