// mongo/jstests/noPassthrough/query/external_data_source.js
//
// Source listing metadata: 590 lines
// 22 KiB
// JavaScript

/**
* Tests $_externalDataSources aggregate command option.
*
* @tags: [
* # This test file requires multi-threading for writers and tends to fail on small machines due to
* # thread resource shortage
* requires_external_data_source,
* ]
*/
import {aggPlanHasStage} from "jstests/libs/query/analyze_plan.js";
import {extractUUIDFromObject} from "jstests/libs/uuid_util.js";
// Runs tests on a standalone mongod. 'conn' and 'db' stay reassignable because the externalPipeDir
// test cases at the end of this file restart mongod and rebind them.
let conn = MongoRunner.runMongod({setParameter: {enableComputeMode: true}});
let db = conn.getDB(jsTestName());

// URL scheme prefix used for every named-pipe data source in this file.
const kUrlProtocolFile = "file://";
const hostInfo = assert.commandWorked(db.hostInfo());
// Pipe directory used with a mongod that has no 'externalPipeDir' configured: the Windows
// named-pipe namespace on Windows, /tmp/ everywhere else.
const kDefaultPipePath = hostInfo.os.type === "Windows" ? "//./pipe/" : "/tmp/";

/**
 * Returns a fresh, UUID-suffixed pipe name so that test cases never collide on a pipe.
 */
function getRandomPipeName() {
    return "external_data_source_" + extractUUIDFromObject(UUID());
}
// Malformed $_externalDataSources options. Each case runs
// db[collName].aggregate([{$match: {a: 1}}], options) and expects it to throw one of 'codes';
// 'collName' defaults to "coll".
(function testInvalidExternalDataSourcesOptions() {
    // Wraps one data source description into an $_externalDataSources option for "coll".
    const singleSourceOption = (dataSource) => ({
        $_externalDataSources: [{collName: "coll", dataSources: [dataSource]}],
    });

    const cases = [
        // Empty option
        {options: {$_externalDataSources: []}, codes: 7039002},
        // No external file metadata
        {options: {$_externalDataSources: [{collName: "coll", dataSources: []}]}, codes: 7039001},
        // No file type
        {
            options: singleSourceOption({url: kUrlProtocolFile + getRandomPipeName(), storageType: "pipe"}),
            codes: ErrorCodes.IDLFailedToParse,
        },
        // Unknown file type
        {
            options: singleSourceOption({
                url: kUrlProtocolFile + getRandomPipeName(),
                storageType: "pipe",
                fileType: "unknown",
            }),
            codes: ErrorCodes.BadValue,
        },
        // No storage type
        {
            options: singleSourceOption({url: kUrlProtocolFile + getRandomPipeName(), fileType: "bson"}),
            codes: ErrorCodes.IDLFailedToParse,
        },
        // Unknown storage type
        {
            options: singleSourceOption({
                url: kUrlProtocolFile + getRandomPipeName(),
                storageType: "unknown",
                fileType: "bson",
            }),
            codes: ErrorCodes.BadValue,
        },
        // No url
        {
            options: singleSourceOption({storageType: "pipe", fileType: "bson"}),
            codes: ErrorCodes.IDLFailedToParse,
        },
        // Invalid url #1: Unsupported protocol for 'pipe'
        {
            options: singleSourceOption({url: "http://abc.com/name1", storageType: "pipe", fileType: "bson"}),
            codes: 6968500,
        },
        // Invalid url #2: '..' in the url
        {
            options: singleSourceOption({url: kUrlProtocolFile + "../name1", storageType: "pipe", fileType: "bson"}),
            codes: [7001100, 7001101],
        },
        // The source namespace is not an external data source
        {
            collName: "unknown",
            options: singleSourceOption({
                url: kUrlProtocolFile + getRandomPipeName(),
                storageType: "pipe",
                fileType: "bson",
            }),
            codes: 7039003,
        },
    ];

    for (const {collName = "coll", options, codes} of cases) {
        assert.throwsWithCode(() => {
            db[collName].aggregate([{$match: {a: 1}}], options);
        }, codes);
    }
})();
// The explained plan for $sample over an external data source must still contain a $sample stage.
(function testSampleStageOverExternalDataSourceNotOptimized() {
    const explainable = db.coll.explain();
    const externalDataSources = [
        {
            collName: "coll",
            dataSources: [{url: kUrlProtocolFile + getRandomPipeName(), storageType: "pipe", fileType: "bson"}],
        },
    ];
    const explain = explainable.aggregate([{$sample: {size: 10}}], {$_externalDataSources: externalDataSources});
    const failureMsg = `Expected $sample is not optimized into $sampleFromRandomCursor but got ${tojson(explain)}`;
    assert(aggPlanHasStage(explain, "$sample"), failureMsg);
})();
// Verifies that an external data source cannot be used for the $merge / $out pipeline stages: both
// stages targeting the external collection "out" must fail with error 7239302.
(function testMergeOrOutStageToExternalDataSource() {
    // Describes 'collName' as an external collection backed by one freshly named BSON pipe.
    const makePipeColl = (collName) => ({
        collName,
        dataSources: [
            {
                url: kUrlProtocolFile + getRandomPipeName(),
                storageType: "pipe",
                fileType: "bson",
            },
        ],
    });

    for (const writeStage of [{$out: "out"}, {$merge: "out"}]) {
        assert.throwsWithCode(() => {
            db.coll.aggregate([writeStage], {
                $_externalDataSources: [makePipeColl("coll"), makePipeColl("out")],
            });
        }, 7239302);
    }
})();
// A collectionless $documents aggregation returns exactly the documents it was given.
(function testCollectionlessAgg() {
    const inputDocs = [{a: 1}, {a: 2}, {a: 3}];
    const outputDocs = db.aggregate([{$documents: inputDocs}]).toArray();
    assert.sameMembers(inputDocs, outputDocs);
})();
// A collectionless aggregation (db.aggregate) that passes $_externalDataSources fails with 7604400.
(function testCollectionlessAggWithExternalDataSources() {
    const runCollectionlessAggWithOption = () => {
        db.aggregate([{$documents: [{a: 1}]}], {$_externalDataSources: []});
    };
    assert.throwsWithCode(runCollectionlessAggWithOption, 7604400);
})();
//
// Named Pipes success test cases follow.
//
////////////////////////////////////////////////////////////////////////////////////////////////////
// Test production code for MultiBsonStreamCursor and below, plus the shell pipe writer and reader
// functions, separately from $_externalDataSources to help narrow down the source of any failures.
////////////////////////////////////////////////////////////////////////////////////////////////////
// Objects written to each pipe by testBasicPipeReadWrite().
let objsPerPipe = 25;
(function testBasicPipeReadWrite() {
    jsTestLog("Testing testBasicPipeReadWrite()");
    const firstPipe = getRandomPipeName();
    const secondPipe = `${firstPipe}1`;
    for (const pipe of [firstPipe, secondPipe]) {
        _writeTestPipe(pipe, objsPerPipe);
    }
    const readSummary = _readTestPipes(firstPipe, secondPipe);
    // The first field ("objects") of the reader's summary holds the count of objects read.
    const [objectsRead] = bsonObjToArray(readSummary);
    assert.eq(2 * objsPerPipe, objectsRead, "_readTestPipes read wrong number of objects: " + objectsRead);
})();
/**
 * Runs the named-pipe success cases against the mongod behind the file-level 'db'. The cases cover
 * $count, round-trip reads, $match, $group, $unionWith, a spilling $group and killCursors.
 *
 * @param {string} pipeDir - Directory handed to the shell pipe-writer helpers. Callers pass
 *     kDefaultPipePath for a default mongod, or the directory the mongod was started with as its
 *     'externalPipeDir' server parameter.
 */
function testSimpleAggregationsOverExternalDataSource(pipeDir) {
    // Objects written to each pipe by the count and round-trip tests below. Kept local so that this
    // helper does not modify the file-level 'objsPerPipe'.
    const kObjsPerPipe = 100;

    // Returns the $_externalDataSources description of one BSON named pipe.
    const pipeDataSource = (pipeName) => ({url: kUrlProtocolFile + pipeName, storageType: "pipe", fileType: "bson"});

    // Returns an $_externalDataSources entry that exposes the given pipes, in order, as 'collName'.
    const externalColl = (collName, ...pipeNames) => ({
        collName,
        dataSources: pipeNames.map((pipeName) => pipeDataSource(pipeName)),
    });

    // Asserts that 'actual' holds exactly the documents of 'expected', in the same order.
    function assertDocsInOrder(actual, expected) {
        assert.eq(actual.length, expected.length);
        expected.forEach((expectedDoc, i) => {
            assert.eq(actual[i], expectedDoc, `Expected ${tojson(expectedDoc)} but got ${tojson(actual[i])}`);
        });
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////
    // Test that $_externalDataSource can read and aggregate multiple named pipes.
    ////////////////////////////////////////////////////////////////////////////////////////////////
    (function testCountOverMultiplePipes() {
        jsTestLog("Testing testCountOverMultiplePipes()");
        const pipeName1 = getRandomPipeName();
        const pipeName2 = pipeName1 + "1";
        _writeTestPipe(pipeName1, kObjsPerPipe, 0, 2048, pipeDir);
        _writeTestPipe(pipeName2, kObjsPerPipe, 0, 2048, pipeDir);
        // Read the result through the public cursor API rather than the shell cursor's private
        // '_batch' field.
        const result = db.coll
            .aggregate([{$count: "objects"}], {
                $_externalDataSources: [externalColl("coll", pipeName1, pipeName2)],
            })
            .toArray();
        assert.eq(1, result.length, `Expected a single $count result but got ${tojson(result)}`);
        assert.eq(
            2 * kObjsPerPipe,
            result[0].objects,
            "$_externalDataSources read wrong number of objects: " + result[0].objects,
        );
    })();

    ////////////////////////////////////////////////////////////////////////////////////////////////
    // Test correctness by verifying reading from the pipes returns the same objects written to
    // them.
    ////////////////////////////////////////////////////////////////////////////////////////////////
    // The following objects are also in BSON file external_data_source.bson in the same order.
    const kPipes = 2; // number of pipes to write
    const kObjsToWrite = [
        {"Zero": "zero zero zero zero zero zero zero zero zero zero zero zero zero zero zero zero"},
        {"One": "one one one one one one one one one one one one one one one one one one one one"},
        {"Two": "two two two two two two two two two two two two two two two two two two two two"},
        {"Three": "three three three three three three three three three three three three three"},
        {"Four": "four four four four four four four four four four four four four four four four"},
        {"Five": "five five five five five five five five five five five five five five five five"},
        {"Six": "six six six six six six six six six six six six six six six six six six six six"},
    ];
    const kNumObjs = kObjsToWrite.length; // number of different objects for round-robin
    const kBsonFile = "jstests/noPassthrough/query/external_data_source.bson";

    // Asserts that 'cursor' yields kPipes consecutive runs of kObjsPerPipe objects, where the i-th
    // object of every run is kObjsToWrite[i % kNumObjs], and that it yields nothing more.
    function assertRoundtripResult(cursor) {
        for (let pipe = 0; pipe < kPipes; ++pipe) {
            for (let objIdx = 0; objIdx < kObjsPerPipe; ++objIdx) {
                assert.eq(cursor.next(), kObjsToWrite[objIdx % kNumObjs], "Object read from pipe does not match expected.");
            }
        }
        assert(!cursor.hasNext(), "Pipes returned more objects than were written to them");
    }

    (function testRoundtripOverMultiplePipes() {
        jsTestLog("Testing testRoundtripOverMultiplePipes()");
        const pipeName1 = getRandomPipeName();
        const pipeName2 = pipeName1 + "1";
        _writeTestPipeObjects(pipeName1, kObjsPerPipe, kObjsToWrite, pipeDir);
        _writeTestPipeObjects(pipeName2, kObjsPerPipe, kObjsToWrite, pipeDir);
        const cursor = db.coll.aggregate([], {
            $_externalDataSources: [externalColl("coll", pipeName1, pipeName2)],
        });
        assertRoundtripResult(cursor);
    })();

    ////////////////////////////////////////////////////////////////////////////////////////////////
    // Test _writeTestPipeBsonFile() correctness by verifying it writes objects from
    // external_data_source.bson correctly as read back via $_externalDataSources. This is the same
    // as prior test except for using _writeTestPipeBsonFile() instead of _writeTestPipeObjects().
    ////////////////////////////////////////////////////////////////////////////////////////////////
    (function testRoundtripOverMultiplePipesUsingBsonFile() {
        jsTestLog("Testing testRoundtripOverMultiplePipesUsingBsonFile()");
        const pipeName1 = getRandomPipeName();
        const pipeName2 = pipeName1 + "1";
        _writeTestPipeBsonFile(pipeName1, kObjsPerPipe, kBsonFile, pipeDir);
        _writeTestPipeBsonFile(pipeName2, kObjsPerPipe, kBsonFile, pipeDir);
        const cursor = db.coll.aggregate([{$project: {_id: 0}}], {
            $_externalDataSources: [externalColl("coll", pipeName1, pipeName2)],
        });
        assertRoundtripResult(cursor);
    })();

    // Prepares data for $match / $group / $unionWith / spill test cases.
    jsTestLog("Preparing data for $match / $group / $unionWith/ spill");
    Random.setRandomSeed();
    const kNumGroups = 10;
    const kDocs = 1000;
    const collObjs = Array.from({length: kDocs}, (_, i) => ({
        _id: i,
        g: Random.randInt(kNumGroups),
        str1: "strdata_" + Random.randInt(100000000),
    }));

    // Computes the expected result of {$group: {_id: "$g", c: {$count: {}}}} over 'objs': one
    // {_id: group, c: count} document per non-empty group, in ascending group order.
    function getCountPerGroupResult(objs) {
        const countPerGroup = new Array(kNumGroups).fill(0);
        for (const obj of objs) {
            ++countPerGroup[obj.g];
        }
        return countPerGroup.flatMap((cnt, group) => (cnt > 0 ? [{_id: group, c: cnt}] : []));
    }

    (function testMatchOverExternalDataSource() {
        jsTestLog("Testing testMatchOverExternalDataSource()");
        const pipeName1 = getRandomPipeName();
        _writeTestPipeObjects(pipeName1, collObjs.length, collObjs, pipeDir);
        const kNumFilter = 5;
        const expectedRes = collObjs.filter((obj) => obj.g < kNumFilter);
        const resArr = db.coll
            .aggregate([{$match: {g: {$lt: kNumFilter}}}], {
                $_externalDataSources: [externalColl("coll", pipeName1)],
            })
            .toArray();
        assertDocsInOrder(resArr, expectedRes);
    })();

    (function testGroupOverExternalDataSource() {
        jsTestLog("Testing testGroupOverExternalDataSource()");
        const pipeName1 = getRandomPipeName();
        _writeTestPipeObjects(pipeName1, collObjs.length, collObjs, pipeDir);
        const expectedRes = getCountPerGroupResult(collObjs);
        const resArr = db.coll
            .aggregate([{$group: {_id: "$g", c: {$count: {}}}}], {
                $_externalDataSources: [externalColl("coll", pipeName1)],
            })
            .toArray();
        assert.sameMembers(resArr, expectedRes);
    })();

    (function testUnionWithOverExternalDataSource() {
        jsTestLog("Testing testUnionWithOverExternalDataSource()");
        const pipeName1 = getRandomPipeName();
        const pipeName2 = pipeName1 + "1";
        _writeTestPipeObjects(pipeName1, collObjs.length, collObjs, pipeDir);
        _writeTestPipeObjects(pipeName2, collObjs.length, collObjs, pipeDir);
        const expectedRes = collObjs.concat(collObjs);
        const collName1 = "coll1";
        const collName2 = "coll2";
        const resArr = db[collName1]
            .aggregate([{$unionWith: collName2}], {
                $_externalDataSources: [externalColl(collName1, pipeName1), externalColl(collName2, pipeName2)],
            })
            .toArray();
        assertDocsInOrder(resArr, expectedRes);
    })();

    (function testSpillingGroupOverExternalDataSource() {
        jsTestLog("Testing testSpillingGroupOverExternalDataSource()");
        // Sets server parameter 'name' to 'value' and returns its previous value.
        const setServerParameter = (name, value) =>
            assert.commandWorked(db.adminCommand({setParameter: 1, [name]: value})).was;
        const kClassicParam = "internalDocumentSourceGroupMaxMemoryBytes";
        const kSbeParam = "internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill";

        // Makes sure that both classic/SBE $group spill data by shrinking their memory budgets to
        // 1 byte.
        const oldClassicGroupMaxMemory = setServerParameter(kClassicParam, 1);
        const oldSbeGroupMaxMemory = setServerParameter(kSbeParam, 1);

        const pipeName1 = getRandomPipeName();
        _writeTestPipeObjects(pipeName1, collObjs.length, collObjs, pipeDir);
        const expectedRes = getCountPerGroupResult(collObjs);
        const resArr = db.coll
            .aggregate([{$group: {_id: "$g", c: {$count: {}}}}], {
                $_externalDataSources: [externalColl("coll", pipeName1)],
            })
            .toArray();
        assert.sameMembers(resArr, expectedRes);

        setServerParameter(kClassicParam, oldClassicGroupMaxMemory);
        setServerParameter(kSbeParam, oldSbeGroupMaxMemory);
    })();

    // Verifies that 'killCursors' command works over external data sources while the server keeps
    // reading data from a named pipe. Reading external data sources should be interruptible.
    (function testKillCursorOverExternalDataSource() {
        jsTestLog("Testing testKillCursorOverExternalDataSource()");
        // Prepares a large dataset. 250K docs almost reaches 16MB BSONObj size limit.
        const kManyDocs = 250000;
        const largeCollObjs = Array.from({length: kManyDocs}, (_, i) => ({
            _id: i,
            g: Random.randInt(kNumGroups),
            str1: "strdata_" + Random.randInt(100000000),
        }));
        // We read 2 collections using $unionWith so that the result set cannot be fit into one
        // result batch for the 'getMore' request.
        const collName1 = "coll1";
        const collName2 = "coll2";
        const pipeName1 = getRandomPipeName();
        const pipeName2 = pipeName1 + "1";
        _writeTestPipeObjects(pipeName1, largeCollObjs.length, largeCollObjs, pipeDir);
        _writeTestPipeObjects(pipeName2, largeCollObjs.length, largeCollObjs, pipeDir);
        const cursor = db[collName1].aggregate([{$unionWith: collName2}], {
            $_externalDataSources: [externalColl(collName1, pipeName1), externalColl(collName2, pipeName2)],
        });
        // Has 'getMore' command issued to the server since the default batch size is 101 documents.
        for (let i = 0; i < 102; ++i) {
            cursor.next();
        }
        // Has 'killCursors' command issued to the server while the server is reading more data from
        // named pipes to send the next batch. If this fails, an exception will be thrown.
        cursor.close();
    })();
}
// Runs the named-pipe success cases against the mongod started at the top of this file, using the
// default pipe directory, and then shuts that mongod down.
jsTestLog("Testing successful named pipe test cases");
testSimpleAggregationsOverExternalDataSource(kDefaultPipePath);
MongoRunner.stopMongod(conn);

// The 'externalPipeDir' is effective only on POSIX-like system.
if (hostInfo.os.type !== "Windows") {
    // Verifies that 'externalPipeDir' server parameter works with the same test cases.
    (function testExternalPipeDirWorks() {
        jsTestLog("Testing testExternalPipeDirWorks()");
        const pipeDir = MongoRunner.dataDir + "/tmp/";
        assert(mkdir(pipeDir).created, `Failed to create ${pipeDir}`);
        jsTestLog(`Testing named pipe test cases with externalPipeDir=${pipeDir}`);
        // Rebind the file-level 'conn' / 'db' so the shared test helper talks to the new mongod.
        conn = MongoRunner.runMongod({setParameter: {enableComputeMode: true, externalPipeDir: pipeDir}});
        db = conn.getDB(jsTestName());
        testSimpleAggregationsOverExternalDataSource(pipeDir);
        MongoRunner.stopMongod(conn);
    })();

    // Verifies that 'externalPipeDir' with '..' is rejected: mongod must exit with a non-zero code.
    (function testInvalidExternalPipeDirRejected() {
        jsTestLog("Testing testInvalidExternalPipeDirRejected()");
        const pipeDir = MongoRunner.dataDir + "/tmp/abc/../def/";
        assert(mkdir(pipeDir).created, `Failed to create ${pipeDir}`);
        jsTestLog(`Testing externalPipeDir=${pipeDir}`);
        const pid = MongoRunner.runMongod({
            waitForConnect: false,
            setParameter: {enableComputeMode: true, externalPipeDir: pipeDir},
        }).pid;
        assert.soon(
            () => {
                const runningStatus = checkProgram(pid);
                return !runningStatus.alive && runningStatus.exitCode !== 0;
            },
            "Expected mongod died due to an error",
            120 * 1000,
        );
    })();
}