diff --git a/buildscripts/cost_model/calibration_settings.py b/buildscripts/cost_model/calibration_settings.py
index 645881e9518..739dfea45a2 100644
--- a/buildscripts/cost_model/calibration_settings.py
+++ b/buildscripts/cost_model/calibration_settings.py
@@ -323,7 +323,7 @@ workload_execution = config.WorkloadExecutionConfig(
     output_collection_name="calibrationData",
     write_mode=config.WriteMode.REPLACE,
     warmup_runs=5,
-    runs=75,
+    runs=100,
 )
 
 
@@ -356,6 +356,9 @@ qsn_nodes = [
             axis=1,
         ),
     ),
+    config.QsNodeCalibrationConfig(type="PROJECTION_SIMPLE"),
+    config.QsNodeCalibrationConfig(type="PROJECTION_COVERED"),
+    config.QsNodeCalibrationConfig(type="PROJECTION_DEFAULT"),
 ]
 # Calibrator settings
 qs_calibrator = config.QuerySolutionCalibrationConfig(
diff --git a/buildscripts/cost_model/execution_tree_classic.py b/buildscripts/cost_model/execution_tree_classic.py
index 06ffceb970b..5dd0bac51c3 100644
--- a/buildscripts/cost_model/execution_tree_classic.py
+++ b/buildscripts/cost_model/execution_tree_classic.py
@@ -69,7 +69,7 @@ def build_execution_tree(execution_stats: dict[str, Any]) -> Node:
 def process_stage(stage: dict[str, Any]) -> Node:
     """Parse the given execution stage"""
     processors = {
-        "SUBPLAN": process_subplan,
+        "SUBPLAN": process_passthrough,
         "COLLSCAN": process_collscan,
         "IXSCAN": process_ixscan,
         "FETCH": process_fetch,
@@ -79,8 +79,11 @@
         "MERGE_SORT": process_mergesort,
         "SORT_MERGE": process_mergesort,
         "SORT": process_sort,
-        "LIMIT": process_limit,
+        "LIMIT": process_passthrough,
         "SKIP": process_skip,
+        "PROJECTION_SIMPLE": process_passthrough,
+        "PROJECTION_COVERED": process_passthrough,
+        "PROJECTION_DEFAULT": process_passthrough,
     }
     processor = processors.get(stage["stage"])
     if processor is None:
@@ -90,7 +93,8 @@
     return processor(stage)
 
 
-def process_subplan(stage: dict[str, Any]) -> Node:
+def process_passthrough(stage: dict[str, Any]) -> Node:
+    """Parse internal (non-leaf) execution stages with a single child, which process exactly the documents that they return."""
     input_stage = process_stage(stage["inputStage"])
     return Node(**get_common_fields(stage), n_processed=stage["nReturned"], children=[input_stage])
 
@@ -103,6 +107,13 @@
     return Node(**get_common_fields(stage), n_processed=stage["keysExamined"], children=[])
 
 
+def process_sort(stage: dict[str, Any]) -> Node:
+    input_stage = process_stage(stage["inputStage"])
+    return Node(
+        **get_common_fields(stage), n_processed=input_stage.n_returned, children=[input_stage]
+    )
+
+
 def process_fetch(stage: dict[str, Any]) -> Node:
     input_stage = process_stage(stage["inputStage"])
     return Node(
@@ -126,16 +137,6 @@
     return Node(**get_common_fields(stage), n_processed=stage["nReturned"], children=children)
 
 
-def process_sort(stage: dict[str, Any]) -> Node:
-    input_stage = process_stage(stage["inputStage"])
-    return Node(**get_common_fields(stage), n_processed=stage["nReturned"], children=[input_stage])
-
-
-def process_limit(stage: dict[str, Any]) -> Node:
-    input_stage = process_stage(stage["inputStage"])
-    return Node(**get_common_fields(stage), n_processed=stage["nReturned"], children=[input_stage])
-
-
 def process_skip(stage: dict[str, Any]) -> Node:
     input_stage = process_stage(stage["inputStage"])
     # This is different than the limit processor since the skip node processes both the documents it skips and the ones it passes up.
diff --git a/buildscripts/cost_model/start.py b/buildscripts/cost_model/start.py
index 333a53e2105..366e9c6638d 100644
--- a/buildscripts/cost_model/start.py
+++ b/buildscripts/cost_model/start.py
@@ -37,7 +37,7 @@ import parameters_extractor_classic
 import qsn_calibrator
 import workload_execution
 from calibration_settings import main_config
-from config import WriteMode
+from config import DataType, WriteMode
 from cost_estimator import CostModelParameters, ExecutionStats
 from data_generator import CollectionInfo, DataGenerator
 from database_instance import DatabaseInstance
@@ -175,6 +175,45 @@ async def execute_skips(database: DatabaseInstance, collections: Sequence[Collec
     )
 
 
+async def execute_projections(database: DatabaseInstance, collections: Sequence[CollectionInfo]):
+    collection = [c for c in collections if c.name.startswith("c_int_05_30")][0]
+    limits = [5, 10, 50, 75, 100, 150, 300, 500, 1000]
+    # We calibrate using projections on the last field since this means the node does a nontrivial amount of work.
+    # This is because non-covered projections iterate over the fields in a given document as part of their work.
+    field = collection.fields[-1]
+    requests = []
+    # Simple projections: these do not contain any computed fields and are not fully covered by an index.
+    for limit in limits:
+        requests.append(
+            Query({"limit": limit, "projection": {field.name: 1}}, note="PROJECTION_SIMPLE")
+        )
+
+    # Covered projections: these are inclusions that are fully covered by an index.
+    field = [f for f in collection.fields if f.indexed][-1]
+    for limit in limits:
+        requests.append(
+            Query(
+                {"limit": limit, "projection": {"_id": 0, field.name: 1}, "hint": {field.name: 1}},
+                note="PROJECTION_COVERED",
+            )
+        )
+
+    # Default projections: these are the only ones that can handle computed projections,
+    # so that is how we calibrate them. We assume that the computation will be constant across
+    # the enumerated plans and thus keep it very simple.
+    fields = [f for f in collection.fields if f.type == DataType.INTEGER]
+    for limit in limits:
+        requests.append(
+            Query(
+                {"limit": limit, "projection": {"out": {"$add": [f"${f.name}" for f in fields]}}},
+                note="PROJECTION_DEFAULT",
+            )
+        )
+    await workload_execution.execute(
+        database, main_config.workload_execution, [collection], requests
+    )
+
+
 async def main():
     """Entry point function."""
     script_directory = os.path.abspath(os.path.dirname(__file__))
@@ -192,7 +231,12 @@ async def main():
     # and another for backwards ones. To toggle this, change the argument 'forwards' in the signature of
     # 'execute_collection_scans'. We need to do this as otherwise data from both directions will be used
     # for the same calibration, which we explicitly want to avoid.
-    execution_query_functions = [execute_collection_scans, execute_limits, execute_skips]
+    execution_query_functions = [
+        execute_projections,
+        execute_collection_scans,
+        execute_limits,
+        execute_skips,
+    ]
     for execute_query in execution_query_functions:
         await execute_query(database, generator.collection_infos)
     main_config.workload_execution.write_mode = WriteMode.APPEND
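
As a rough companion to the patch, here is a minimal pymongo sketch (not part of the diff) of the three projection shapes that execute_projections issues through the calibration harness, so the resulting PROJECTION_SIMPLE / PROJECTION_COVERED / PROJECTION_DEFAULT stages can be inspected by hand in explain output. The connection string, collection name, and field names are illustrative assumptions, and the computed projection assumes a server version that accepts aggregation expressions in find projections (4.4+).

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed local test instance
coll = client["calibration"]["c_int_05_30_example"]  # hypothetical collection name

# PROJECTION_SIMPLE: plain inclusion that is not covered by an index.
simple = coll.find({}, {"field_a": 1}).limit(100)

# PROJECTION_COVERED: inclusion fully satisfied by the hinted index, so no FETCH is needed.
covered = coll.find({}, {"_id": 0, "field_b": 1}).hint([("field_b", 1)]).limit(100)

# PROJECTION_DEFAULT: computed projection; only the default projection stage evaluates expressions.
computed = coll.find({}, {"out": {"$add": ["$field_a", "$field_b"]}}).limit(100)

for label, cursor in [("simple", simple), ("covered", covered), ("default", computed)]:
    winning_plan = cursor.explain()["queryPlanner"]["winningPlan"]
    print(label, winning_plan)  # the corresponding PROJECTION_* stage appears in this plan tree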