import {Cluster} from "jstests/concurrency/fsm_libs/cluster.js"; import {parseConfig} from "jstests/concurrency/fsm_libs/parse_config.js"; import {SpecificSecondaryReaderMongo} from "jstests/libs/specific_secondary_reader_mongo.js"; const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor; export const workerThread = (function () { // workloads = list of workload filenames // args.tid = the thread identifier // args.tenantId = the tenant id // args.data = map of workload -> 'this' parameter passed to the FSM state functions // args.host = the address to make a new connection to // args.latch = CountDownLatch instance for starting all threads // args.dbName = the database name // args.collName = the collection name // args.cluster = connection strings for all cluster nodes (see cluster.js for format) // args.clusterOptions = the configuration of the cluster // args.seed = seed for the random number generator // args.errorLatch = CountDownLatch instance that threads count down when they error // args.sessionOptions = the options to start a session with // run = callback that takes a map of workloads to their associated $config async function main(workloads, args, run) { let myDB; let configs = {}; let connectionString = "mongodb://" + args.host + "/?appName=tid:" + args.tid; if (typeof args.replSetName !== "undefined") { connectionString += "&replicaSet=" + args.replSetName; } // The global 'TestData' object may still be undefined if the concurrency suite isn't being // run by resmoke.py (e.g. if it is being run via a parallel shell in the backup/restore // tests). TestData = TestData !== undefined ? TestData : {}; try { // Running a callback passed through testData before running fsm worker threads. // Can be added to yml files as the following example: // fsmPreOverridesLoadedCallback: ' // testingReplication = true; // await import("jstests/libs/override_methods/network_error_and_txn_override.js"); // ... // ' if (TestData.fsmPreOverridesLoadedCallback) { if (typeof TestData.fsmPreOverridesLoadedCallback !== "string") { throw new Error( `TestData.fsmPreOverridesLoadedCallback must be a string, not ${typeof TestData.fsmPreOverridesLoadedCallback}`, ); } const fn = new AsyncFunction(TestData.fsmPreOverridesLoadedCallback); await fn(); } if (typeof db !== "undefined") { // The implicit database connection created within the thread's scope // is unneeded, so forcibly clean it up. globalThis.db = undefined; gc(); } let mongo; if (TestData.pinningSecondary) { mongo = new SpecificSecondaryReaderMongo(connectionString, args.secondaryHost); } else { mongo = new Mongo(connectionString); } if (typeof args.tenantId !== "undefined") { TestData.tenantId = args.tenantId; await import("jstests/libs/override_methods/simulate_atlas_proxy.js"); } // Retry operations that fail due to in-progress background operations. Load this early // so that later overrides can be retried. await import("jstests/libs/override_methods/index_builds/implicitly_retry_on_background_op_in_progress.js"); if (typeof args.sessionOptions !== "undefined") { let initialClusterTime; let initialOperationTime; // JavaScript objects backed by C++ objects (e.g. BSON values from a command // response) do not serialize correctly when passed through the Thread // constructor. To work around this behavior, we instead pass a stringified form // of the JavaScript object through the Thread constructor and use eval() // to rehydrate it. if (typeof args.sessionOptions.initialClusterTime === "string") { initialClusterTime = eval("(" + args.sessionOptions.initialClusterTime + ")"); // The initialClusterTime property was removed from SessionOptions in a // later revision of the Driver's specification, so we remove the property // and call advanceClusterTime() ourselves. delete args.sessionOptions.initialClusterTime; } if (typeof args.sessionOptions.initialOperationTime === "string") { initialOperationTime = eval("(" + args.sessionOptions.initialOperationTime + ")"); // The initialOperationTime property was removed from SessionOptions in a // later revision of the Driver's specification, so we remove the property // and call advanceOperationTime() ourselves. delete args.sessionOptions.initialOperationTime; } const session = mongo.startSession(args.sessionOptions); const readPreference = session.getOptions().getReadPreference(); if (readPreference && readPreference.mode === "secondary") { // Unset the explicit read preference so set_read_preference_secondary.js can do // the right thing based on the DB. session.getOptions().setReadPreference(undefined); // We import set_read_preference_secondary.js in order to avoid running // commands against the "admin" and "config" databases via mongos with // readPreference={mode: "secondary"} when there's only a single node in // the CSRS. await import("jstests/libs/override_methods/set_read_preference_secondary.js"); } if (typeof initialClusterTime !== "undefined") { session.advanceClusterTime(initialClusterTime); } if (typeof initialOperationTime !== "undefined") { session.advanceOperationTime(initialOperationTime); } myDB = session.getDatabase(args.dbName); } else { myDB = mongo.getDB(args.dbName); } { let connectionDesc = ""; // In sharded environments, mongos is acting as a proxy for the mongo shell and // therefore has a different outbound port than the 'whatsmyuri' command returns. if (!Cluster.isSharded(args.clusterOptions) && !TestData.testingReplicaSetEndpoint) { let res = assert.commandWorked(myDB.runCommand({whatsmyuri: 1})); const myUri = res.you; res = assert.commandWorked(myDB.adminCommand({currentOp: 1, client: myUri})); connectionDesc = ", conn:" + res.inprog[0].desc; } const printOriginal = print; print = function () { const printArgs = Array.from(arguments); const prefix = "[tid:" + args.tid + connectionDesc + "]"; printArgs.unshift(prefix); return printOriginal(printArgs.join(" ")); }; jsTestLog = function (msg) { if (typeof msg === "object") { msg = tojson(msg); } assert.eq(typeof msg, "string", "Received: " + msg); let msgs = msg.split("\n"); msgs = msgs.map((msg) => "[tid:" + args.tid + "] " + msg); const printMsgs = ["----", ...msgs, "----"].map((s) => `[jsTest] ${s}`); printOriginal(`\n\n${printMsgs.join("\n")}\n\n`); }; } if (TestData.shardsAddedRemoved) { await import("jstests/libs/override_methods/implicitly_retry_on_shard_transition_errors.js"); } if (TestData.runningWithConfigStepdowns || TestData.runningWithShardStepdowns || TestData.killShards) { await import("jstests/libs/override_methods/implicitly_retry_crud_on_no_progress_made.js"); } if (TestData.runningWithBalancer && !TestData.shardsAddedRemoved) { // Skipping the import on shard transitions because it's already imported under // implicitly_retry_on_shard_transition_errors.js await import("jstests/libs/override_methods/implicitly_retry_on_migration_in_progress.js"); } if (TestData.fuzzRuntimeParams) { await import( "jstests/libs/override_methods/implicitly_retry_on_conflicting_operation_during_fuzztest.js" ); } if (Cluster.isReplication(args.clusterOptions)) { if ( args.clusterOptions.hasOwnProperty("sharded") && args.clusterOptions.sharded.hasOwnProperty("stepdownOptions") && args.clusterOptions.sharded.stepdownOptions.shardStepdown ) { const newOptions = { alwaysInjectTransactionNumber: true, defaultReadConcernLevel: "majority", logRetryAttempts: true, overrideRetryAttempts: 3, }; Object.assign(TestData, newOptions); assert(!TestData.hasOwnProperty("networkErrorAndTxnOverrideConfig"), TestData); TestData.networkErrorAndTxnOverrideConfig = {retryOnNetworkErrors: true}; await import("jstests/libs/override_methods/network_error_and_txn_override.js"); if (TestData.killShards) { // TODO (SERVER-107404): Remove the override below. await import("jstests/libs/override_methods/implicitly_retry_resharding.js"); } } // Operations that run after a "dropDatabase" command has been issued may fail with // a "DatabaseDropPending" error response if they would create a new collection on // that database while we're waiting for a majority of nodes in the replica set to // confirm it has been dropped. We load the // implicitly_retry_on_database_drop_pending.js file to make it so that the clients // started by the concurrency framework automatically retry their operation in the // face of this particular error response. await import("jstests/libs/override_methods/implicitly_retry_on_database_drop_pending.js"); } if (TestData.defaultReadConcernLevel || TestData.defaultWriteConcern) { await import("jstests/libs/override_methods/set_read_and_write_concerns.js"); } for (const workload of workloads) { const {$config} = await import(workload); let config = parseConfig($config); // to normalize // Copy any modifications that were made to $config.data // during the setup function of the workload (see caveat // below). // XXX: Changing the order of extend calls causes problems // for workloads that reference $super. // Suppose you have workloads A and B, where workload B extends // workload A. The $config.data of workload B can define a // function that closes over the $config object of workload A // (known as $super to workload B). This reference is lost when // the config object is serialized to BSON, which results in // undefined variables in the derived workload. let data = Object.extend({}, args.data[workload], true); data = Object.extend(data, config.data, true); // Object.extend() defines all properties added to the destination object as // configurable, enumerable, and writable. To prevent workloads from changing // the iterations and threadCount properties in their state functions, we // redefine them here as non-configurable and non-writable. Object.defineProperties(data, { "iterations": {configurable: false, writable: false, value: data.iterations}, "threadCount": {configurable: false, writable: false, value: data.threadCount}, }); data.tid = args.tid; configs[workload] = { data: data, db: myDB, collName: args.collName, cluster: args.cluster, iterations: data.iterations, passConnectionCache: config.passConnectionCache, startState: config.startState, states: config.states, tid: args.tid, transitions: config.transitions, errorLatch: args.errorLatch, numThreads: args.numThreads, }; } args.latch.countDown(); // Converts any exceptions to a return status. In order for the // parent thread to call countDown() on our behalf, we must throw // an exception. Nothing prior to (and including) args.latch.countDown() // should be wrapped in a try/catch statement. try { args.latch.await(); // wait for all threads to start Random.setRandomSeed(args.seed); await run(configs); return {ok: 1}; } catch (e) { jsTest.log("Thread failed. Other threads will stop execution on next iteration."); args.errorLatch.countDown(); return { ok: 0, err: e.toString(), stack: e.stack, tid: args.tid, workloads: workloads, }; } } finally { // Kill this worker thread's session to ensure any possible idle cursors left open by // the workload are closed. // TODO SERVER-74993: Remove this. try { let session = myDB.getSession(); if (session) { myDB.runCommand({killSessions: [session.getSessionId()]}); } } catch (e) { // Ignore errors from killSessions. jsTest.log("Error running killSessions: " + e); } // Avoid retention of connection object configs = null; myDB = null; gc(); } } return {main: main}; })();