SERVER-104399 SERVER-103542 implement submodule and file_private visibility rules (#35630)

GitOrigin-RevId: 3072be4a4cda82cae272260c0f74c70c32ea9558
This commit is contained in:
Mathias Stearn 2025-05-02 17:15:05 +02:00 committed by MongoDB Bot
parent 58d505a6b9
commit ded9a0e961
1 changed files with 55 additions and 37 deletions

View File

@ -3,24 +3,16 @@ import glob
import json
import multiprocessing
import os
import re
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed, wait
from copy import deepcopy
from datetime import datetime
from typing import Any, TypedDict
import pyzstd
import typer # nicer error dump on exceptions
import yaml
from progressbar import ProgressBar, progressbar
try:
from yaml import CDumper as Dumper
from yaml import CLoader as Loader # noqa: F401 Not used right now but may be again
except ImportError:
raise RuntimeError("Why no cYaml?")
# from yaml import Loader, Dumper
class Decl(TypedDict):
display_name: str
@ -124,10 +116,13 @@ def worker(paths: list[bytes]):
merge_decls(json.loads(f.read()))
def is_submodule_usage(decl: Decl, mod: str) -> bool:
return decl["mod"] == mod or mod.startswith(decl["mod"] + ".")
def main(
jobs: int = typer.Option(os.cpu_count(), "--jobs", "-j"),
intra_module: bool = typer.Option(False, help="Include intra-module accesses"),
generate_yaml: bool = False,
):
timer = Timer()
paths = glob.glob(b"bazel-bin/**/*.mod_scanner_decls.json.zst", recursive=True)
@ -165,14 +160,16 @@ def main(
merge_decls(json.loads(f.read()))
timer.mark("processed input")
no_self_decls = deepcopy(all_decls)
for decl in no_self_decls.values():
if decl["mod"] in decl["used_from"]:
del decl["used_from"][decl["mod"]]
out: Any = [dict(d) for d in all_decls.values()] # shallow copy each decl
if not intra_module:
for decl in out:
decl["used_from"] = {
mod: locs
for mod, locs in decl["used_from"].items()
if not is_submodule_usage(decl, mod)
}
out = [d for d in out if d["used_from"]]
out: Any = deepcopy(
[d for d in (no_self_decls if not intra_module else all_decls).values() if d["used_from"]]
)
for decl in out:
# go from {$MOD: $LOCS} map to [{mod: $MOD, locs: $LOCS}] list of
# objects which is easier to work with in mongo aggregations
@ -185,30 +182,51 @@ def main(
json.dump(out, f)
timer.mark("dumped json")
if generate_yaml:
for decl in out:
decl["used_from"] = {u["mod"]: u["locs"] for u in decl["used_from"]} # type: ignore
# sort by file to make it easier to use
out.sort(key=lambda d: d["loc"])
found_violations = False
for decl in sorted(all_decls.values(), key=lambda d: d["display_name"]):
violations = []
match decl["visibility"]:
case "private":
err = f"Illegal use of {decl['display_name']} outside of module {decl['mod']}:"
for mod, locs in decl["used_from"].items():
if not is_submodule_usage(decl, mod):
for loc in locs:
violations.append(f" {loc} ({mod})")
timer.mark("massaged output for yaml")
case "file_private":
err = f"Illegal use of {decl['display_name']} outside of its file family:"
with open("merged_decls.yaml", "w") as f:
yaml.dump(out, f, Dumper=Dumper, width=1000000)
timer.mark("dumped yaml")
# file_base is the portion of the file name that defines the family
# e.g. bazel-out/blah/src/mongo/db/foo_details.h -> src/mongo/db/foo
file_base = decl["loc"].split(".")[0]
if index := file_base.index("src/mongo/"):
file_base = file_base[index:]
file_base = re.sub(r"_(internal|detail)s?$", "", file_base)
assert file_base.startswith("src/mongo/")
out = [d for d in no_self_decls.values() if d["used_from"] and d["visibility"] == "private"]
out.sort(key=lambda d: d["display_name"])
for decl in out:
print(f"Illegal use of {decl['display_name']} outside of module {decl['mod']}:")
print(f" loc: {decl['loc']}")
print(" usages:")
for mod, locs in decl["used_from"].items():
for loc in locs:
print(f" {loc} ({mod})")
file_family_regex = re.compile(
rf"[\w/]*{file_base}(?:_(?:internals?|details?|test|bm|mock)(_.*)?)?\."
)
assert file_family_regex.match(decl["loc"]) # sanity check
for mod, locs in decl["used_from"].items():
for loc in locs:
# Must be in the same module even if file family matches.
# This helps prevent accidental matches.
if mod != decl["mod"] or not file_family_regex.match(loc):
violations.append(f" {loc} ({mod})")
case _: # ignore other visibility types
continue
if violations:
found_violations = True
print(err)
print(f" loc: {decl['loc']}")
print(" usages:")
print("\n".join(violations))
timer.mark("checked for privacy violations")
if out:
sys.exit(1)
sys.exit(found_violations) # bools are ints, so False(0) is success and True(1) is failure
if __name__ == "__main__":