mongo/buildscripts/cost_model/execution_tree.py

173 lines
6.5 KiB
Python

# Copyright (C) 2022-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
# as published by MongoDB, Inc.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Server Side Public License for more details.
#
# You should have received a copy of the Server Side Public License
# along with this program. If not, see
# <http://www.mongodb.com/licensing/server-side-public-license>.
#
# As a special exception, the copyright holders give permission to link the
# code of portions of this program with the OpenSSL library under certain
# conditions as described in each individual source file and distribute
# linked combinations including the program with the OpenSSL library. You
# must comply with the Server Side Public License in all respects for
# all of the code used other than as permitted herein. If you modify file(s)
# with this exception, you may extend this exception to your version of the
# file(s), but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
#
"""Define SBE execution tree and parse it from query explain."""
from __future__ import annotations
from dataclasses import dataclass
import bson.json_util as json
__all__ = ['Node', 'build_execution_tree']
@dataclass
class Node:
"""Represent SBE tree node."""
stage: str
plan_node_id: int
total_execution_time: int
n_returned: int
n_processed: int
children: list[Node]
def get_execution_time(self):
"""Execution time of the SBE node without execuion time of its children."""
return self.total_execution_time - sum(n.total_execution_time for n in self.children)
def print(self, level=0):
"""Pretty print of the SBE tree."""
print(
f'{"| "*level}{self.stage}, plaNodeId: {self.plan_node_id}, totalExecutionTime: {self.total_execution_time:,}, nReturned: {self.n_returned}, nProcessed: {self.n_processed}'
)
for child in self.children:
child.print(level + 1)
def build_execution_tree(execution_stats: dict[str, any]) -> Node:
"""Build SBE executioon tree from 'executionStats' field of query explain."""
assert execution_stats['executionSuccess']
return process_stage(execution_stats['executionStages'])
def process_stage(stage: dict[str, any]) -> Node:
"""Parse the given SBE stage."""
processors = {
'filter': process_filter,
'cfilter': process_filter,
'traverse': process_traverse,
'project': process_inner_node,
'limit': process_inner_node,
'scan': process_seek,
'coscan': process_leaf_node,
'nlj': process_nlj,
'hj': process_hash_join_node,
'mj': process_hash_join_node,
'seek': process_seek,
'ixseek': process_seek,
'limitskip': process_inner_node,
'group': process_inner_node,
'union': process_union_node,
'unique': process_unique_node,
'unwind': process_unwind_node,
}
processor = processors.get(stage['stage'])
if processor is None:
print(json.dumps(stage, indent=4))
raise ValueError(f'Unknown stage: {stage}')
return processor(stage)
def process_filter(stage: dict[str, any]) -> Node:
"""Process filter stage."""
input_stage = process_stage(stage['inputStage'])
return Node(**get_common_fields(stage), n_processed=stage['numTested'], children=[input_stage])
def process_traverse(stage: dict[str, any]) -> Node:
"""Process traverse, not used by Bonsai."""
outer_stage = process_stage(stage['outerStage'])
inner_stage = process_stage(stage['innerStage'])
return Node(**get_common_fields(stage), n_processed=stage['nReturned'],
children=[outer_stage, inner_stage])
def process_hash_join_node(stage: dict[str, any]) -> Node:
"""Process hj node."""
outer_stage = process_stage(stage['outerStage'])
inner_stage = process_stage(stage['innerStage'])
n_processed = outer_stage.n_returned + inner_stage.n_returned
return Node(**get_common_fields(stage), n_processed=n_processed,
children=[outer_stage, inner_stage])
def process_nlj(stage: dict[str, any]) -> Node:
"""Process nlj stage."""
outer_stage = process_stage(stage['outerStage'])
inner_stage = process_stage(stage['innerStage'])
n_processed = stage['totalDocsExamined']
return Node(**get_common_fields(stage), n_processed=n_processed,
children=[outer_stage, inner_stage])
def process_inner_node(stage: dict[str, any]) -> Node:
"""Process SBE stage with one input stage."""
input_stage = process_stage(stage['inputStage'])
return Node(**get_common_fields(stage), n_processed=input_stage.n_returned,
children=[input_stage])
def process_leaf_node(stage: dict[str, any]) -> Node:
"""Process SBE stage without input stages."""
return Node(**get_common_fields(stage), n_processed=stage['nReturned'], children=[])
def process_seek(stage: dict[str, any]) -> Node:
"""Process seek stage."""
return Node(**get_common_fields(stage), n_processed=stage['numReads'], children=[])
def process_union_node(stage: dict[str, any]) -> Node:
"""Process union stage."""
children = [process_stage(child) for child in stage['inputStages']]
return Node(**get_common_fields(stage), n_processed=stage['nReturned'], children=children)
def process_unwind_node(stage: dict[str, any]) -> Node:
"""Process unwind stage."""
input_stage = process_stage(stage['inputStage'])
return Node(**get_common_fields(stage), n_processed=input_stage.n_returned,
children=[input_stage])
def process_unique_node(stage: dict[str, any]) -> Node:
"""Process unique stage."""
input_stage = process_stage(stage['inputStage'])
n_processed = stage['dupsTested']
return Node(**get_common_fields(stage), n_processed=n_processed, children=[input_stage])
def get_common_fields(json_stage: dict[str, any]) -> dict[str, any]:
"""Exctract common field from json representation of SBE stage."""
return {
'stage': json_stage['stage'], 'plan_node_id': json_stage['planNodeId'],
'total_execution_time': json_stage['executionTimeNanos'],
'n_returned': json_stage['nReturned']
}