SERVER-108997 Use the data generator in the plan stability tests (#41233)

GitOrigin-RevId: 1593f0ec798a6943169300d5d31e0956486c6bed
Philip Stoev 2025-09-17 09:56:06 +02:00 committed by MongoDB Bot
parent d3750ba292
commit c74015b769
18 changed files with 67096 additions and 150 deletions

View File

@ -19,7 +19,9 @@ export class DataGenerator {
throw new Error("A db object must be provided to the DataGenerator constructor.");
} else {
this.dbName = db.getName();
this.uri = "mongodb://" + db.getMongo().host;
// The data_generator opens its connection to the database up front and only then
// starts generating data, so a large dataset can outlast the default socket timeout.
this.uri = "mongodb://" + db.getMongo().host + "/?socketTimeoutMS=1000000";
}
const tmpDir = _getEnv("TMPDIR") || _getEnv("TMP_DIR") || "/tmp";
@ -35,7 +37,7 @@ export class DataGenerator {
this.seed = seed;
}
execute({spec = null, size = null, indices = null, analyze = false, serial_inserts = true} = {}) {
execute({spec = null, size = null, indexes = null, drop = true, analyze = false, serial_inserts = true} = {}) {
let args = [
getPython3Binary(),
DataGenerator.PROGRAM_PATH,
@ -53,8 +55,8 @@ export class DataGenerator {
args.push("--size", size);
if (indices !== null) {
args.push("--indices", indices);
if (indexes !== null) {
args.push("--indexes", indexes);
}
if (this.seed !== null) {
@ -65,7 +67,11 @@ export class DataGenerator {
args.push("--serial-inserts");
}
if (this.analyze) {
if (drop) {
args.push("--drop");
}
if (analyze) {
args.push("--analyze");
}

View File

@ -12,7 +12,7 @@ try {
const collName = "Test";
const size = 10;
dg.execute({spec: collName, size: size, indices: "test_index", analyze: true});
dg.execute({spec: collName, size: size, indexes: "test_index", analyze: true});
assert.eq(db[collName].find({i: {$exists: true}}).count(), size);
assert.eq(db[collName].getIndexes().length, 2); // _id and i_idx test index

View File

@ -22,7 +22,7 @@ To obtain a diff that contains an individual diff fragment for each changed plan
1. Put the following line in `$HOME/.config/git/attributes`:
```
**/plan_stability diff=plan_stability
**/plan_stability* diff=plan_stability
```
2. Edit the `~/.golden_test_config.yml` to use a customized diff command:
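The config block itself falls outside this hunk. A minimal sketch, assuming the standard `outputRootPattern`/`diffCmd` keys understood by `buildscripts/golden_test.py` (the exact diff command shown here is a placeholder; consult that tool's documentation):
```
outputRootPattern: /var/tmp/test_output/out
diffCmd: git diff --no-index "{{expected}}" "{{actual}}"
```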
@ -63,12 +63,12 @@ in that repository for more information.
## Running the offending pipelines manually
1. Populate the data and the indexes:
1. Populate just the data and the indexes without executing the pipelines:
```bash
buildscripts/resmoke.py run \
--suites=query_golden_classic \
--mongodSetParameters='{internalQueryFrameworkControl: forceClassicEngine, ...}' \
--mongodSetParameters='{internalQueryFrameworkControl: forceClassicEngine, planRankerMode: samplingCE, internalQuerySamplingBySequentialScan: True}' \
jstests/query_golden/plan_stability.js \
--pauseAfterPopulate
```
@ -113,6 +113,18 @@ db.plan_stability.aggregate(pipeline).explain('executionStats').executionStats.e
You can also modify `collSize` in `plan_stability.js` to temporarily use a larger scale factor.
# Running comparisons across CE estimation methods
If you want to run a comparison between estimation methods `X` and `Y`:
1. If method `X` is not multi-planning, place the files from `jstests/query_golden/expected_files/X` in the root of `expected_files`, so that they are used as the base for the comparison;
2. Temporarily remove the expected files for method `Y` from `jstests/query_golden/expected_files/Y` so that they are not considered;
3. Run the test as described above, specifying `planRankerMode: X` (see the sketch after this list);
4. Use the summarization script as described above to produce a report.
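For step 3, the invocation mirrors the resmoke command shown earlier, with `planRankerMode` set to the method under test (`X` below is a placeholder):
```bash
buildscripts/resmoke.py run \
    --suites=query_golden_classic \
    --mongodSetParameters='{internalQueryFrameworkControl: forceClassicEngine, planRankerMode: X}' \
    jstests/query_golden/plan_stability.js
```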
# Modifying the test
## Accepting the modified query plans
@ -122,5 +134,5 @@ To accept the new plans, use `buildscripts/golden_test.py accept`, as with any
## Removing individual pipelines
If a given pipeline proves flaky, that is, it flips between one plan and another for no reason,
you can comment it out from the test with a note. Rerurn the test and then run `buildscripts/golden_test.py accept`
you can comment it out from the test with a note. Re-run the test and then run `buildscripts/golden_test.py accept`
to persist the change.

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -80,3 +80,125 @@ export function extractSortEffort(stage) {
return Math.round(effort);
}
export function runPlanStabilityPipelines(db, collName, pipelines) {
let totalPlans = 0;
let totalKeys = 0;
let totalDocs = 0;
let totalSorts = 0;
let totalRows = 0;
let totalErrors = 0;
/**
* The output of this test is a JSON that contains both the plans and stats for each pipeline
* as well as a summary section with totals. The structure of the output is as follows:
* {
* "pipelines: [
* {
* ">>>pipeline" : <pipeline>,
* "winningPlan": <winningPlan>,
* "keys" : <totalKeysExamined>,
* "docs" : <totalDocsExamined>,
* "sorts" : <sortEffort>,
* "plans" : <numberOfPlans>,
* "rows" : <nReturned>
* },
* ...
* ],
* ">>>totals": {
* "keys": <totalKeysExamined>, "docs": <totalDocsExamined>, "sortEffort":
* <totalSortEffort>, "plans": <numberOfPlans>, "rows": <nReturned>
* }
* }
*
* The sortEffort is an abstract measure of the complexity of any SORT stages, and is
* defined as (LOG(nReturned) + 1) * inputStage.nReturned.
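* For example, a SORT stage that is fed 1,000 rows by its input stage and returns
* 100 of them has a sort effort of (LOG(100) + 1) * 1000.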
*/
print('{">>>pipelines":[');
pipelines.forEach((pipeline, index) => {
// JSON does not allow trailing commas.
const separator = index === pipelines.length - 1 ? "" : ",";
// We print the pipeline here so that, even if the test fails,
// we have already emitted the failing pipeline.
print(`{">>>pipeline": ${JSON.stringify(pipeline)},`);
// We do not use explain() as it loses the errmsg in case of an error.
const explain = db.runCommand({
explain: {aggregate: collName, pipeline: pipeline, cursor: {}},
verbosity: "executionStats",
});
const executionStats = explain.executionStats;
if (explain.ok !== 1) {
let error = "unknown error";
if (explain.hasOwnProperty("errmsg")) {
error = explain.errmsg;
} else if (
explain.hasOwnProperty("executionStats") &&
explain.executionStats.hasOwnProperty("errorMessage")
) {
error = explain.executionStats.errorMessage;
}
print(` "error": ${JSON.stringify(error)}}${separator}`);
totalErrors++;
return;
}
const winningPlan = trimPlanToStagesAndIndexes(explain.queryPlanner.winningPlan);
const plans = explain.queryPlanner.rejectedPlans.length + 1;
totalPlans += plans;
const keys = executionStats.totalKeysExamined;
totalKeys += keys;
const docs = executionStats.totalDocsExamined;
totalDocs += docs;
const nReturned = executionStats.nReturned;
totalRows += nReturned;
const sorts = extractSortEffort(executionStats.executionStages);
totalSorts += sorts;
print(` "winningPlan": ${JSON.stringify(winningPlan)},`);
print(` "keys" : ${padNumber(keys)},`);
print(` "docs" : ${padNumber(docs)},`);
print(` "sorts": ${padNumber(sorts)},`);
print(` "plans": ${padNumber(plans)},`);
print(` "rows" : ${padNumber(nReturned)}}${separator}`);
print();
});
print("],");
print(
'">>>totals": {' +
`"pipelines": ${pipelines.length}, ` +
`"plans": ${totalPlans}, ` +
`"keys": ${padNumber(totalKeys)}, ` +
`"docs": ${padNumber(totalDocs)}, ` +
`"sorts": ${padNumber(totalSorts)}, ` +
`"rows": ${padNumber(totalRows)}, ` +
`"errors": ${padNumber(totalErrors)}},`,
);
const parameters = {
planRankerMode: null,
samplingMarginOfError: null,
samplingConfidenceInterval: null,
internalQuerySamplingCEMethod: null,
internalQuerySamplingBySequentialScan: null,
};
for (const param in parameters) {
const result = db.adminCommand({getParameter: 1, [param]: 1});
parameters[param] = result[param];
}
print(`">>>parameters": ${JSON.stringify(parameters)}}`);
jsTest.log.info("See README.plan_stability.md for more information.");
}

View File

@ -2,35 +2,36 @@
* Tests that the plans remain stable across releases by comparing the expected plans against the
* current ones. A product of SPM-3816. See README.plan_stability.md for more information.
*
* This test uses a simplistic dataset with a few columns, a few indexes, and trivial data distributions.
*
* The queries used in this test are generated using jstestfuzz, using the following grammar:
* `jstestfuzz:src/fuzzers/plan_stability/plan_stability.ne`
* before being processed by the scripts in the `feature-extractor` repository at:
* `feature-extractor:scripts/cbr/`
*
* @tags: [
* incompatible_aubsan,
* tsan_incompatible,
* ]
*
*/
import {checkSbeFullyEnabled} from "jstests/libs/query/sbe_util.js";
import {extractSortEffort, padNumber, trimPlanToStagesAndIndexes} from "jstests/query_golden/libs/utils.js";
import {pipelines} from "jstests/query_golden/test_inputs/plan_stability_pipelines.js";
import {populateSimplePlanStabilityDataset} from "jstests/query_golden/test_inputs/simple_plan_stability_dataset.js";
import {isSlowBuild} from "jstests/libs/query/aggregation_pipeline_utils.js";
import {runPlanStabilityPipelines} from "jstests/query_golden/libs/utils.js";
if (checkSbeFullyEnabled(db)) {
jsTest.log.info("Skipping the test because CBR only applies to the classic engine.");
quit();
}
if (
db.getServerBuildInfo().isAddressSanitizerActive() ||
db.getServerBuildInfo().isLeakSanitizerActive() ||
db.getServerBuildInfo().isThreadSanitizerActive() ||
db.getServerBuildInfo().isUndefinedBehaviorSanitizerActive()
) {
if (isSlowBuild(db)) {
jsTest.log.info("Skipping the test because a sanitizer is active.");
quit();
}
const collName = "plan_stability";
/**
* We use a dataset with 100K rows so that:
* 1. Queries still complete in a reasonable time.
@ -38,117 +39,9 @@ const collName = "plan_stability";
* rather than simple "off-by-one" counter increments/decrements.
*/
const collSize = 100_000;
const collName = jsTestName();
jsTest.log.info("See README.plan_stability.md for more information.");
populateSimplePlanStabilityDataset(collName, collSize);
let totalPlans = 0;
let totalKeys = 0;
let totalDocs = 0;
let totalSorts = 0;
let totalRows = 0;
let totalErrors = 0;
/**
* The output of this test is a JSON that contains both the plans and stats for each pipeline
* as well as a summary section with totals. The structure of the output is as follows:
* {
* "pipelines: [
* {
* ">>>pipeline" : <pipeline>,
* "winningPlan": <winningPlan>,
* "keys" : <totalKeysExamined>,
* "docs" : <totalDocsExamined>,
* "sorts" : <sortEffort>,
* "plans" : <numberOfPlans>,
* "rows" : <nReturned>
* },
* ...
* ],
* ">>>totals": {
* "keys": <totalKeysExamined>, "docs": <totalDocsExamined>, "sortEffort":
* <totalSortEffort>, "plans": <numberOfPlans>, "rows": <nReturned>
* }
* }
*
* The sortEffort is an abstract measure of the complexity of any SORT stages, and is
* defined as (LOG(nReturned) + 1) * inputStage.nReturned.
*/
print('{">>>pipelines":[');
pipelines.forEach((pipeline, index) => {
// JSON does not allow trailing commas.
const separator = index === pipelines.length - 1 ? "" : ",";
// We print the pipeline here so that, even if the test fails,
// we have already emitted the failing pipeline.
print(`{">>>pipeline": ${JSON.stringify(pipeline)},`);
// We do not use explain() as it loses the errmsg in case of an error.
const explain = db.runCommand({
explain: {aggregate: collName, pipeline: pipeline, cursor: {}},
verbosity: "executionStats",
});
if (explain.ok !== 1) {
print(` "error": ${JSON.stringify(explain.errmsg)}}${separator}`);
totalErrors++;
return;
}
const executionStats = explain.executionStats;
const winningPlan = trimPlanToStagesAndIndexes(explain.queryPlanner.winningPlan);
const plans = explain.queryPlanner.rejectedPlans.length + 1;
totalPlans += plans;
const keys = executionStats.totalKeysExamined;
totalKeys += keys;
const docs = executionStats.totalDocsExamined;
totalDocs += docs;
const nReturned = executionStats.nReturned;
totalRows += nReturned;
const sorts = extractSortEffort(executionStats.executionStages);
totalSorts += sorts;
print(` "winningPlan": ${JSON.stringify(winningPlan)},`);
print(` "keys" : ${padNumber(keys)},`);
print(` "docs" : ${padNumber(docs)},`);
print(` "sorts": ${padNumber(sorts)},`);
print(` "plans": ${padNumber(plans)},`);
print(` "rows" : ${padNumber(nReturned)}}${separator}`);
print();
});
print("],");
print(
'">>>totals": {' +
`"pipelines": ${pipelines.length}, ` +
`"plans": ${totalPlans}, ` +
`"keys": ${padNumber(totalKeys)}, ` +
`"docs": ${padNumber(totalDocs)}, ` +
`"sorts": ${padNumber(totalSorts)}, ` +
`"rows": ${padNumber(totalRows)}, ` +
`"errors": ${padNumber(totalErrors)}},`,
);
const parameters = {
planRankerMode: null,
samplingMarginOfError: null,
samplingConfidenceInterval: null,
internalQuerySamplingCEMethod: null,
internalQuerySamplingBySequentialScan: null,
};
for (const param in parameters) {
const result = db.adminCommand({getParameter: 1, [param]: 1});
parameters[param] = result[param];
}
print(`">>>parameters": ${JSON.stringify(parameters)}}`);
jsTestLog("See README.plan_stability.md for more information.");
runPlanStabilityPipelines(db, collName, pipelines);

View File

@ -0,0 +1,66 @@
/**
* Tests that the plans remain stable across releases by comparing the expected plans against the
* current ones. A product of SPM-3816. See README.plan_stability.md for more information.
*
* The dataset used in this test is generated by the data_generator in
* `mongo:src/mongo/db/query/benchmark/data_generator`
* using the specification in
* `mongo:src/mongo/db/query/benchmark/data_generator/specs/plan_stability2.py`.
*
* The queries used in this test are generated using jstestfuzz, using the following grammar:
* `jstestfuzz:src/fuzzers/plan_stability/plan_stability2.ne`
* before being processed by the scripts in the `feature-extractor` repository at:
* `feature-extractor:scripts/cbr/`
*
* The main differences to the plan_stability.js are:
* - coverage of more data types
* - more columns and indexes
* - more data distributions
* - additional MQL operators
*
* @tags: [
* incompatible_aubsan,
* tsan_incompatible,
* ]
*
*/
import {isSlowBuild} from "jstests/libs/query/aggregation_pipeline_utils.js";
import {checkSbeFullyEnabled} from "jstests/libs/query/sbe_util.js";
import {runPlanStabilityPipelines} from "jstests/query_golden/libs/utils.js";
import {pipelines} from "jstests/query_golden/test_inputs/plan_stability_pipelines2.js";
import {DataGenerator} from "jstests/libs/query/data_generator.js";
import {checkPauseAfterPopulate} from "jstests/libs/pause_after_populate.js";
if (checkSbeFullyEnabled(db)) {
jsTest.log.info("Skipping the test because CBR only applies to the classic engine.");
quit();
}
if (isSlowBuild(db)) {
jsTest.log.info("Skipping the test because a sanitizer is active.");
quit();
}
/**
* We use a dataset with 10K rows so that:
* 1. The generation step completes in a reasonable amount of time,
* as we have dozens of columns and indexes to generate.
* 2. There will be sufficient difference between plans,
* rather than simple "off-by-one" counter increments/decrements.
*/
const collSize = 10_000;
const collName = jsTestName();
jsTest.log.info("See README.plan_stability.md for more information.");
const dg = new DataGenerator({db: db, module: "specs." + collName, seed: 1});
try {
dg.execute({spec: collName, size: collSize, indexes: "indexes", analyze: true, drop: true, serial_inserts: true});
checkPauseAfterPopulate();
runPlanStabilityPipelines(db, collName, pipelines);
} finally {
dg.cleanup();
}

File diff suppressed because it is too large

View File

@ -76,10 +76,10 @@ export function populateSimplePlanStabilityDataset(collName, collSize) {
coll.drop();
jsTestLog("Generating " + collSize + " values with Zipfian distribution");
jsTest.log.info("Generating " + collSize + " values with Zipfian distribution");
const zipfianValues = generateZipfianList(collSize, 1.5);
jsTestLog("Generated " + collSize + " documents");
jsTest.log.info("Generated " + collSize + " documents");
const documents = [];
for (let i = 0; i < collSize; i++) {
documents.push({
@ -107,10 +107,10 @@ export function populateSimplePlanStabilityDataset(collName, collSize) {
});
}
jsTestLog("Inserting " + collSize + " documents into collection " + collName);
jsTest.log.info("Inserting " + collSize + " documents into collection " + collName);
coll.insertMany(documents);
jsTestLog("Creating indexes on collection " + collName);
jsTest.log.info("Creating indexes on collection " + collName);
const fields = ["i", "z", "c", "d", "h", "k", "a"];
for (const field of fields) {
assert.commandWorked(coll.createIndex({[field + "_idx"]: 1}));
@ -132,7 +132,7 @@ export function populateSimplePlanStabilityDataset(collName, collSize) {
assert.commandWorked(coll.createIndex(compoundIndex));
});
jsTestLog("Done creating indexes.");
jsTest.log.info("Done creating indexes.");
checkPauseAfterPopulate();
}

View File

@ -53,14 +53,14 @@ python3 driver.py specs.employee Employee --size 10
will generate 10 `Employee` objects.
The `--indices` switch can be used to create indices from index sets defined in the specification.
The `--indexes` switch can be used to create indexes from index sets defined in the specification.
Thus,
```
python3 driver.py specs.employee Employee --size 10 --indices index_set_1 --indices index_set_2
python3 driver.py specs.employee Employee --size 10 --indexes index_set_1 --indexes index_set_2
```
will generate 10 `Employee` objects _and_ create the indices listed in `index_set_1` and
will generate 10 `Employee` objects _and_ create the indexes listed in `index_set_1` and
`index_set_2`.
# Dropping, dumping, and restoring
@ -104,7 +104,7 @@ If any flags are set, the order in which they operate is:
1. The collection is dropped (`--drop`);
2. The collection is restored (`--restore`);
3. New data are generated into the collection (controlled by `-n`);
4. Indices are created (controlled by `--indices`);
4. Indexes are created (controlled by `--indexes`);
5. Specifications and commands are snapshotted (if any of the above steps changed any data); and
6. The collection is dumped (`--dump`).
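Putting these together, a single invocation that exercises most of the steps above might look like this (a sketch reusing the `Employee` spec from the earlier examples):
```bash
python3 driver.py specs.employee Employee --size 10 --drop --indexes index_set_1 --dump
```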
@ -128,7 +128,7 @@ file.
Commands that are considered to change the dataset are ones that:
- Contain either `--drop` or `--restore`, or
- Add an index set with `--indices`, or
- Add an index set with `--indexes`, or
- Generate at least one document with a positive `--size`.
In theory, this means that someone else can analyze the dumped `commands.sh` file and the copied

View File

@ -130,7 +130,7 @@ class FieldStatisticByScalarType:
self.unique.add(field_value)
SAMPLE_LIMIT = 1000
SAMPLE_LIMIT = 100
SUPPORTED_SCALAR_TYPES = {
float.__name__: "dbl",

View File

@ -188,7 +188,7 @@ async def main():
Run the 'analyze' command against each field of the collection.
Analyze is not preserved across restarts, or when dumping or restoring.
""")
parser.add_argument("--indices", action="append", help="An index set to load.")
parser.add_argument("--indexes", action="append", help="An index set to load.")
parser.add_argument("--restore-args", type=str, help="Parameters to pass to mongorestore.")
parser.add_argument(
"--out",
@ -252,20 +252,20 @@ async def main():
await upstream(database_instance, collection_name, generator, args.size, context_manager)
generator_factory.dump_metadata(collection_name, args.size, seed, metadata_path)
# 3. Create indices after documents.
indices = args.indices if args.indices else ()
for index_set_name in indices:
# 3. Create indexes after documents.
indexes = args.indexes if args.indexes else ()
for index_set_name in indexes:
if hasattr(module, index_set_name):
index_set = getattr(module, index_set_name)
indices = index_set() if callable(index_set) else index_set
indexes = index_set() if callable(index_set) else index_set
await database_instance.database.get_collection(collection_name).create_indexes(
indices
indexes
)
else:
raise RuntimeError(f"Module {module} does not define index set {index_set_name}.")
# 4. Only record things if the dataset is somehow actually changed.
if any((args.size, args.indices, args.drop, args.restore)):
if any((args.size, args.indexes, args.drop, args.restore)):
# Only record the seed additionally if it wasn't already passed in.
record_metadata(
module if args.size else None,

View File

@ -29,3 +29,8 @@ py_library(
name = "partially_correlated",
srcs = ["partially_correlated.py"],
)
py_library(
name = "plan_stability2",
srcs = ["plan_stability2.py"],
)

View File

@ -149,7 +149,7 @@ class Employee:
return today - ago
# Using a function to return some indices.
# Using a function to return some indexes.
def index_set_1() -> list[pymongo.IndexModel]:
return [
pymongo.IndexModel(keys="title.level", name="title_level"),
@ -160,7 +160,7 @@ def index_set_1() -> list[pymongo.IndexModel]:
]
# Using a global to define some indices.
# Using a global to define some indexes.
index_set_2 = [
pymongo.IndexModel(
keys=[("title.level", pymongo.ASCENDING), ("start_date", pymongo.DESCENDING)],

View File

@ -0,0 +1,218 @@
import dataclasses
import inspect
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from string import ascii_lowercase
from typing import Callable
import pymongo
from bson.decimal128 import Decimal128
from bson.timestamp import Timestamp
from datagen.util import MISSING, Specification
from faker import Faker
# Total number of generated fields; 75% of them will be indexed (see below).
NUM_FIELDS = 48
# 50% chance of no correlation
CORRELATIONS = ['a', 'b', 'c', None, None, None]
class mixed:
"""Used to designate mixed-type fields"""
AVAILABLE_TYPES = [
str,
int,
bool,
datetime,
Timestamp,
Decimal128,
list,
dict,
mixed
]
START_DATE = datetime(2024, 1, 1, tzinfo=timezone.utc)
END_DATE = datetime(2025, 12, 31, tzinfo=timezone.utc)
# Ideally we would want to seed our uncorrelated Faker based on the --seed argument to driver.py
# but it is not available here.
ufkr = Faker()
ufkr.seed_instance(1)
universal_generators = {
'missing' : lambda fkr: MISSING,
'null' : lambda fkr: None,
}
def pareto(fkr) -> int:
"""In the absence of a Zipfian implementation to generate skewed datasets, we use pareto"""
return int(fkr.random.paretovariate(2))
def lambda_sources(l: Specification) -> str:
"""Returns the code of the lambdas that participate in generating the values of a Specification."""
signature = inspect.signature(l.source)
params = list(signature.parameters.values())
return "\n".join(
f"{probability:>4.0%} {inspect.getsource(generator).strip()}"
for generator, probability in params[1].default.items()
)
type_generators: dict[type, dict[str, Callable]] = {
str: {
'p1' : lambda fkr: ascii_lowercase[min(25, pareto(fkr) % 26)],
's1' : lambda fkr: fkr.pystr(min_chars=1, max_chars=1),
's2' : lambda fkr: fkr.pystr(min_chars=1, max_chars=2),
's4' : lambda fkr: fkr.pystr(min_chars=1, max_chars=4),
},
int: {
'const1': lambda fkr: 1,
'i10': lambda fkr: fkr.random_int(min=1, max=10),
'i100': lambda fkr: fkr.random_int(min=1, max=100),
'i1000': lambda fkr: fkr.random_int(min=1, max=1000),
'i10000': lambda fkr: fkr.random_int(min=1, max=10000),
'i100000': lambda fkr: fkr.random_int(min=1, max=100000),
'pareto': pareto
},
bool: {
'br': lambda fkr: fkr.boolean(),
'b10': lambda fkr: fkr.boolean(10),
'b100': lambda fkr: fkr.boolean(1),
'b1000': lambda fkr: fkr.boolean(0.1),
'b10000': lambda fkr: fkr.boolean(0.01),
'b100000': lambda fkr: fkr.boolean(0.001),
},
datetime: {
'dt_pareto': lambda fkr: START_DATE + timedelta(days=pareto(fkr)),
},
Timestamp: {
# Note that we can not generate timestamps with i > 0 as the i is not preserved in the .schema file
'ts_const': lambda fkr: Timestamp(fkr.random_element([START_DATE, END_DATE]), 0),
'ts_triangular': lambda fkr: Timestamp(fkr.random.triangular(START_DATE, END_DATE, END_DATE), 0)
},
Decimal128: {
'decimal_pareto': lambda fkr: Decimal128(f"{pareto(fkr)}.{pareto(fkr)}")
},
list: {
'list_int_pareto': lambda fkr: [pareto(fkr) for _ in range(pareto(fkr) % 10)],
'list_str_pareto': lambda fkr: [ascii_lowercase[min(25, pareto(fkr) % 26)] for _ in range(pareto(fkr) % 10)],
},
dict: {
'dict_str_pareto': lambda fkr: {ascii_lowercase[min(25, pareto(fkr) % 26)]: pareto(fkr) for _ in range(pareto(fkr) % 10)}
}
}
specifications = {}
offset = ufkr.random_int(min=0, max=len(AVAILABLE_TYPES))
for f in range(NUM_FIELDS):
# Ensure fairness by iterating through the available types,
# instead of picking a random field type each time. At the same
# time, the different starting `offset` provides a measure of randomness.
chosen_type: type = AVAILABLE_TYPES[(offset + f) % len(AVAILABLE_TYPES)]
if chosen_type is mixed:
available_generators = [
generator for type in type_generators.values()
for generator in type.values()
]
else:
available_generators = list(type_generators[chosen_type].values())
# Add some nulls, missing and the like
if ufkr.boolean(25):
available_generators.extend(universal_generators.values())
chosen_generators: dict[Callable, float] = OrderedDict()
generator_count = ufkr.random_int(min=2, max=4)
# Pick the set of generators that will be used for the given field
for g in range(generator_count):
generator = ufkr.random_element(available_generators)
if g < generator_count - 1:
probability = ufkr.random.uniform(0.0, 0.2)
else:
# The final generator receives the remaining weight
# to arrive to a total weight of 1.
probability = 1 - sum(chosen_generators.values())
chosen_generators[generator] = probability
chosen_correlation = ufkr.random_element(CORRELATIONS)
# We use the 'default argument value' trick to capture the chosen generators
# and make them available to the actual value generation.
def source(fkr, generators=chosen_generators):
chosen_generator = fkr.random_element(generators)
return chosen_generator(fkr)
specification = Specification(chosen_type,
correlation=chosen_correlation,
source=source)
# pylint: disable=invalid-name
field_name = f"field{f}_{chosen_type.__name__}"
# Have 75% of the fields have an index, which we signify by
# appending an _idx suffix to the field name.
if f < NUM_FIELDS * 0.75:
field_name = field_name + "_idx"
specifications[field_name] = specification
for field_name, specification in specifications.items():
print(f"Field {field_name} with type {specification.type.__name__}:")
print(lambda_sources(specification))
print()
# Convert the dictionary into a dataclass that driver.py can then use.
plan_stability2 = dataclasses.make_dataclass(
"plan_stability2", # Name of the dataclass
specifications.items()
)
def indexes() -> list[pymongo.IndexModel]:
"""Return a set of pymongo.IndexModel objects that the data generator will create."""
indexed_fields = [
field_name for field_name in specifications if "idx" in field_name
]
assert len(indexed_fields) > 0
chosen_indexes: dict[str, pymongo.IndexModel] = {}
for indexed_field in indexed_fields:
# The first field of each index is one of the fields we definitely
# want to be indexed ...
chosen_fields: dict[str, int] = {
indexed_field:
ufkr.random_element([pymongo.ASCENDING, pymongo.DESCENDING])
}
# ... and we will make some indexes multi-field by tacking on more fields.
secondary_field_count = round(ufkr.random.triangular(low=0, high=2,
mode=0))
for _ in range(secondary_field_count):
secondary_field = ufkr.random_element(indexed_fields)
if secondary_field in chosen_fields:
continue
has_array_field = any("mixed" in f or "list" in f
for f in chosen_fields)
if ("mixed" in secondary_field
or "list" in secondary_field) and has_array_field:
# We cannot have two array fields in a compound index.
continue
secondary_dir = ufkr.random_element(
[pymongo.ASCENDING, pymongo.DESCENDING])
chosen_fields[secondary_field] = secondary_dir
chosen_index = pymongo.IndexModel(keys=list(chosen_fields.items()))
print(chosen_index)
chosen_indexes[str(chosen_index)] = chosen_index
return list(chosen_indexes.values())