From 8dc33a4aa10cf18e7d67791072e6603e285f077d Mon Sep 17 00:00:00 2001 From: Asher Kornfeld Date: Wed, 23 Jul 2025 14:58:46 -0400 Subject: [PATCH] SERVER-107786 Create workload for calibration of COLLSCAN node (#38773) Co-authored-by: Militsa Sotirova GitOrigin-RevId: 512ed28ad6119f652e433c4f5622e8c747cf4fd4 --- .../cost_model/calibration_settings.py | 21 +++++--- buildscripts/cost_model/database_instance.py | 10 ++-- buildscripts/cost_model/experiment.py | 4 +- buildscripts/cost_model/start.py | 51 +++++++++---------- buildscripts/cost_model/workload_execution.py | 14 ++--- 5 files changed, 51 insertions(+), 49 deletions(-) diff --git a/buildscripts/cost_model/calibration_settings.py b/buildscripts/cost_model/calibration_settings.py index 924b5feac1b..645881e9518 100644 --- a/buildscripts/cost_model/calibration_settings.py +++ b/buildscripts/cost_model/calibration_settings.py @@ -195,7 +195,7 @@ def create_index_scan_collection_template(name: str, cardinality: int) -> config ) -def create_physical_scan_collection_template( +def create_coll_scan_collection_template( name: str, payload_size: int = 0 ) -> config.CollectionTemplate: template = config.CollectionTemplate( @@ -305,14 +305,14 @@ c_arr_01 = config.CollectionTemplate( index_scan = create_index_scan_collection_template("index_scan", 1000000) -physical_scan = create_physical_scan_collection_template("physical_scan", 2000) +coll_scan = create_coll_scan_collection_template("coll_scan", 2000) # Data Generator settings data_generator = config.DataGeneratorConfig( enabled=True, create_indexes=True, batch_size=10000, - collection_templates=[index_scan, physical_scan, c_int_05, c_arr_01], + collection_templates=[index_scan, coll_scan, c_int_05, c_arr_01], write_mode=config.WriteMode.REPLACE, collection_name_with_card=True, ) @@ -346,11 +346,16 @@ qsn_nodes = [ config.QsNodeCalibrationConfig(type="SORT_MERGE"), config.QsNodeCalibrationConfig(type="SORT"), config.QsNodeCalibrationConfig(type="LIMIT"), - config.QsNodeCalibrationConfig(type="SKIP", variables_override=lambda df: pd.concat([ - df['n_returned'].rename("Documents Passed"), - (df['n_processed'] - df['n_returned']).rename('Documents Skipped')], - axis=1 - )), + config.QsNodeCalibrationConfig( + type="SKIP", + variables_override=lambda df: pd.concat( + [ + df["n_returned"].rename("Documents Passed"), + (df["n_processed"] - df["n_returned"]).rename("Documents Skipped"), + ], + axis=1, + ), + ), ] # Calibrator settings qs_calibrator = config.QuerySolutionCalibrationConfig( diff --git a/buildscripts/cost_model/database_instance.py b/buildscripts/cost_model/database_instance.py index cc42912f34d..1e1a61aded5 100644 --- a/buildscripts/cost_model/database_instance.py +++ b/buildscripts/cost_model/database_instance.py @@ -36,9 +36,9 @@ from typing import Any, Mapping, NewType, Sequence from config import DatabaseConfig, RestoreMode from pymongo import AsyncMongoClient -__all__ = ["DatabaseInstance", "Pipeline"] +__all__ = ["DatabaseInstance", "Find"] """MongoDB Aggregate's Pipeline""" -Pipeline = NewType("Pipeline", Sequence[Mapping[str, Any]]) +Find = NewType("Find", Mapping[str, Any]) class DatabaseInstance: @@ -91,11 +91,11 @@ class DatabaseInstance: "internalQueryFrameworkControl", "trySbeEngine" if state else "forceClassicEngine" ) - async def explain(self, collection_name: str, pipeline: Pipeline) -> dict[str, any]: - """Return explain for the given pipeline.""" + async def explain(self, collection_name: str, find: Find) -> dict[str, any]: + """Return explain for the given find command.""" return await self.database.command( "explain", - {"aggregate": collection_name, "pipeline": pipeline, "cursor": {}}, + {"find": collection_name, **find}, verbosity="executionStats", ) diff --git a/buildscripts/cost_model/experiment.py b/buildscripts/cost_model/experiment.py index 57fa9f12f92..d0fd8550393 100644 --- a/buildscripts/cost_model/experiment.py +++ b/buildscripts/cost_model/experiment.py @@ -133,7 +133,7 @@ def remove_outliers( return (df_seq >= low) & (df_seq <= high) return df[ - df.groupby(["run_id", "collection", "pipeline"]) + df.groupby(["run_id", "collection", "command"]) .total_execution_time.transform(is_not_outlier) .eq(1) ] @@ -182,7 +182,7 @@ def extract_qsn_nodes(df: pd.DataFrame) -> pd.DataFrame: **dataclasses.asdict(stat), **json.loads(df_seq["query_parameters"]), "run_id": df_seq.run_id, - "pipeline": df_seq.pipeline, + "command": df_seq.command, "source": df_seq.name, } rows.append(row) diff --git a/buildscripts/cost_model/start.py b/buildscripts/cost_model/start.py index a76be83b9e8..333a53e2105 100644 --- a/buildscripts/cost_model/start.py +++ b/buildscripts/cost_model/start.py @@ -92,30 +92,6 @@ async def execute_index_scan_queries( ) -async def execute_physical_scan_queries( - database: DatabaseInstance, collections: Sequence[CollectionInfo] -): - collections = [ci for ci in collections if ci.name.startswith("physical_scan")] - fields = [f for f in collections[0].fields if f.name == "choice"] - requests = [] - for field in fields: - for val in field.distribution.get_values()[::3]: - if val.startswith("_"): - continue - keys_length = len(val) + 2 - requests.append( - Query( - pipeline=[{"$match": {field.name: val}}, {"$limit": 10}], - keys_length_in_bytes=keys_length, - note="PhysicalScan", - ) - ) - - await workload_execution.execute( - database, main_config.workload_execution, collections, requests - ) - - async def execute_index_intersections_with_requests( database: DatabaseInstance, collections: Sequence[CollectionInfo], requests: Sequence[Query] ): @@ -158,11 +134,28 @@ async def execute_index_intersections( await execute_index_intersections_with_requests(database, collections, requests) +async def execute_collection_scans( + database: DatabaseInstance, collections: Sequence[CollectionInfo], forwards=True +): + collections = [c for c in collections if c.name.startswith("coll_scan")] + # Even though these numbers are not representative of the way COLLSCANs are usually used, + # we can use them for calibration based on the assumption that the cost scales linearly. + limits = [5, 10, 50, 75, 100, 150, 300, 500, 1000] + direction = 1 if forwards else -1 + requests = [ + Query({"limit": limit, "sort": {"$natural": direction}}, note="COLLSCAN") + for limit in limits + ] + await workload_execution.execute( + database, main_config.workload_execution, collections, requests + ) + + async def execute_limits(database: DatabaseInstance, collections: Sequence[CollectionInfo]): collection = [c for c in collections if c.name.startswith("index_scan")][0] limits = [1, 2, 5, 10, 15, 20, 25, 50] # , 100, 250, 500, 1000, 2500, 5000, 10000] - requests = [Query([{"$limit": limit}], note="LIMIT") for limit in limits] + requests = [Query({"limit": limit}, note="LIMIT") for limit in limits] await workload_execution.execute( database, main_config.workload_execution, [collection], requests ) @@ -176,7 +169,7 @@ async def execute_skips(database: DatabaseInstance, collections: Sequence[Collec # We add a LIMIT on top of the SKIP in order to easily vary the number of processed documents. for limit in limits: for skip in skips: - requests.append(Query(pipeline=[{"$skip": skip}, {"$limit": limit}], note="SKIP")) + requests.append(Query(find_cmd={"skip": skip, "limit": limit}, note="SKIP")) await workload_execution.execute( database, main_config.workload_execution, [collection], requests ) @@ -195,7 +188,11 @@ async def main(): await generator.populate_collections() # 3. Collecting data for calibration (optional). # It runs the pipelines and stores explains to the database. - execution_query_functions = [execute_limits, execute_skips] + # NOTE: you must run the collection scan workload twice, once to get the coefficients for a forward scan, + # and another for backwards ones. To toggle this, change the argument 'forwards' in the signature of + # 'execute_collection_scans'. We need to do this as otherwise data from both directions will be used + # for the same calibration, which we explicitly want to avoid. + execution_query_functions = [execute_collection_scans, execute_limits, execute_skips] for execute_query in execution_query_functions: await execute_query(database, generator.collection_infos) main_config.workload_execution.write_mode = WriteMode.APPEND diff --git a/buildscripts/cost_model/workload_execution.py b/buildscripts/cost_model/workload_execution.py index b0684f4ff42..621193fe225 100644 --- a/buildscripts/cost_model/workload_execution.py +++ b/buildscripts/cost_model/workload_execution.py @@ -36,16 +36,16 @@ import bson.json_util as json from bson.objectid import ObjectId from config import WorkloadExecutionConfig, WriteMode from data_generator import CollectionInfo -from database_instance import DatabaseInstance, Pipeline +from database_instance import DatabaseInstance, Find __all__ = ["execute"] @dataclass class Query: - """Query pipleline and related model input parameters.""" + """Query command and related model input parameters.""" - pipeline: Pipeline + find_cmd: Find keys_length_in_bytes: int = 0 number_of_fields: int = 0 note: any = None @@ -105,7 +105,7 @@ class WorkloadExecution: for coll_info in collection_infos: print(f"\n>>>>> running queries on collection {coll_info.name}") for query in queries: - print(f">>>>>>> running query {query.pipeline}") + print(f">>>>>>> running query {query.find_cmd}") await self._run_query(coll_info, query, measurements) await self.database.insert_many(self.config.output_collection_name, measurements) @@ -113,7 +113,7 @@ class WorkloadExecution: async def _run_query(self, coll_info: CollectionInfo, query: Query, result: Sequence): # warm up for _ in range(self.config.warmup_runs): - await self.database.explain(coll_info.name, query.pipeline) + await self.database.explain(coll_info.name, query.find_cmd) run_id = ObjectId() avg_doc_size = await self.database.get_average_document_size(coll_info.name) @@ -124,13 +124,13 @@ class WorkloadExecution: note=query.note, ) for _ in range(self.config.runs): - explain = await self.database.explain(coll_info.name, query.pipeline) + explain = await self.database.explain(coll_info.name, query.find_cmd) if explain["ok"] == 1: result.append( { "run_id": run_id, "collection": coll_info.name, - "pipeline": json.dumps(query.pipeline), + "command": json.dumps(query.find_cmd), "explain": json.dumps(explain), "query_parameters": parameters.to_json(), }