mirror of https://github.com/mongodb/mongo
173 lines
6.5 KiB
Python
173 lines
6.5 KiB
Python
# Copyright (C) 2022-present MongoDB, Inc.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the Server Side Public License, version 1,
|
|
# as published by MongoDB, Inc.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# Server Side Public License for more details.
|
|
#
|
|
# You should have received a copy of the Server Side Public License
|
|
# along with this program. If not, see
|
|
# <http://www.mongodb.com/licensing/server-side-public-license>.
|
|
#
|
|
# As a special exception, the copyright holders give permission to link the
|
|
# code of portions of this program with the OpenSSL library under certain
|
|
# conditions as described in each individual source file and distribute
|
|
# linked combinations including the program with the OpenSSL library. You
|
|
# must comply with the Server Side Public License in all respects for
|
|
# all of the code used other than as permitted herein. If you modify file(s)
|
|
# with this exception, you may extend this exception to your version of the
|
|
# file(s), but you are not obligated to do so. If you do not wish to do so,
|
|
# delete this exception statement from your version. If you delete this
|
|
# exception statement from all source files in the program, then also delete
|
|
# it in the license file.
|
|
#
|
|
"""Define SBE execution tree and parse it from query explain."""
|
|
|
|
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
import bson.json_util as json
|
|
|
|
# Names exported by a star-import of this module; the process_* helpers and
# get_common_fields are not listed here.
__all__ = ['Node', 'build_execution_tree']
|
|
|
|
|
|
@dataclass
class Node:
    """Represent a single node of an SBE execution tree."""

    # Stage name as it appears under the 'stage' key of the explain output.
    stage: str
    # Value of the stage's 'planNodeId' key.
    plan_node_id: int
    # Value of the stage's 'executionTimeNanos' key; includes the time of the children.
    total_execution_time: int
    # Value of the stage's 'nReturned' key.
    n_returned: int
    # Stage-specific counter filled in by the process_* parsers of this module.
    n_processed: int
    # Parsed input stages of this node; empty for leaf stages.
    children: list[Node]

    def get_execution_time(self):
        """Return the execution time of the SBE node without execution time of its children.

        Only direct children are subtracted, because every child's total already
        accounts for its own subtree.
        """
        children_time = sum(child.total_execution_time for child in self.children)
        return self.total_execution_time - children_time

    def print(self, level=0):
        """Pretty print the SBE tree rooted at this node to stdout.

        :param level: nesting depth of this node; every level adds one '| ' prefix.
        """
        # Inside the method body `print` resolves to the builtin, not to this method.
        print(
            f'{"| "*level}{self.stage}, planNodeId: {self.plan_node_id}, totalExecutionTime: {self.total_execution_time:,}, nReturned: {self.n_returned}, nProcessed: {self.n_processed}'
        )
        for child in self.children:
            child.print(level + 1)
|
|
|
|
|
|
def build_execution_tree(execution_stats: dict[str, any]) -> Node:
    """Build SBE execution tree from 'executionStats' field of query explain.

    :param execution_stats: the 'executionStats' document; its 'executionSuccess'
        and 'executionStages' keys are read.
    :return: the root Node produced by parsing 'executionStages'.
    :raises AssertionError: if 'executionSuccess' is falsy.
    :raises KeyError: if either of the two keys is missing.
    """
    # Raised explicitly instead of using an `assert` statement so the check is
    # not stripped when Python runs with -O; the exception type is unchanged.
    if not execution_stats['executionSuccess']:
        raise AssertionError(
            "Cannot build SBE execution tree: 'executionSuccess' is false in executionStats")
    return process_stage(execution_stats['executionStages'])
|
|
|
|
|
|
def process_stage(stage: dict[str, any]) -> Node:
    """Parse the given SBE stage by dispatching on its 'stage' name.

    :param stage: json representation of one SBE stage; must contain a 'stage' key.
    :return: the Node built by the parser registered for that stage name.
    :raises ValueError: if no parser is registered for the stage name; the
        offending stage is dumped to stdout first.
    """
    # Built inside the function because the parsers are defined further down the module.
    handlers = {
        # Stages parsed generically from their single 'inputStage'.
        'project': process_inner_node,
        'limit': process_inner_node,
        'limitskip': process_inner_node,
        'group': process_inner_node,
        # Stages without input stages.
        'scan': process_seek,
        'seek': process_seek,
        'ixseek': process_seek,
        'coscan': process_leaf_node,
        # Stages with an outer and an inner input.
        'traverse': process_traverse,
        'nlj': process_nlj,
        'hj': process_hash_join_node,
        'mj': process_hash_join_node,
        # Stages with dedicated counters or several inputs.
        'filter': process_filter,
        'cfilter': process_filter,
        'union': process_union_node,
        'unique': process_unique_node,
        'unwind': process_unwind_node,
    }

    stage_name = stage['stage']
    if stage_name not in handlers:
        print(json.dumps(stage, indent=4))
        raise ValueError(f'Unknown stage: {stage}')

    handler = handlers[stage_name]
    return handler(stage)
|
|
|
|
|
|
def process_filter(stage: dict[str, any]) -> Node:
    """Process filter stage.

    The node's n_processed is taken from the stage's 'numTested' counter and
    its only child is the parsed 'inputStage'.
    """
    child = process_stage(stage['inputStage'])
    common = get_common_fields(stage)
    return Node(n_processed=stage['numTested'], children=[child], **common)
|
|
|
|
|
|
def process_traverse(stage: dict[str, any]) -> Node:
    """Process traverse, not used by Bonsai.

    Children are the parsed 'outerStage' and 'innerStage', in that order;
    n_processed is the stage's own 'nReturned'.
    """
    outer = process_stage(stage['outerStage'])
    inner = process_stage(stage['innerStage'])
    common = get_common_fields(stage)
    return Node(n_processed=stage['nReturned'], children=[outer, inner], **common)
|
|
|
|
|
|
def process_hash_join_node(stage: dict[str, any]) -> Node:
    """Process hj node (process_stage routes 'mj' stages here as well).

    n_processed is the number of documents returned by both inputs combined.
    """
    sides = [process_stage(stage['outerStage']), process_stage(stage['innerStage'])]
    outer, inner = sides
    processed = outer.n_returned + inner.n_returned
    common = get_common_fields(stage)
    return Node(n_processed=processed, children=sides, **common)
|
|
|
|
|
|
def process_nlj(stage: dict[str, any]) -> Node:
    """Process nlj stage.

    Children are the parsed 'outerStage' and 'innerStage'; n_processed is read
    from the stage's 'totalDocsExamined' counter.
    """
    outer = process_stage(stage['outerStage'])
    inner = process_stage(stage['innerStage'])
    docs_examined = stage['totalDocsExamined']
    common = get_common_fields(stage)
    return Node(n_processed=docs_examined, children=[outer, inner], **common)
|
|
|
|
|
|
def process_inner_node(stage: dict[str, any]) -> Node:
    """Process SBE stage with one input stage.

    n_processed is the number of documents returned by the parsed 'inputStage'.
    """
    child = process_stage(stage['inputStage'])
    common = get_common_fields(stage)
    return Node(n_processed=child.n_returned, children=[child], **common)
|
|
|
|
|
|
def process_leaf_node(stage: dict[str, any]) -> Node:
    """Process SBE stage without input stages.

    The node has no children and its n_processed is the stage's own 'nReturned'.
    """
    common = get_common_fields(stage)
    return Node(n_processed=stage['nReturned'], children=[], **common)
|
|
|
|
|
|
def process_seek(stage: dict[str, any]) -> Node:
    """Process seek stage.

    The node has no children and its n_processed is the stage's 'numReads' counter.
    """
    common = get_common_fields(stage)
    return Node(n_processed=stage['numReads'], children=[], **common)
|
|
|
|
|
|
def process_union_node(stage: dict[str, any]) -> Node:
    """Process union stage.

    Every entry of 'inputStages' is parsed into a child, in order; n_processed
    is the stage's own 'nReturned'.
    """
    branches = list(map(process_stage, stage['inputStages']))
    common = get_common_fields(stage)
    return Node(n_processed=stage['nReturned'], children=branches, **common)
|
|
|
|
|
|
def process_unwind_node(stage: dict[str, any]) -> Node:
    """Process unwind stage.

    n_processed is the number of documents returned by the parsed 'inputStage'.
    """
    source = process_stage(stage['inputStage'])
    common = get_common_fields(stage)
    return Node(n_processed=source.n_returned, children=[source], **common)
|
|
|
|
|
|
def process_unique_node(stage: dict[str, any]) -> Node:
    """Process unique stage.

    n_processed is read from the stage's 'dupsTested' counter and the only
    child is the parsed 'inputStage'.
    """
    child = process_stage(stage['inputStage'])
    dups_tested = stage['dupsTested']
    common = get_common_fields(stage)
    return Node(n_processed=dups_tested, children=[child], **common)
|
|
|
|
|
|
def get_common_fields(json_stage: dict[str, any]) -> dict[str, any]:
    """Extract common fields from json representation of SBE stage.

    :param json_stage: one SBE stage; must contain 'stage', 'planNodeId',
        'executionTimeNanos' and 'nReturned'.
    :return: a new dict keyed by the matching Node field names.
    :raises KeyError: if any of the four keys is missing.
    """
    # (Node field name, key in the explain json) pairs, in lookup order.
    field_to_key = (
        ('stage', 'stage'),
        ('plan_node_id', 'planNodeId'),
        ('total_execution_time', 'executionTimeNanos'),
        ('n_returned', 'nReturned'),
    )
    return {field: json_stage[key] for field, key in field_to_key}
|