mongo/buildscripts/bazel_rules_mongo/codeowners/codeowners_generate.py

536 lines
19 KiB
Python

import argparse
import difflib
import os
import subprocess
import sys
import tempfile
from functools import cache
from typing import Dict, List, Optional, Set, Tuple
import yaml
from codeowners.parsers import owners_v1, owners_v2
from codeowners.validate_codeowners import run_validator
from utils import evergreen_git
OWNERS_FILE_NAMES = ("OWNERS.yml", "OWNERS.yaml")
parsers = {
"1.0.0": owners_v1.OwnersParserV1(),
"2.0.0": owners_v2.OwnersParserV2(),
}
class FileNode:
def __init__(self, directory: str):
self.dirs: Dict[str, FileNode] = {}
self.owners_file: Optional[str] = None
self.directory = directory
def add_file_to_tree(root_node: FileNode, file_parts: List[str]):
current_node = root_node
for i, dir in enumerate(file_parts[:-1]):
node_dirs = current_node.dirs
if dir not in node_dirs:
directory = "/".join(file_parts[: i + 1])
node_dirs[dir] = FileNode(f"./{directory}")
current_node = node_dirs[dir]
assert (
current_node.owners_file is None or current_node.owners_file == file_parts[-1]
), f"there are two OWNERS files in the following directory: ./{'/'.join(file_parts[:-1])}"
current_node.owners_file = file_parts[-1]
def build_tree(files: List[str]) -> FileNode:
root_node = FileNode("./")
for file in files:
file_parts = file.split("/")
file_name = file_parts[-1]
if file_name not in OWNERS_FILE_NAMES:
continue
add_file_to_tree(root_node, file_parts)
return root_node
def process_owners_file(output_lines: list[str], node: FileNode) -> None:
directory = node.directory
file_name = node.owners_file
if not file_name:
return
owners_file_path = os.path.join(directory, file_name)
print(f"parsing: {owners_file_path}")
output_lines.append(f"# The following patterns are parsed from {owners_file_path}")
with open(owners_file_path, "r", encoding="utf8") as file:
contents = yaml.safe_load(file)
assert "version" in contents, f"Version not found in {owners_file_path}"
assert contents["version"] in parsers, f"Unsupported version in {owners_file_path}"
parser = parsers[contents["version"]]
owners_lines = parser.parse(directory, owners_file_path, contents)
output_lines.extend(owners_lines)
output_lines.append("")
# Order matters, we need to always add the contents of the root directory to codeowners first
# and work our way to the outside directories in that order.
def process_dir(output_lines: list[str], node: FileNode) -> None:
process_owners_file(output_lines, node)
for directory in sorted(node.dirs.keys()):
process_dir(output_lines, node.dirs[directory])
def print_diff_and_instructions(old_codeowners_contents, new_codeowners_contents):
print("ERROR: New contents of codeowners file does not match old contents.")
print("\nDifferences between old and new contents:")
diff = difflib.unified_diff(
old_codeowners_contents.splitlines(keepends=True),
new_codeowners_contents.splitlines(keepends=True),
fromfile="Old CODEOWNERS",
tofile="New CODEOWNERS",
)
sys.stdout.writelines(diff)
print("If you are seeing this message in CI you likely need to run `bazel run codeowners`")
def validate_generated_codeowners(validator_path: str) -> int:
"""Validate the generated CODEOWNERS file.
Returns:
int: 0 if validation succeeds, non-zero otherwise.
"""
print("\nValidating generated CODEOWNERS file...")
try:
validation_result = run_validator(validator_path)
if validation_result != 0:
print("CODEOWNERS validation failed!")
return validation_result
print("CODEOWNERS validation successful!")
return 0
except Exception as exc:
print(f"Error during CODEOWNERS validation: {str(exc)}")
return 1
@cache
def get_unowned_and_default_owned_files(
codeowners_binary_path: str, codeowners_file: str = None
) -> Tuple[Set[str], Set[str]]:
temp_output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt")
temp_output_file.close()
default_owner = get_default_owner()
codeowners_file_arg = ""
if codeowners_file:
codeowners_file_arg = f"--file {codeowners_file}"
# This file can be bigger than the allowed subprocess buffer so we redirect output into a file
command = f"{codeowners_binary_path} --tracked {codeowners_file_arg} > {temp_output_file.name}"
process = subprocess.run(command, shell=True, stderr=subprocess.PIPE, text=True)
if process.returncode != 0:
print(process.stderr)
raise RuntimeError("Error while trying to find unowned files")
unowned_files = set()
default_owned_files = set()
with open(temp_output_file.name, "r") as file:
for line in file.read().split("\n"):
if not line:
continue
parts = line.split()
file_name = parts[0].strip()
owners = parts[1:]
if owners[0] == "(unowned)":
assert (
len(owners) == 1
), f"There were somehow multiple owners for an unowned file: {parts}"
unowned_files.add(file_name)
elif default_owner and default_owner in owners:
default_owned_files.add(file_name)
return unowned_files, default_owned_files
def check_new_files(codeowners_binary_path: str, expansions_file: str, branch: str) -> int:
new_files = evergreen_git.get_new_files(expansions_file, branch)
if not new_files:
print("No new files were detected.")
return 0
print(f"The following new files were detected: {new_files}")
default_owner = get_default_owner()
unowned_files, default_owned_files = get_unowned_and_default_owned_files(codeowners_binary_path)
allowed_unowned_files = get_allowed_unowned_files()
unowned_new_files = []
default_owned_new_files = []
for file in new_files:
if file in unowned_files and f"/{file}" not in allowed_unowned_files:
unowned_new_files.append(file)
if file in default_owned_files:
default_owned_new_files.append(file)
has_error = False
if unowned_new_files:
print("The following new files are unowned:")
for file in unowned_new_files:
print(f"- {file}")
print("New files are required to have code owners. See http://go/codeowners-ug")
has_error = True
if default_owned_new_files:
assert (
default_owner
), "There were new files owned by the default owner but there is no default owner detected."
print(f"The following new files are owned by the default owner {default_owner}:")
for file in default_owned_new_files:
print(f"- {file}")
print("New files are required to have a non-default owner. See http://go/codeowners-ug")
has_error = True
if has_error:
return 1
print("There are no new files added that are unowned.")
return 0
def check_orphaned_files(
codeowners_binary_path: str, expansions_file: str, branch: str, codeowners_file: str
) -> int:
# This compares the new codeowners file with the old codeowners file on the same working tree
# This tells us which coverage is lost between codeowners file changes
current_unowned_files, current_default_owned_files = get_unowned_and_default_owned_files(
codeowners_binary_path
)
base_revision = evergreen_git.get_diff_revision(expansions_file, branch)
previous_codeowners_file_contents = evergreen_git.get_file_at_revision(
codeowners_file, base_revision
)
if previous_codeowners_file_contents is None:
return 0
temp_codeowners_file = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt")
temp_codeowners_file.write(previous_codeowners_file_contents)
temp_codeowners_file.close()
old_unowned_files, old_default_owned_files = get_unowned_and_default_owned_files(
codeowners_binary_path, temp_codeowners_file.name
)
allowed_unowned_files = get_allowed_unowned_files()
unowned_files_difference = current_unowned_files - old_unowned_files
for file in list(unowned_files_difference):
if f"/{file}" in allowed_unowned_files:
unowned_files_difference.remove(file)
default_owned_files_difference = current_default_owned_files - old_default_owned_files
if not unowned_files_difference and not default_owned_files_difference:
print("No files have lost ownership with these changes.")
return 0
if unowned_files_difference:
print("The following files lost ownership with CODEOWNERS changes:")
for file in sorted(unowned_files_difference):
print(f"- {file}")
if default_owned_files_difference:
print("The following files changed to default ownership with CODEOWNERS changes:")
for file in sorted(default_owned_files_difference):
print(f"- {file}")
return 1
def post_generation_checks(
validator_path: str,
should_run_validation: bool,
codeowners_binary_path: str,
should_check_new_files: bool,
expansions_file: str,
branch: str,
codeowners_file_path: str,
) -> int:
status = 0
if should_run_validation:
status |= validate_generated_codeowners(validator_path)
if should_check_new_files:
status |= check_new_files(codeowners_binary_path, expansions_file, branch)
status |= check_orphaned_files(
codeowners_binary_path, expansions_file, branch, codeowners_file_path
)
status |= check_banned_codeowners(codeowners_file_path)
return status
def get_banned_codeowners_file_path() -> Optional[str]:
return os.environ.get("BANNED_CODEOWNERS_FILE_PATH", None)
# Check that there are no banned codeowners in the codeowners file
def check_banned_codeowners(codeowners_file_path: str) -> int:
banned_codeowners_file_path = get_banned_codeowners_file_path()
if not banned_codeowners_file_path:
return 0
if not os.path.isfile(banned_codeowners_file_path):
print(f"{banned_codeowners_file_path} file not found.")
return 1
banned_owners: set[str] = set()
with open(banned_codeowners_file_path, "r", encoding="utf8") as file:
for line in file:
line = line.strip()
if not line:
continue
if line.startswith("@"):
line = line[1:]
if not line.startswith("#"):
banned_owners.add(line)
print(f"Banned codeowners loaded: {banned_owners}")
offending_lines = []
with open(codeowners_file_path, "r", encoding="utf8") as file:
for i, line in enumerate(file.readlines()):
parts = line.split()
if len(parts) < 2:
continue
owners = parts[1:]
for owner in owners:
if owner.startswith("@"):
owner = owner[1:]
if owner in banned_owners:
offending_lines.append((i + 1, line.strip(), owner))
if not offending_lines:
return 0
print("The following lines in the CODEOWNERS file contain banned owners:")
for line_num, line, owner in offending_lines:
print(f" line {line_num}: {line} (banned owner: {owner})")
print("Please remove the banned owners from the CODEOWNERS file.")
return 1
def get_allowed_unowned_files_path() -> Optional[str]:
return os.environ.get("ALLOWED_UNOWNED_FILES_PATH", None)
def get_default_owner() -> Optional[str]:
return os.environ.get("CODEOWNERS_DEFAULT_OWNER", None)
@cache
def get_allowed_unowned_files() -> Set[str]:
allowed_unowned_file_path = get_allowed_unowned_files_path()
if not allowed_unowned_file_path:
return set()
unowned_files = set()
with open(allowed_unowned_file_path, "r", encoding="utf8") as file:
contents = yaml.safe_load(file)
try:
assert "version" in contents, f"version field not found in {allowed_unowned_file_path}"
assert contents["version"] == "1.0.0", f"unknown version in {allowed_unowned_file_path}"
del contents["version"]
working_directory = os.curdir
assert "filters" in contents, f"No filters were found in {allowed_unowned_file_path}"
for filter in contents["filters"]:
assert "justification" in filter, "all filters need a justification"
pattern = filter["filter"]
assert pattern.startswith("/"), "All unowned file filters must start with a /"
assert "*" not in pattern, "No wildcard patterns allowed in unowned file filters."
test_pattern = f"{working_directory}{pattern}"
assert os.path.exists(test_pattern), f"Filter was not found: {pattern}"
assert not os.path.isdir(
test_pattern
), "No directories are allowed in unowned file filters."
assert os.path.isfile(test_pattern), f"No files matched pattern: {pattern}"
unowned_files.add(pattern)
except Exception as ex:
print(f"Error occurred while parsing {allowed_unowned_file_path}")
print(
"For documentation around the file format please read https://github.com/10gen/mongo/blob/master/docs/owners/allowed_unowned_files_format.md"
)
raise ex
return unowned_files
def add_allowed_unowned_files(output_lines: List[str]) -> None:
allowed_unowned_files = get_allowed_unowned_files()
if not allowed_unowned_files:
return
allowed_unowned_files_path = get_allowed_unowned_files_path()
assert (
allowed_unowned_files_path
), "Somehow there were allowed unowned files but a path was not found."
output_lines.append(f"# The following lines are added from {allowed_unowned_files_path}")
for file in sorted(allowed_unowned_files):
output_lines.append(f"{file}")
# adds a newline
output_lines.append("")
def main():
# If we are running in bazel, default the directory to the workspace
default_dir = os.environ.get("BUILD_WORKSPACE_DIRECTORY")
if not default_dir:
process = subprocess.run(
["git", "rev-parse", "--show-toplevel"], capture_output=True, text=True, check=True
)
default_dir = process.stdout.strip()
codeowners_validator_path = os.environ.get("CODEOWNERS_VALIDATOR_PATH")
if not codeowners_validator_path:
raise RuntimeError("no CODEOWNERS_VALIDATOR_PATH env var found")
codeowners_validator_path = os.path.abspath(codeowners_validator_path)
codeowners_binary_path = os.environ.get("CODEOWNERS_BINARY_PATH")
if not codeowners_binary_path:
raise RuntimeError("no CODEOWNERS_BINARY_PATH env var found")
codeowners_binary_path = os.path.abspath(codeowners_binary_path)
parser = argparse.ArgumentParser(
prog="GenerateCodeowners",
description="This generates a CODEOWNERS file based off of our OWNERS.yml files. "
"Whenever changes are made to the OWNERS.yml files in the repo this script "
"should be run.",
)
parser.add_argument(
"--output-file",
help="Path of the CODEOWNERS file to be generated.",
default=os.path.join(".github", "CODEOWNERS"),
)
parser.add_argument(
"--repo-dir", help="Root of the repo to scan for OWNER files.", default=default_dir
)
parser.add_argument(
"--check",
help="When set, program exits 1 when the CODEOWNERS content changes. This will skip generation",
default=False,
action="store_true",
)
parser.add_argument(
"--run-validation",
help="When set, validation will be run against the resulting CODEOWNERS file.",
default=True,
action="store_false",
)
parser.add_argument(
"--check-new-files",
help="When set, this script will check new files to ensure they are owned.",
default=True,
action="store_false",
)
parser.add_argument(
"--expansions-file",
help="When set, implements CI specific logic around getting new files in a specific patch.",
default=None,
action="store",
)
parser.add_argument(
"--branch",
help="Helps the script understand what branch to compare against to see what new files are added when run locally. Defaults to master or main.",
default=None,
action="store",
)
args = parser.parse_args()
os.chdir(args.repo_dir)
# The lines to write to the CODEOWNERS file
output_lines = [
"# This is a generated file do not make changes to this file.",
"# This is generated from various OWNERS.yml files across the repo.",
"# To regenerate this file run `bazel run codeowners`",
"# The documentation for the OWNERS.yml files can be found here:",
"# https://github.com/10gen/mongo/blob/master/docs/owners/owners_format.md",
"",
]
print(f"Scanning for OWNERS.yml files in {os.path.abspath(os.curdir)}")
try:
files = evergreen_git.get_files_to_lint()
root_node = build_tree(files)
process_dir(output_lines, root_node)
add_allowed_unowned_files(output_lines)
except Exception as ex:
print("An exception was found while generating the CODEOWNERS file.")
print("Please refer to the docs to see the spec for OWNERS.yml files here :")
print("https://github.com/10gen/mongo/blob/master/docs/owners/owners_format.md")
raise ex
old_contents = ""
check = args.check
output_file = args.output_file
os.makedirs(os.path.dirname(output_file), exist_ok=True)
if check and os.path.exists(output_file):
with open(output_file, "r") as file:
old_contents = file.read()
# prioritize env var for check new file configuration
should_check_new_files = os.environ.get("CODEOWNERS_CHECK_NEW_FILES", None)
if should_check_new_files is not None:
if should_check_new_files.lower() == "false":
should_check_new_files = False
elif should_check_new_files.lower() == "true":
should_check_new_files = True
else:
raise RuntimeError(
f"Invalid value for CODEOWNERS_CHECK_NEW_FILES: {should_check_new_files}"
)
else:
should_check_new_files = args.check_new_files
new_contents = "\n".join(output_lines)
if check:
if new_contents != old_contents:
print_diff_and_instructions(old_contents, new_contents)
return 1
print("CODEOWNERS file is up to date")
return post_generation_checks(
codeowners_validator_path,
args.run_validation,
codeowners_binary_path,
should_check_new_files,
args.expansions_file,
args.branch,
output_file,
)
with open(output_file, "w") as file:
file.write(new_contents)
print(f"Successfully wrote to the CODEOWNERS file at: {os.path.abspath(output_file)}")
# Add validation after generating CODEOWNERS file
return post_generation_checks(
codeowners_validator_path,
args.run_validation,
codeowners_binary_path,
should_check_new_files,
args.expansions_file,
args.branch,
output_file,
)
if __name__ == "__main__":
exit(main())