mongo/jstests/noPassthrough/query/exchange_in_session.js

93 lines
2.6 KiB
JavaScript

/**
* Be sure that an exchange won't deadlock when one of the consumer's buffers is full. Iterates two
* consumers on an Exchange with a very small buffer. This test was designed to reproduce
* SERVER-37499.
* @tags: [
* requires_sharding,
* uses_transactions,
* ]
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
// This test manually simulates a session, which is not compatible with implicit sessions.
TestData.disableImplicitSessions = true;

// Start a sharded cluster. For this test, we'll just need to talk to the shard directly.
const st = new ShardingTest({shards: 1, mongos: 1});

// Open an explicit session on the shard's connection and reach the test collection through it,
// so the inserts, the transaction and the aggregate below all go through that session.
const session = st.shard0.getDB("test").getMongo().startSession();
const shardDB = session.getDatabase("test");
const coll = shardDB.exchange_in_session;

// Every document carries the same 20-character payload.
const bigString = "s".repeat(20);

// Insert some documents, with _id running from 0 to nDocs - 1.
const nDocs = 50;
for (let i = 0; i < nDocs; i++) {
    assert.commandWorked(coll.insert({_id: i, bigString}));
}

session.startTransaction();

// Set up an Exchange with two cursors. The key range on _id is split at nDocs / 2 between
// consumers 0 and 1, and the exchange buffer is limited to 128. batchSize: 0 asks for the cursors
// without any documents in the first reply; they are iterated later with getMore.
// NOTE(review): the boundary documents use the field name "a" while the exchange key is "_id";
// presumably only the boundary values matter - confirm before renaming them.
const res = assert.commandWorked(
    shardDB.runCommand({
        aggregate: coll.getName(),
        pipeline: [],
        exchange: {
            policy: "keyRange",
            consumers: NumberInt(2),
            key: {_id: 1},
            boundaries: [{a: MinKey}, {a: nDocs / 2}, {a: MaxKey}],
            consumerIds: [NumberInt(0), NumberInt(1)],
            bufferSize: NumberInt(128),
        },
        cursor: {batchSize: 0},
    }),
);
/**
 * Starts a parallel shell that drains one exchange consumer cursor by issuing getMore commands
 * that carry this test's session id, with autocommit disabled.
 *
 * The program for the parallel shell is assembled as source text: three `const` declarations
 * (`cursor`, `sessionId`, `collName`) followed by the stringified body of
 * `iterateCursorWithNoDocs`, wrapped so that it runs immediately.
 *
 * @param {Object} cursorId - Cursor description to embed in the generated program; the generated
 *     code reads only its `id` field.
 * @returns The value returned by `startParallelShell`; the script later calls each returned value
 *     as a function.
 */
function spawnShellToIterateCursor(cursorId) {
    // Every embedded value goes through tojson, including the collection name, so that a name
    // containing quotes or backslashes still yields parsable source (tojson is expected to emit
    // a quoted, escaped string literal).
    const declarations = [
        `const cursor = ${tojson(cursorId)};`,
        `const sessionId = ${tojson(session.getSessionId())};`,
        `const collName = ${tojson(coll.getName())};`,
    ];

    // This function is never called here; only its source text is shipped to the parallel shell,
    // where `cursor`, `sessionId`, `collName` and `db` are resolved. It repeats the same getMore
    // until the reply reports cursor id 0.
    // The comparison is intentionally loose (`!=`): the id is presumably a NumberLong object in
    // the shell, which a strict comparison against 0 would never match - confirm before changing.
    // NOTE(review): txnNumber is fixed at NumberLong(0), presumably the number of the first
    // transaction started on the session - confirm if the setup ever changes.
    /* eslint-disable */
    function iterateCursorWithNoDocs() {
        const getMoreCmd = {
            getMore: cursor.id,
            collection: collName,
            batchSize: 4,
            lsid: sessionId,
            txnNumber: NumberLong(0),
            autocommit: false,
        };
        let resp = null;
        while (!resp || resp.cursor.id != 0) {
            resp = assert.commandWorked(db.runCommand(getMoreCmd));
        }
    }
    /* eslint-enable */

    const code = `${declarations.join("")}(${iterateCursorWithNoDocs.toString()})();`;
    return startParallelShell(code, st.rs0.getPrimary().port);
}
// Launch one parallel shell per cursor returned by the exchange aggregate and keep what each
// launch returns, in the same order as res.cursors.
const parallelShells = res.cursors.map((entry) => spawnShellToIterateCursor(entry.cursor));

// Call every stored value with no arguments (presumably this waits for that shell to finish -
// confirm against startParallelShell). The callback returns true unconditionally after one pass.
assert.soon(() => {
    parallelShells.forEach((joinShell) => {
        joinShell();
    });
    return true;
});

// Abort the transaction that was opened before the aggregate, then tear the cluster down.
assert.commandWorked(session.abortTransaction_forTesting());
st.stop();