# mongo/buildscripts/util/codeowners_utils.py
#
# 83 lines
# 2.7 KiB
# Python

import fnmatch
import os
import re
from functools import lru_cache
from typing import Dict, List, Tuple
import yaml
@lru_cache
def process_owners(cur_dir: str) -> Tuple[Dict[re.Pattern, List[str]], bool]:
    """Load the filters of the nearest ``OWNERS.yml`` at or above ``cur_dir``.

    If ``cur_dir`` itself has no ``OWNERS.yml``, its parent directories are
    tried in turn and the first file found is used.

    Results are memoized by ``lru_cache`` and are therefore shared between
    calls; treat the returned dictionary and its lists as read-only.

    Args:
        cur_dir: Directory to start searching from. An empty string ends the
            search.

    Returns:
        A ``(filters, no_parent_owners)`` tuple. ``filters`` maps a compiled
        regular expression (the glob ``<owners dir>/<pattern>`` translated with
        ``fnmatch.translate``) to that filter's approvers, in file order.
        ``no_parent_owners`` is the truthiness of the file's
        ``options.no_parent_owners`` entry. ``({}, False)`` is returned when no
        ``OWNERS.yml`` is found.

    Raises:
        AssertionError: If the file lacks ``version`` or ``filters``, declares
            a version other than ``"1.0.0"``, or contains a filter without
            ``approvers`` or without exactly one pattern key.
    """
    if not cur_dir:
        return ({}, False)

    owners_file_path = cur_dir + "/OWNERS.yml"
    if not os.path.exists(owners_file_path):
        parent_dir = os.path.dirname(cur_dir)
        # dirname() of a filesystem root returns the root itself, so without
        # this check an absolute path would recurse until RecursionError.
        if parent_dir == cur_dir:
            return ({}, False)
        return process_owners(parent_dir)

    # Only parsing needs the file handle; release it before validating.
    with open(owners_file_path, "r", encoding="utf8") as f:
        contents = yaml.safe_load(f)

    # Validation is kept as assertions so the exception type is unchanged.
    assert "version" in contents, f"Version not found in {owners_file_path}"
    assert contents["version"] == "1.0.0", f"Invalid version in {owners_file_path}"
    assert "filters" in contents, f"Filters not found in {owners_file_path}"

    # `options` may be missing, or present but empty (parsed as None).
    options = contents.get("options") or {}
    no_parent_owners = bool(options.get("no_parent_owners", False))

    filters: Dict[re.Pattern, List[str]] = {}
    for file_filter in contents["filters"]:
        assert "approvers" in file_filter, f"Filter without approvers in {owners_file_path}"
        # Besides the bookkeeping keys, a filter must hold exactly one key: the
        # glob pattern. Read it without mutating the parsed YAML data.
        pattern_keys = [key for key in file_filter if key not in ("approvers", "metadata")]
        assert (
            len(pattern_keys) == 1
        ), f"Filter must have exactly one pattern in {owners_file_path}"

        # Patterns are relative to the directory that holds the OWNERS.yml.
        glob_pattern = f"{cur_dir}/{pattern_keys[0]}"
        regex_pattern = re.compile(fnmatch.translate(glob_pattern))
        filters[regex_pattern] = list(file_filter["approvers"])

    return (filters, no_parent_owners)
class Owners:
    """Resolve code owners, and the Jira teams mapped to them, for file paths."""

    def __init__(self):
        """Load the code-owner to Jira-team map.

        The map is read from ``buildscripts/util/co_jira_map.yml``; the path is
        relative, so it is resolved against the current working directory.
        """
        # Use a context manager so the file handle is closed deterministically.
        with open("buildscripts/util/co_jira_map.yml", "r", encoding="utf8") as f:
            self.co_jira_map = yaml.safe_load(f)

    def get_codeowners(self, file_path: str) -> List[str]:
        """Return the approvers of the closest filter that matches ``file_path``.

        Starting at the directory of ``file_path``, each directory level is
        checked via ``process_owners``. The walk stops at the first level that
        yields a non-empty approver list, at a level whose owners file sets
        ``no_parent_owners``, or when there is no parent directory left.

        Args:
            file_path: Path of the file to look up. Filters are matched against
                this string with ``fullmatch``.

        Returns:
            A new list of approvers; empty when no filter matches.
        """
        cur_dir = os.path.dirname(file_path)
        codeowners: List[str] = []

        # search up tree until matching filter found
        while cur_dir:
            filters, no_parent = process_owners(cur_dir)

            # latest applicable filter takes precedence
            for regex_filter, cur_codeowners in filters.items():
                if regex_filter.fullmatch(file_path):
                    codeowners = cur_codeowners

            if codeowners or no_parent:
                break

            parent_dir = os.path.dirname(cur_dir)
            # dirname() of a filesystem root returns the root itself; stop
            # there instead of looping forever on absolute paths.
            if parent_dir == cur_dir:
                break
            cur_dir = parent_dir

        # Return a copy: the matched list comes from process_owners' shared
        # result, and callers must not be able to mutate it.
        return list(codeowners)

    def get_jira_team_from_codeowner(self, codeowner: str) -> List[str]:
        """Return the ``co_jira_map`` entry for ``codeowner``.

        Presumably the map is a dict of code owner to list of Jira team names
        (verify against ``co_jira_map.yml``); in that case an unknown code
        owner raises ``KeyError``.
        """
        return self.co_jira_map[codeowner]

    def get_jira_team_owner(self, file_path: str) -> List[str]:
        """Return the Jira teams of every code owner of ``file_path``.

        Teams are listed in code-owner order and are not de-duplicated.
        """
        return [
            jira_team
            for codeowner in self.get_codeowners(file_path)
            for jira_team in self.get_jira_team_from_codeowner(codeowner)
        ]