mongo/buildscripts/task_generation/suite_split.py

361 lines
14 KiB
Python

"""Tools for splitting suites into parallelizable sub-suites."""
from __future__ import annotations
import os
from datetime import datetime
from itertools import chain
from typing import NamedTuple, Callable, Optional, List, Dict, Any
import inject
import requests
import structlog
from evergreen import EvergreenApi
from buildscripts.task_generation.resmoke_proxy import ResmokeProxyService
from buildscripts.task_generation.suite_split_strategies import SplitStrategy, FallbackStrategy
from buildscripts.timeouts.timeout import TimeoutEstimate
from buildscripts.util import taskname
from buildscripts.util.teststats import HistoricTaskData, TestRuntime, normalize_test_name
LOGGER = structlog.getLogger(__name__)
CLEAN_EVERY_N_HOOK = "CleanEveryN"
HEADER_TEMPLATE = """# DO NOT EDIT THIS FILE. All manual edits will be lost.
# This file was generated by {file}.
"""
# pylint: disable=too-many-arguments
class SubSuite(object):
"""A suite of tests that can be run by evergreen."""
def __init__(self, test_list: List[str], task_overhead: float,
runtime_list: Optional[List[TestRuntime]] = None) -> None:
"""
Initialize the object.
:param test_list: List of tests to include in this sub-suite.
:param task_overhead: Runtime overhead to expect from task level hooks.
:param runtime_list: List of historic runtimes for tests in test_list.
"""
runtime_count = 0
total_runtime = 0.0
max_runtime = 0.0
if runtime_list:
runtime_map = {test.test_name: test.runtime for test in runtime_list}
for test in test_list:
if test in runtime_map and runtime_map[test] > 0:
runtime_count += 1
total_runtime += runtime_map[test]
max_runtime = max(max_runtime, runtime_map[test])
self.test_list = test_list
self.tests_with_runtime_info = runtime_count
self.max_test_runtime = max_runtime
self.expected_runtime = total_runtime + task_overhead
def should_overwrite_timeout(self) -> bool:
"""
Whether the timeout for this suite should be overwritten.
We should only overwrite the timeout if we have runtime info for all tests.
"""
return len(self) == self.tests_with_runtime_info
def get_timeout_estimate(self) -> TimeoutEstimate:
"""Get the estimated runtime of this task to for timeouts."""
if self.should_overwrite_timeout():
return TimeoutEstimate(max_test_runtime=self.max_test_runtime,
expected_task_runtime=self.expected_runtime)
return TimeoutEstimate.no_timeouts()
def __len__(self) -> int:
return len(self.test_list)
class GeneratedSuite(NamedTuple):
"""
Collection of sub-suites generated from the a parent suite.
sub_suites: List of sub-suites comprising whole suite.
build_variant: Name of build variant suite will run on.
task_name: Name of task generating suite.
suite_name: Name of suite.
include_build_variant_in_name: Include the build variant as part of display task names.
"""
sub_suites: List[SubSuite]
build_variant: str
task_name: str
suite_name: str
def get_test_list(self) -> List[str]:
"""Get the list of tests that will be run by this suite."""
return list(chain.from_iterable(sub_suite.test_list for sub_suite in self.sub_suites))
def __len__(self) -> int:
"""Get the number of sub-suites."""
return len(self.sub_suites)
def sub_suite_config_file(self, index: Optional[int] = None) -> str:
"""
Get the name of the file to store the resmoke configuration.
:param index: Index of suite or None for '_misc' suite.
:return: Name of generated resmoke.py configuration file.
"""
# Use self.task_name here instead of self.suite_name since multiple tasks can have the same resmoke.py suite.
return f"{taskname.name_generated_task(self.task_name, index, len(self.sub_suites))}.yml"
def sub_suite_task_name(self, index: Optional[int] = None) -> str:
"""
Get the name of the task that runs one of the generated sub-suites.
:param index: Index of suite or None for '_misc' suite.
:return: Name of generated Evergreen task.
"""
return taskname.name_generated_task(self.task_name, index, len(self.sub_suites),
self.build_variant)
def sub_suite_test_list(self, index: int) -> List[str]:
"""Get the list of tests from a sub-suite."""
return self.sub_suites[index].test_list
class SuiteSplitParameters(NamedTuple):
"""
Parameters for splitting resmoke suites.
build_variant: Build variant generated for.
task_name: Name of task being split.
suite_name: Name of suite being split.
filename: Filename of suite configuration.
is_asan: Whether the build variant being generated on is ASAN.
test_file_filter: Optional filter describing which tests should be included.
"""
build_variant: str
task_name: str
suite_name: str
filename: str
is_asan: bool = False
test_file_filter: Optional[Callable[[str], bool]] = None
class SuiteSplitConfig(NamedTuple):
"""
Global configuration for generating suites.
evg_project: Evergreen project.
target_resmoke_time: Target runtime for generated sub-suites.
max_sub_suites: Max number of sub-suites to generate.
max_tests_per_suite: Max number of tests to put in a single sub-suite.
start_date: Start date to query for test history.
end_date: End date to query for test history.
default_to_fallback: Use the fallback method for splitting tasks rather than dynamic splitting.
include_build_variant_in_name: Include the build variant as part of display task names.
"""
evg_project: str
target_resmoke_time: int
max_sub_suites: int
max_tests_per_suite: int
start_date: datetime
end_date: datetime
default_to_fallback: bool = False
include_build_variant_in_name: bool = False
class SuiteSplitService:
"""A service for splitting resmoke suites into sub-suites that can be run in parallel."""
@inject.autoparams()
def __init__(
self,
evg_api: EvergreenApi,
resmoke_proxy: ResmokeProxyService,
config: SuiteSplitConfig,
split_strategy: SplitStrategy,
fallback_strategy: FallbackStrategy,
) -> None:
"""
Initialize the suite split service.
:param evg_api: Evergreen API client.
:param resmoke_proxy: Resmoke Proxy service.
:param config: Configuration options of how to split suites.
"""
self.evg_api = evg_api
self.resmoke_proxy = resmoke_proxy
self.config = config
self.split_strategy = split_strategy
self.fallback_strategy = fallback_strategy
def split_suite(self, params: SuiteSplitParameters) -> GeneratedSuite:
"""
Split the given resmoke suite into multiple sub-suites.
:param params: Description of suite to split.
:return: List of sub-suites from the given suite.
"""
if self.config.default_to_fallback:
return self.calculate_fallback_suites(params)
evg_stats = HistoricTaskData.from_s3(self.config.evg_project, params.task_name,
params.build_variant)
if evg_stats:
return self.calculate_suites_from_evg_stats(evg_stats, params)
LOGGER.debug("No test history, using fallback suites")
# Since there is no test history this is probably a new suite, just use the fallback values.
return self.calculate_fallback_suites(params)
def calculate_fallback_suites(self, params: SuiteSplitParameters) -> GeneratedSuite:
"""Divide tests into a fixed number of suites."""
LOGGER.debug("Splitting tasks based on fallback", max_sub_suites=self.config.max_sub_suites)
test_list = self.resmoke_proxy.list_tests(params.suite_name)
if params.test_file_filter:
test_list = [test for test in test_list if params.test_file_filter(test)]
test_lists = self.fallback_strategy(test_list, self.config.max_sub_suites)
return self.test_lists_to_suite(test_lists, params, [])
def calculate_suites_from_evg_stats(self, test_stats: HistoricTaskData,
params: SuiteSplitParameters) -> GeneratedSuite:
"""
Divide tests into suites that can be run in less than the specified execution time.
:param test_stats: Historical test results for task being split.
:param params: Description of how to split the suite.
:return: List of sub suites calculated.
"""
execution_time_secs = self.config.target_resmoke_time * 60
tests_runtimes = self.filter_tests(test_stats.get_tests_runtimes(), params)
if not tests_runtimes:
LOGGER.debug("No test runtimes after filter, using fallback")
return self.calculate_fallback_suites(params)
test_lists = self.split_strategy(tests_runtimes, execution_time_secs,
self.config.max_sub_suites,
self.config.max_tests_per_suite,
LOGGER.bind(task=params.task_name))
return self.test_lists_to_suite(test_lists, params, tests_runtimes, test_stats)
def test_lists_to_suite(self, test_lists: List[List[str]], params: SuiteSplitParameters,
tests_runtimes: List[TestRuntime],
test_stats: Optional[HistoricTaskData] = None) -> GeneratedSuite:
"""
Create sub-suites for the given test lists.
:param test_lists: List of tests lists to create suites for.
:param params: Parameters for suite creation.
:param tests_runtimes: Historic runtimes of tests.
:param test_stats: Other historic task data.
:return: Generated suite for the sub-suites specified.
"""
sub_suites = []
for _, test_list in enumerate(test_lists):
task_overhead = self.get_task_hook_overhead(params.suite_name, params.is_asan,
len(test_list), test_stats)
sub_suites.append(SubSuite(test_list, task_overhead, tests_runtimes))
task_name = params.task_name
if self.config.include_build_variant_in_name:
task_name = f"{params.task_name}_{params.build_variant}"
return GeneratedSuite(
sub_suites=sub_suites,
build_variant=params.build_variant,
task_name=task_name,
suite_name=params.suite_name,
)
def filter_tests(self, tests_runtimes: List[TestRuntime],
params: SuiteSplitParameters) -> List[TestRuntime]:
"""
Filter out tests that do not exist in the filesystem.
:param tests_runtimes: List of tests with runtimes to filter.
:param params: Suite split parameters.
:return: Test list with unneeded tests filtered out.
"""
if params.test_file_filter:
tests_runtimes = [
test for test in tests_runtimes if params.test_file_filter(test.test_name)
]
all_tests = [
normalize_test_name(test) for test in self.resmoke_proxy.list_tests(params.suite_name)
]
return [
info for info in tests_runtimes
if os.path.exists(info.test_name) and info.test_name in all_tests
]
def get_task_hook_overhead(self, suite_name: str, is_asan: bool, test_count: int,
historic_stats: Optional[HistoricTaskData]) -> float:
"""
Add how much overhead task-level hooks each suite should account for.
Certain test hooks need to be accounted for on the task level instead of the test level
in order to calculate accurate timeouts. So we will add details about those hooks to
each suite here.
:param suite_name: Name of suite being generated.
:param is_asan: Whether ASAN is being used.
:param test_count: Number of tests in sub-suite.
:param historic_stats: Historic runtime data of the suite.
"""
# The CleanEveryN hook is run every 'N' tests. The runtime of the
# hook will be associated with whichever test happens to be running, which could be
# different every run. So we need to take its runtime into account at the task level.
if historic_stats is None:
return 0.0
clean_every_n_cadence = self._get_clean_every_n_cadence(suite_name, is_asan)
avg_clean_every_n_runtime = historic_stats.get_avg_hook_runtime(CLEAN_EVERY_N_HOOK)
LOGGER.debug("task hook overhead", cadence=clean_every_n_cadence,
runtime=avg_clean_every_n_runtime, is_asan=is_asan)
if avg_clean_every_n_runtime != 0:
n_expected_runs = test_count / clean_every_n_cadence
return n_expected_runs * avg_clean_every_n_runtime
return 0.0
def _get_clean_every_n_cadence(self, suite_name: str, is_asan: bool) -> int:
"""
Get the N value for the CleanEveryN hook.
:param suite_name: Name of suite being generated.
:param is_asan: Whether ASAN is being used.
:return: How frequently clean every end is run.
"""
# Default to 1, which is the worst case meaning CleanEveryN would run for every test.
clean_every_n_cadence = 1
if is_asan:
# ASAN runs hard-code N to 1. See `resmokelib/testing/hooks/cleanup.py`.
return clean_every_n_cadence
clean_every_n_config = self._get_hook_config(suite_name, CLEAN_EVERY_N_HOOK)
if clean_every_n_config:
clean_every_n_cadence = clean_every_n_config.get("n", 1)
return clean_every_n_cadence
def _get_hook_config(self, suite_name: str, hook_name: str) -> Optional[Dict[str, Any]]:
"""
Get the configuration for the given hook.
:param hook_name: Name of hook to query.
:return: Configuration for hook, if it exists.
"""
hooks_config = self.resmoke_proxy.read_suite_config(suite_name).get("executor",
{}).get("hooks")
if hooks_config:
for hook in hooks_config:
if hook.get("class") == hook_name:
return hook
return None