mongo/buildscripts/resmokelib/utils/history.py

417 lines
14 KiB
Python

"""Contains classes and methods for tracking historic state."""
import copy
import traceback
import typing
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import MutableMapping
from dataclasses import asdict, dataclass, field
from enum import Enum
import yaml
from buildscripts.resmokelib.utils import default_if_none, load_yaml, load_yaml_file, registry
# How large of a stack to take for each location.
STACK_LIMIT = 3
_HISTORICS = {} # type: ignore
SCHEMA_VERSION = "0.1"
def make_historic(obj):
"""Convert a python object into a corresponding Historic."""
if isinstance(obj, ALLOWED_TYPES):
return obj
obj_class = _HISTORICS[type(obj).__name__]
return obj_class.from_python_obj(obj)
def historic_from_stored_dict(stored_dict):
"""Convert a dict for storage into a historic object."""
obj_class = _HISTORICS[stored_dict["object_class"]]
return obj_class.from_storable_dict(stored_dict)
def storable_dict_from_historic(to_storable):
"""Convert a historic into a dict that can be stored in history."""
if isinstance(to_storable, Historic):
return to_storable.to_storable_dict()
else:
# If we're a primitive immutable
return to_storable
class Historic(ABC, metaclass=registry.make_registry_metaclass(_HISTORICS, type(ABC))):
"""ABC for classes that have trackable historic state."""
def __init__(self):
"""Initialize subscriber list."""
self._subscribers = []
def subscribe(self, subscriber, key):
"""
Subscribe to the Historic object.
The subscriber's accept_write is called on an update.
"""
if not isinstance(subscriber, Historic):
raise ValueError("Subscribers should inherit from the Historic ABC.")
self._subscribers.append(Subscriber(obj=subscriber, key=key))
def unsubscribe(self, subscriber):
"""Allow a subscriber to unsubscribe from notifications."""
self._subscribers = [sub for sub in self._subscribers if sub.obj is not subscriber]
def notify_subscriber_write(self):
"""Notify the subscribers that a write has happened."""
for subscriber in self._subscribers:
subscriber.obj.accept_write(subscriber.key)
@abstractmethod
def to_storable_dict(self):
"""
Convert this object to a dict that can be stored in yaml.
Note that if a Historic stores history data itself, this is allowed to
be lost in the storage/retrieval of subordinate Historics.
"""
return
@staticmethod
@abstractmethod
def from_storable_dict(raw_dict):
"""Create a new object from the raw dict returned by to_storable_dict."""
return
@staticmethod
@abstractmethod
def from_python_obj(obj):
"""
Create a new Historic from the given python object.
If inheriting from this in a class that wraps a python object,
include a REGISTERED_NAME string attribute for the type it converts from.
Otherwise, override this function to just return obj.
"""
return
def accept_write(self, key):
"""
Update state based on a subscriber's write.
Override this method if a class also tracks historic state.
"""
self.notify_subscriber_write()
@dataclass
class Subscriber:
"""Class representing the subscriber to a Historic."""
obj: "typing.Any"
key: "typing.Any"
# 1. We only allow immutable types or types that have special logic
# for being inside a HistoryDict, or else we may miss changes and
# have difficulty converting to yaml.
# 2. Dictionaries are allowed to be passed in but are implicitly converted to Historic.
ALLOWED_TYPES = (bool, int, float, str, type(None), Historic)
class HistoryDict(MutableMapping, Historic):
"""
Dict-like class that tracks history.
Smart stored classes can decide for themselves what
counts as an access. Don't assume thread safety.
Note that this class will deep-copy stored values.
"""
REGISTERED_NAME = "dict"
def __init__(self, filename=None, yaml_string=None, raw_dict=None):
"""Init from a yaml file, from a yaml string, or default-construct."""
super(HistoryDict, self).__init__()
if filename is not None and yaml_string is not None:
raise ValueError("Cannot construct HistoryDict from both yaml object and file.")
self._history_store = defaultdict(list)
self._value_store = dict()
self._global_time = 0
raw_dict = default_if_none(raw_dict, {})
if filename is not None:
raw_dict = load_yaml_file(filename)
elif yaml_string is not None:
raw_dict = load_yaml(yaml_string)
else:
return # Just default-construct.
schema_version = raw_dict["SchemaVersion"]
if schema_version != SCHEMA_VERSION:
raise ValueError(
f"Invalid schema version. Expected {SCHEMA_VERSION} but found {schema_version}."
)
history_dict = raw_dict["History"]
for key in history_dict:
for raw_access in history_dict[key]:
access = Access.from_dict(raw_access)
self._history_store[key].append(access)
self._global_time = max(access.time, self._global_time)
last_val = self._retrieve_last_value(key)
if last_val is not TOMBSTONE:
self._value_store[key] = last_val
# The next recorded global time should be 1 higher than the last.
self._global_time += 1
def dump_history(self, filename=None, include_location=False):
"""Dump the history to as yaml."""
# We can't safe_dump python objects, so we convert the whole store
# to yaml-able data. (We assume the stored contents are yaml-able.)
dumpable = dict()
for key in self._history_store:
dumpable[key] = list()
for access in self._history_store[key]:
access_dict = access.as_dict()
if include_location:
access_dict["location"] = PipeLiteral(access_dict["location"])
else:
del access_dict["location"]
dumpable[key].append(access_dict)
to_dump = {"History": dumpable}
dump = yaml.dump(to_dump)
# Improves human readability for this data a ton.
processed = []
for line in dump.splitlines():
if not line.startswith((" -", " ")):
# If this line starts a top-level key.
processed.append("\n")
processed.append(line)
output = "\n".join(processed)
# Make sure SchemaVersion is at the top.
output = f'SchemaVersion: "{SCHEMA_VERSION}"\n' + output
if filename is not None:
with open(filename, "w") as fp:
fp.write(output)
return output
def write_equals(self, other_dict):
"""Compare two dicts for write equality."""
if not len(other_dict._value_store) == len(self._value_store):
return False
for key in self._value_store:
our_writes = [
access.value_written
for access in self._history_store[key]
if access.type == AccessType.WRITE
]
their_writes = [
access.value_written
for access in other_dict._history_store[key]
if access.type == AccessType.WRITE
]
if not our_writes == their_writes:
return False
return True
def to_storable_dict(self):
"""Convert to a dict for storage, overrides Historic."""
storable_dict = {}
for key, value in self._value_store.items():
storable_dict[key] = storable_dict_from_historic(value)
return {"object_class": HistoryDict.REGISTERED_NAME, "object_value": storable_dict}
@staticmethod
def from_storable_dict(raw_dict):
"""Convert from a dict for storage, overrides Historic."""
return HistoryDict(raw_dict=raw_dict["object_value"])
@staticmethod
def from_python_obj(obj):
"""Convert from a python object, overrides Historic."""
if not isinstance(obj, dict):
raise ValueError("HistoryDict can only be converted from dict python objects.")
history_dict = HistoryDict()
for key, value in obj.items():
history_dict[key] = make_historic(value)
return history_dict
def accept_write(self, key):
"""Record subscribee's write. Overrides Historic."""
self._record_write(key, self._value_store[key])
super(HistoryDict, self).accept_write(key)
def copy(self):
"""
Shallow-copy the value store, deep-copy history.
Don't record writes here.
"""
history_dict = HistoryDict()
history_dict._global_time = self._global_time
history_dict._history_store = copy.deepcopy(self._history_store)
for key, value in self.items():
history_dict[key] = make_historic(value)
return history_dict
def __getitem__(self, key):
# We don't return a deep copy because we rely on objects to alert us
# when modified.
return self._value_store[key]
def __setitem__(self, key, value):
# Implicitly convert dictionaries to HistoricDicts to avoid users having to manually wrap dictionaries.
# This should only used when assigning a dictionary directly to a key in HistoryDict and not a standalone
# variable, as its history will not be recorded.
if isinstance(value, dict):
value = make_historic(value)
if not isinstance(value, ALLOWED_TYPES):
raise ValueError(
f"HistoryDict cannot store type {type(value)}."
" Please use a different type or create a Historic wrapper."
)
self._record_write(key, value)
self._value_store[key] = value
if isinstance(value, HistoryDict):
value.subscribe(self, key)
self.notify_subscriber_write()
def __delitem__(self, key):
self._record_delete(key)
if isinstance(self._value_store[key], Historic):
self._value_store[key].unsubscribe(self)
del self._value_store[key]
self.notify_subscriber_write()
def __iter__(self):
return iter(self._value_store)
def __len__(self):
return len(self._value_store)
def __str__(self):
# Dict's str doesn't recursively convert subordinate HistoryDicts
pairs = []
for key, value in self._value_store.items():
if isinstance(value, str):
pairs.append(f"'{key}': '{str(value)}'")
else:
pairs.append(f"'{key}': {str(value)}")
return "{" + ", ".join(pairs) + "}"
def __repr__(self):
# eval(repr(self)) isn't valid, but this is at least useful for debugging.
return f"{self.__class__.__name__}({repr(self._value_store)})"
def _record_write(self, key, value):
written = None
if type(value) in ALLOWED_TYPES and value is not Historic:
written = value
cur_access = Access(
type=AccessType.WRITE,
location=_get_location(),
value_written=written,
time=self._global_time,
)
self._history_store[key].append(cur_access)
self._global_time += 1
def _record_delete(self, key):
cur_access = Access(
type=AccessType.DELETE,
location=_get_location(),
value_written=None,
time=self._global_time,
)
self._history_store[key].append(cur_access)
self._global_time += 1
def _retrieve_last_value(self, key):
for access in reversed(self._history_store[key]):
if access.type == AccessType.WRITE:
if isinstance(access.value_written, dict):
return historic_from_stored_dict(access.value_written)
else:
return copy.deepcopy(access.value_written)
elif access.type == AccessType.DELETE:
break
return TOMBSTONE
# Represents a value that was deleted (or was never created).
TOMBSTONE = object()
class AccessType(Enum):
"""Class representing the operation performed in an accesss."""
READ = 0 # Reads are not recorded here.
WRITE = 1
DELETE = 2
@dataclass
class Access:
"""Class representing an access to store in the dict's history."""
type: "AccessType"
time: int
location: ["traceback.FrameSummary"] = field(default_factory=list)
value_written: "typing.Any" = None
def as_dict(self):
"""Convert this class into a dict (accounting for AccessType)."""
self_dict = asdict(self)
self_dict["type"] = self_dict["type"].name
self_dict["value_written"] = copy.deepcopy(self.value_written)
return self_dict
@staticmethod
def from_dict(raw_dict):
"""Retrieve this class from a dict (accounting for AccessType)."""
return Access(
type=AccessType[raw_dict["type"]],
time=raw_dict["time"],
location=raw_dict["location"] if "location" in raw_dict else list(),
value_written=copy.deepcopy(raw_dict["value_written"])
if "value_written" in raw_dict
else None,
)
def _get_location():
"""Return the location as a string, accounting for this function and the parent in the stack."""
return "".join(traceback.format_stack(limit=STACK_LIMIT + 2)[:-2])
class PipeLiteral(str):
"""Construct with a string to create a pipe literal for yaml representation."""
pass
def pipe_literal_representer(dumper, data):
"""Create a representer for pipe literals, used internally for pyyaml."""
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
yaml.add_representer(PipeLiteral, pipe_literal_representer)