SERVER-107786 Create workload for calibration of COLLSCAN node (#38773)

Co-authored-by: Militsa Sotirova <militsa.sotirova@mongodb.com>
GitOrigin-RevId: 512ed28ad6119f652e433c4f5622e8c747cf4fd4
Asher Kornfeld 2025-07-23 14:58:46 -04:00 committed by MongoDB Bot
parent 11dd69f925
commit 8dc33a4aa1
5 changed files with 51 additions and 49 deletions


@@ -195,7 +195,7 @@ def create_index_scan_collection_template(name: str, cardinality: int) -> config
)
def create_physical_scan_collection_template(
def create_coll_scan_collection_template(
name: str, payload_size: int = 0
) -> config.CollectionTemplate:
template = config.CollectionTemplate(
@@ -305,14 +305,14 @@ c_arr_01 = config.CollectionTemplate(
index_scan = create_index_scan_collection_template("index_scan", 1000000)
physical_scan = create_physical_scan_collection_template("physical_scan", 2000)
coll_scan = create_coll_scan_collection_template("coll_scan", 2000)
# Data Generator settings
data_generator = config.DataGeneratorConfig(
enabled=True,
create_indexes=True,
batch_size=10000,
collection_templates=[index_scan, physical_scan, c_int_05, c_arr_01],
collection_templates=[index_scan, coll_scan, c_int_05, c_arr_01],
write_mode=config.WriteMode.REPLACE,
collection_name_with_card=True,
)
@@ -346,11 +346,16 @@ qsn_nodes = [
config.QsNodeCalibrationConfig(type="SORT_MERGE"),
config.QsNodeCalibrationConfig(type="SORT"),
config.QsNodeCalibrationConfig(type="LIMIT"),
config.QsNodeCalibrationConfig(type="SKIP", variables_override=lambda df: pd.concat([
df['n_returned'].rename("Documents Passed"),
(df['n_processed'] - df['n_returned']).rename('Documents Skipped')],
axis=1
)),
config.QsNodeCalibrationConfig(
type="SKIP",
variables_override=lambda df: pd.concat(
[
df["n_returned"].rename("Documents Passed"),
(df["n_processed"] - df["n_returned"]).rename("Documents Skipped"),
],
axis=1,
),
),
]
# Calibrator settings
qs_calibrator = config.QuerySolutionCalibrationConfig(
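
For context on the SKIP entry reformatted above, its variables_override splits the raw explain counters into the two calibration variables. A minimal sketch of that transformation, using a hypothetical pandas DataFrame of measurements:

import pandas as pd

# Hypothetical explain measurements for two SKIP runs.
df = pd.DataFrame({"n_processed": [100, 500], "n_returned": [90, 350]})

# Same split the variables_override performs: documents the SKIP stage passes
# through versus documents it discards.
variables = pd.concat(
    [
        df["n_returned"].rename("Documents Passed"),
        (df["n_processed"] - df["n_returned"]).rename("Documents Skipped"),
    ],
    axis=1,
)
print(variables)
#    Documents Passed  Documents Skipped
# 0                90                 10
# 1               350                150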


@@ -36,9 +36,9 @@ from typing import Any, Mapping, NewType, Sequence
from config import DatabaseConfig, RestoreMode
from pymongo import AsyncMongoClient
__all__ = ["DatabaseInstance", "Pipeline"]
__all__ = ["DatabaseInstance", "Find"]
"""MongoDB Aggregate's Pipeline"""
Pipeline = NewType("Pipeline", Sequence[Mapping[str, Any]])
Find = NewType("Find", Mapping[str, Any])
class DatabaseInstance:
@@ -91,11 +91,11 @@ class DatabaseInstance:
"internalQueryFrameworkControl", "trySbeEngine" if state else "forceClassicEngine"
)
async def explain(self, collection_name: str, pipeline: Pipeline) -> dict[str, any]:
"""Return explain for the given pipeline."""
async def explain(self, collection_name: str, find: Find) -> dict[str, any]:
"""Return explain for the given find command."""
return await self.database.command(
"explain",
{"aggregate": collection_name, "pipeline": pipeline, "cursor": {}},
{"find": collection_name, **find},
verbosity="executionStats",
)
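
The rewritten explain() splices the Find mapping directly into a find command instead of wrapping a pipeline in an aggregate. A minimal sketch of the command document it builds, assuming a hypothetical find spec and collection name:

# Hypothetical find spec; its keys are merged straight into the find command.
find = {"filter": {"choice": "abc"}, "limit": 10}
collection_name = "coll_scan_2000"

command_body = {"find": collection_name, **find}
# -> {"find": "coll_scan_2000", "filter": {"choice": "abc"}, "limit": 10}
# database.command("explain", command_body, verbosity="executionStats") then
# returns executionStats for the find plan rather than an aggregation pipeline.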


@@ -133,7 +133,7 @@ def remove_outliers(
return (df_seq >= low) & (df_seq <= high)
return df[
df.groupby(["run_id", "collection", "pipeline"])
df.groupby(["run_id", "collection", "command"])
.total_execution_time.transform(is_not_outlier)
.eq(1)
]
@@ -182,7 +182,7 @@ def extract_qsn_nodes(df: pd.DataFrame) -> pd.DataFrame:
**dataclasses.asdict(stat),
**json.loads(df_seq["query_parameters"]),
"run_id": df_seq.run_id,
"pipeline": df_seq.pipeline,
"command": df_seq.command,
"source": df_seq.name,
}
rows.append(row)
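
Because measurements are now keyed by the serialized find command, outlier removal and node extraction both group on (run_id, collection, command). A toy illustration of the grouping, with a hypothetical quantile-based is_not_outlier standing in for the one defined above:

import pandas as pd

# Toy measurements: two different commands against the same collection.
df = pd.DataFrame({
    "run_id": ["r1"] * 6,
    "collection": ["coll_scan_2000"] * 6,
    "command": ['{"limit": 5}'] * 3 + ['{"limit": 100}'] * 3,
    "total_execution_time": [10, 11, 12, 40, 41, 400],
})

def is_not_outlier(df_seq: pd.Series) -> pd.Series:
    # Hypothetical bounds; the real function derives low/high from the group.
    low, high = df_seq.quantile(0.1), df_seq.quantile(0.9)
    return (df_seq >= low) & (df_seq <= high)

filtered = df[
    df.groupby(["run_id", "collection", "command"])
    .total_execution_time.transform(is_not_outlier)
    .eq(1)
]
# Timings are filtered within each (run_id, collection, command) group, so one
# command's outliers never influence another command's rows.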


@@ -92,30 +92,6 @@ async def execute_index_scan_queries(
)
async def execute_physical_scan_queries(
database: DatabaseInstance, collections: Sequence[CollectionInfo]
):
collections = [ci for ci in collections if ci.name.startswith("physical_scan")]
fields = [f for f in collections[0].fields if f.name == "choice"]
requests = []
for field in fields:
for val in field.distribution.get_values()[::3]:
if val.startswith("_"):
continue
keys_length = len(val) + 2
requests.append(
Query(
pipeline=[{"$match": {field.name: val}}, {"$limit": 10}],
keys_length_in_bytes=keys_length,
note="PhysicalScan",
)
)
await workload_execution.execute(
database, main_config.workload_execution, collections, requests
)
async def execute_index_intersections_with_requests(
database: DatabaseInstance, collections: Sequence[CollectionInfo], requests: Sequence[Query]
):
@@ -158,11 +134,28 @@ async def execute_index_intersections(
await execute_index_intersections_with_requests(database, collections, requests)
async def execute_collection_scans(
database: DatabaseInstance, collections: Sequence[CollectionInfo], forwards=True
):
collections = [c for c in collections if c.name.startswith("coll_scan")]
# Even though these numbers are not representative of the way COLLSCANs are usually used,
# we can use them for calibration based on the assumption that the cost scales linearly.
limits = [5, 10, 50, 75, 100, 150, 300, 500, 1000]
direction = 1 if forwards else -1
requests = [
Query({"limit": limit, "sort": {"$natural": direction}}, note="COLLSCAN")
for limit in limits
]
await workload_execution.execute(
database, main_config.workload_execution, collections, requests
)
async def execute_limits(database: DatabaseInstance, collections: Sequence[CollectionInfo]):
collection = [c for c in collections if c.name.startswith("index_scan")][0]
limits = [1, 2, 5, 10, 15, 20, 25, 50] # , 100, 250, 500, 1000, 2500, 5000, 10000]
requests = [Query([{"$limit": limit}], note="LIMIT") for limit in limits]
requests = [Query({"limit": limit}, note="LIMIT") for limit in limits]
await workload_execution.execute(
database, main_config.workload_execution, [collection], requests
)
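
The comment in execute_collection_scans leans on cost scaling linearly with the number of documents scanned, so the varying limits give points along one line. A hedged sketch of what that amounts to, assuming a simple least-squares fit of execution time against documents processed (timings below are made up for illustration):

import numpy as np

# One (documents scanned, execution time) pair per limit in the workload above;
# the timings are invented purely for illustration.
n_processed = np.array([5, 10, 50, 75, 100, 150, 300, 500, 1000])
exec_time_us = np.array([12, 14, 31, 42, 55, 80, 160, 260, 510])

# Fit time = c0 + c1 * n_processed; c1 approximates the per-document cost of a
# COLLSCAN in this direction. A second run with forwards=False yields the
# analogous coefficients for backward scans.
c1, c0 = np.polyfit(n_processed, exec_time_us, 1)
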
@@ -176,7 +169,7 @@ async def execute_skips(database: DatabaseInstance, collections: Sequence[Collec
# We add a LIMIT on top of the SKIP in order to easily vary the number of processed documents.
for limit in limits:
for skip in skips:
requests.append(Query(pipeline=[{"$skip": skip}, {"$limit": limit}], note="SKIP"))
requests.append(Query(find_cmd={"skip": skip, "limit": limit}, note="SKIP"))
await workload_execution.execute(
database, main_config.workload_execution, [collection], requests
)
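
Pairing each skip with each limit decouples the documents a SKIP stage consumes from the documents it returns, which is exactly what the SKIP variables_override above needs. A hypothetical example of one pair and the variables derived from it, assuming the stage reports n_returned = limit and n_processed = skip + limit for such a query:

query = Query(find_cmd={"skip": 100, "limit": 10}, note="SKIP")
# n_returned  = 10   -> "Documents Passed"  = 10
# n_processed = 110  -> "Documents Skipped" = 110 - 10 = 100
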
@@ -195,7 +188,11 @@ async def main():
await generator.populate_collections()
# 3. Collecting data for calibration (optional).
# It runs the pipelines and stores explains to the database.
execution_query_functions = [execute_limits, execute_skips]
# NOTE: you must run the collection scan workload twice, once to get the coefficients for a forward scan,
# and another for backwards ones. To toggle this, change the argument 'forwards' in the signature of
# 'execute_collection_scans'. We need to do this as otherwise data from both directions will be used
# for the same calibration, which we explicitly want to avoid.
execution_query_functions = [execute_collection_scans, execute_limits, execute_skips]
for execute_query in execution_query_functions:
await execute_query(database, generator.collection_infos)
main_config.workload_execution.write_mode = WriteMode.APPEND
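
One way to follow the NOTE above, sketched as the hypothetical edit it describes: run the script once as committed to collect forward scans, then flip the default and run it again so backward scans land in their own calibration.

# Second run only (hypothetical edit of the signature above):
async def execute_collection_scans(
    database: DatabaseInstance, collections: Sequence[CollectionInfo], forwards=False
):
    ...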


@@ -36,16 +36,16 @@ import bson.json_util as json
from bson.objectid import ObjectId
from config import WorkloadExecutionConfig, WriteMode
from data_generator import CollectionInfo
from database_instance import DatabaseInstance, Pipeline
from database_instance import DatabaseInstance, Find
__all__ = ["execute"]
@dataclass
class Query:
"""Query pipleline and related model input parameters."""
"""Query command and related model input parameters."""
pipeline: Pipeline
find_cmd: Find
keys_length_in_bytes: int = 0
number_of_fields: int = 0
note: any = None
@@ -105,7 +105,7 @@ class WorkloadExecution:
for coll_info in collection_infos:
print(f"\n>>>>> running queries on collection {coll_info.name}")
for query in queries:
print(f">>>>>>> running query {query.pipeline}")
print(f">>>>>>> running query {query.find_cmd}")
await self._run_query(coll_info, query, measurements)
await self.database.insert_many(self.config.output_collection_name, measurements)
@@ -113,7 +113,7 @@ class WorkloadExecution:
async def _run_query(self, coll_info: CollectionInfo, query: Query, result: Sequence):
# warm up
for _ in range(self.config.warmup_runs):
await self.database.explain(coll_info.name, query.pipeline)
await self.database.explain(coll_info.name, query.find_cmd)
run_id = ObjectId()
avg_doc_size = await self.database.get_average_document_size(coll_info.name)
@@ -124,13 +124,13 @@ class WorkloadExecution:
note=query.note,
)
for _ in range(self.config.runs):
explain = await self.database.explain(coll_info.name, query.pipeline)
explain = await self.database.explain(coll_info.name, query.find_cmd)
if explain["ok"] == 1:
result.append(
{
"run_id": run_id,
"collection": coll_info.name,
"pipeline": json.dumps(query.pipeline),
"command": json.dumps(query.find_cmd),
"explain": json.dumps(explain),
"query_parameters": parameters.to_json(),
}
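
Tying the renames together, a minimal sketch of how a measurement row is built now (collection name and values are hypothetical; json is bson.json_util as imported above):

query = Query(find_cmd=Find({"skip": 100, "limit": 10}), note="SKIP")

measurement = {
    "run_id": ObjectId(),
    "collection": "index_scan_1000000",
    "command": json.dumps(query.find_cmd),  # previously json.dumps(query.pipeline)
    # "explain" and "query_parameters" are filled in from the explain run above.
}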