/**
 * This test exercises concurrent read and write behavior for tailable cursors on unreplicated
 * capped collections. These collections accept concurrent writes and thus must ensure no
 * documents are skipped for forward cursors.
 *
 * This test sets up a single capped collection with many concurrent writers. Concurrent readers
 * open tailable cursors and clone the contents into their own collection copies. The readers then
 * assert that the contents match the source.
 *
 * @tags: [
 *   assumes_against_mongod_not_mongos,
 *   does_not_support_retryable_writes,
 *   requires_capped,
 *   requires_non_retryable_writes,
 *   # Tailable cursors do not work correctly on previous versions.
 *   requires_fcv_63,
 * ]
 */
load("jstests/libs/parallelTester.js");  // For Thread.

(function() {
'use strict';
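
// Writer: appends 'nInserts' documents tagged with this thread id to the capped collection.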
function insertWorker(host, collName, tid, nInserts) {
    const conn = new Mongo(host);
    const db = conn.getDB('local');

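    // Insert in unordered batches of 10 so that the writer threads interleave on the collection.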
    for (let i = 0; i < nInserts;) {
        const bulk = db[collName].initializeUnorderedBulkOp();
        for (let j = 0; j < 10; j++) {
            bulk.insert({t: tid, i: i++});
        }
        assert.commandWorked(bulk.execute());
    }
    print(tid + ": done");
}
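
// Reader: tails the capped collection, copies every document it sees into a per-thread clone
// collection, and finally verifies that the clone matches the source.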
function tailWorker(host, collName, tid, expectedDocs) {
    // Rewrite the connection string as a mongo URI so that we can add an 'appName' to make
    // debugging easier. When run against a standalone, 'host' is in the form '<host>:<port>'.
    // When run against a replica set, 'host' is in the form '<rs name>/<host1>:<port1>,...'.
    const iSlash = host.indexOf('/');
    let connString = 'mongodb://';
    if (iSlash > 0) {
        connString += host.substr(iSlash + 1) + '/?appName=tid' + tid +
            '&replicaSet=' + host.substr(0, iSlash);
    } else {
        connString += host + '/?appName=tid' + tid;
    }
    const conn = new Mongo(connString);
    const db = conn.getDB('local');
    const cloneColl = db[collName + "_clone_" + tid];
    cloneColl.drop();

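    // Open a tailable, awaitData cursor. With batchSize: 0 the initial find returns no documents,
    // only a cursor id, so every document arrives through getMore.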
    let res = db.runCommand({find: collName, batchSize: 0, awaitData: true, tailable: true});
    assert.commandWorked(res);
    assert.gt(res.cursor.id, NumberLong(0));
    assert.eq(res.cursor.firstBatch.length, 0);

    const curId = res.cursor.id;

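    // Issue getMores until this reader has seen every document the writers will produce, copying
    // each batch into the clone collection as it arrives.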
    let myCount = 0;
    let emptyGetMores = 0;
    let nonEmptyGetMores = 0;
    assert.soon(() => {
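        // With awaitData, this getMore blocks for up to maxTimeMS waiting for new documents and
        // may return an empty batch if none arrive in time.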
        res = db.runCommand({getMore: curId, collection: collName, maxTimeMS: 1000});
        assert.commandWorked(res);

        const batchLen = res.cursor.nextBatch.length;
        if (batchLen > 0) {
            nonEmptyGetMores++;
        } else {
            emptyGetMores++;
        }

        print(tid + ': got batch of size ' + batchLen +
              '. first doc: ' + tojson(res.cursor.nextBatch[0]) +
              '. last doc: ' + tojson(res.cursor.nextBatch[batchLen - 1]) +
              '. empty getMores so far: ' + emptyGetMores +
              '. non-empty getMores so far: ' + nonEmptyGetMores);
        myCount += batchLen;

        // Only clone non-empty batches: executing a bulk op with no operations throws in the
        // shell.
        if (batchLen > 0) {
            const bulk = cloneColl.initializeUnorderedBulkOp();
            for (let i = 0; i < batchLen; i++) {
                bulk.insert(res.cursor.nextBatch[i]);
            }
            assert.commandWorked(bulk.execute());
        }

        // Keep draining until we have seen as many documents as we expect; once the writers are
        // done, the remaining getMores come back empty.
        if (myCount == expectedDocs) {
            return true;
        } else {
            print(tid + ": waiting. my count: " + myCount + " expected: " + expectedDocs);
        }
        return false;
    }, "failed to return all documents within timeout");

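    // The clone preserves each document's _id, so sorting both collections by _id yields directly
    // comparable arrays.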
    print(tid + ": validating");
    const expected = db[collName].find().sort({_id: 1}).toArray();
    const actual = cloneColl.find().sort({_id: 1}).toArray();
    assert.eq(expected.length, actual.length, function() {
        return "number of documents does not match. expected: " + tojson(expected) +
            " actual: " + tojson(actual);
    });
    for (let i = 0; i < actual.length; i++) {
        assert.docEq(actual[i], expected[i], function() {
            return "mismatched documents. expected: " + tojson(expected) +
                " actual: " + tojson(actual);
        });
    }
    print(tid + ": done");
}

const collName = 'capped';
const localDb = db.getSiblingDB('local');
localDb[collName].drop();

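// The 'local' database is never replicated, so this capped collection accepts the concurrent
// writes whose cursor behavior is under test.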
|
assert.commandWorked(localDb.runCommand({create: collName, capped: true, size: 10 * 1024 * 1024}));
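// Seed the collection so it is non-empty before any reader opens its tailable cursor; this
// document is counted in 'expectedDocs' below.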
assert.commandWorked(localDb[collName].insert({firstDoc: 1, i: -1}));

const nWriters = 5;
const nReaders = 5;

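// Each reader expects every writer's documents plus the initial seed document. The 10MB cap is
// large enough that none of these small documents age out before validation.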
const insertsPerThread = 1000;
const expectedDocs = nWriters * insertsPerThread + 1;

let threads = [];

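// Start the readers first so their tailable cursors are open while the writers run.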
for (let i = 0; i < nReaders; i++) {
    const thread =
        new Thread(tailWorker, db.getMongo().host, collName, threads.length, expectedDocs);
    thread.start();
    threads.push(thread);
}

for (let i = 0; i < nWriters; i++) {
    const thread =
        new Thread(insertWorker, db.getMongo().host, collName, threads.length, insertsPerThread);
    thread.start();
    threads.push(thread);
}

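// Wait for every reader and writer to finish.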
for (let i = 0; i < threads.length; i++) {
    threads[i].join();
}
})();