mirror of https://github.com/mongodb/mongo
417 lines
14 KiB
Python
417 lines
14 KiB
Python
"""Contains classes and methods for tracking historic state."""
|
|
|
|
import copy
|
|
import traceback
|
|
import typing
|
|
from abc import ABC, abstractmethod
|
|
from collections import defaultdict
|
|
from collections.abc import MutableMapping
|
|
from dataclasses import asdict, dataclass, field
|
|
from enum import Enum
|
|
|
|
import yaml
|
|
|
|
from buildscripts.resmokelib.utils import default_if_none, load_yaml, load_yaml_file, registry
|
|
|
|
# How large of a stack to take for each location.
|
|
STACK_LIMIT = 3
|
|
|
|
_HISTORICS = {} # type: ignore
|
|
|
|
SCHEMA_VERSION = "0.1"
|
|
|
|
|
|
def make_historic(obj):
|
|
"""Convert a python object into a corresponding Historic."""
|
|
if isinstance(obj, ALLOWED_TYPES):
|
|
return obj
|
|
obj_class = _HISTORICS[type(obj).__name__]
|
|
return obj_class.from_python_obj(obj)
|
|
|
|
|
|
def historic_from_stored_dict(stored_dict):
|
|
"""Convert a dict for storage into a historic object."""
|
|
obj_class = _HISTORICS[stored_dict["object_class"]]
|
|
return obj_class.from_storable_dict(stored_dict)
|
|
|
|
|
|
def storable_dict_from_historic(to_storable):
|
|
"""Convert a historic into a dict that can be stored in history."""
|
|
if isinstance(to_storable, Historic):
|
|
return to_storable.to_storable_dict()
|
|
else:
|
|
# If we're a primitive immutable
|
|
return to_storable
|
|
|
|
|
|
class Historic(ABC, metaclass=registry.make_registry_metaclass(_HISTORICS, type(ABC))):
|
|
"""ABC for classes that have trackable historic state."""
|
|
|
|
def __init__(self):
|
|
"""Initialize subscriber list."""
|
|
self._subscribers = []
|
|
|
|
def subscribe(self, subscriber, key):
|
|
"""
|
|
Subscribe to the Historic object.
|
|
|
|
The subscriber's accept_write is called on an update.
|
|
"""
|
|
if not isinstance(subscriber, Historic):
|
|
raise ValueError("Subscribers should inherit from the Historic ABC.")
|
|
|
|
self._subscribers.append(Subscriber(obj=subscriber, key=key))
|
|
|
|
def unsubscribe(self, subscriber):
|
|
"""Allow a subscriber to unsubscribe from notifications."""
|
|
self._subscribers = [sub for sub in self._subscribers if sub.obj is not subscriber]
|
|
|
|
def notify_subscriber_write(self):
|
|
"""Notify the subscribers that a write has happened."""
|
|
for subscriber in self._subscribers:
|
|
subscriber.obj.accept_write(subscriber.key)
|
|
|
|
@abstractmethod
|
|
def to_storable_dict(self):
|
|
"""
|
|
Convert this object to a dict that can be stored in yaml.
|
|
|
|
Note that if a Historic stores history data itself, this is allowed to
|
|
be lost in the storage/retrieval of subordinate Historics.
|
|
"""
|
|
return
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def from_storable_dict(raw_dict):
|
|
"""Create a new object from the raw dict returned by to_storable_dict."""
|
|
return
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def from_python_obj(obj):
|
|
"""
|
|
Create a new Historic from the given python object.
|
|
|
|
If inheriting from this in a class that wraps a python object,
|
|
include a REGISTERED_NAME string attribute for the type it converts from.
|
|
Otherwise, override this function to just return obj.
|
|
"""
|
|
return
|
|
|
|
def accept_write(self, key):
|
|
"""
|
|
Update state based on a subscriber's write.
|
|
|
|
Override this method if a class also tracks historic state.
|
|
"""
|
|
self.notify_subscriber_write()
|
|
|
|
|
|
@dataclass
|
|
class Subscriber:
|
|
"""Class representing the subscriber to a Historic."""
|
|
|
|
obj: "typing.Any"
|
|
key: "typing.Any"
|
|
|
|
|
|
# 1. We only allow immutable types or types that have special logic
|
|
# for being inside a HistoryDict, or else we may miss changes and
|
|
# have difficulty converting to yaml.
|
|
# 2. Dictionaries are allowed to be passed in but are implicitly converted to Historic.
|
|
ALLOWED_TYPES = (bool, int, float, str, type(None), Historic)
|
|
|
|
|
|
class HistoryDict(MutableMapping, Historic):
|
|
"""
|
|
Dict-like class that tracks history.
|
|
|
|
Smart stored classes can decide for themselves what
|
|
counts as an access. Don't assume thread safety.
|
|
Note that this class will deep-copy stored values.
|
|
"""
|
|
|
|
REGISTERED_NAME = "dict"
|
|
|
|
def __init__(self, filename=None, yaml_string=None, raw_dict=None):
|
|
"""Init from a yaml file, from a yaml string, or default-construct."""
|
|
|
|
super(HistoryDict, self).__init__()
|
|
|
|
if filename is not None and yaml_string is not None:
|
|
raise ValueError("Cannot construct HistoryDict from both yaml object and file.")
|
|
|
|
self._history_store = defaultdict(list)
|
|
self._value_store = dict()
|
|
self._global_time = 0
|
|
|
|
raw_dict = default_if_none(raw_dict, {})
|
|
if filename is not None:
|
|
raw_dict = load_yaml_file(filename)
|
|
elif yaml_string is not None:
|
|
raw_dict = load_yaml(yaml_string)
|
|
else:
|
|
return # Just default-construct.
|
|
|
|
schema_version = raw_dict["SchemaVersion"]
|
|
if schema_version != SCHEMA_VERSION:
|
|
raise ValueError(
|
|
f"Invalid schema version. Expected {SCHEMA_VERSION} but found {schema_version}."
|
|
)
|
|
history_dict = raw_dict["History"]
|
|
for key in history_dict:
|
|
for raw_access in history_dict[key]:
|
|
access = Access.from_dict(raw_access)
|
|
self._history_store[key].append(access)
|
|
self._global_time = max(access.time, self._global_time)
|
|
last_val = self._retrieve_last_value(key)
|
|
if last_val is not TOMBSTONE:
|
|
self._value_store[key] = last_val
|
|
|
|
# The next recorded global time should be 1 higher than the last.
|
|
self._global_time += 1
|
|
|
|
def dump_history(self, filename=None, include_location=False):
|
|
"""Dump the history to as yaml."""
|
|
|
|
# We can't safe_dump python objects, so we convert the whole store
|
|
# to yaml-able data. (We assume the stored contents are yaml-able.)
|
|
dumpable = dict()
|
|
for key in self._history_store:
|
|
dumpable[key] = list()
|
|
for access in self._history_store[key]:
|
|
access_dict = access.as_dict()
|
|
if include_location:
|
|
access_dict["location"] = PipeLiteral(access_dict["location"])
|
|
else:
|
|
del access_dict["location"]
|
|
dumpable[key].append(access_dict)
|
|
|
|
to_dump = {"History": dumpable}
|
|
dump = yaml.dump(to_dump)
|
|
|
|
# Improves human readability for this data a ton.
|
|
processed = []
|
|
for line in dump.splitlines():
|
|
if not line.startswith((" -", " ")):
|
|
# If this line starts a top-level key.
|
|
processed.append("\n")
|
|
processed.append(line)
|
|
output = "\n".join(processed)
|
|
|
|
# Make sure SchemaVersion is at the top.
|
|
output = f'SchemaVersion: "{SCHEMA_VERSION}"\n' + output
|
|
if filename is not None:
|
|
with open(filename, "w") as fp:
|
|
fp.write(output)
|
|
|
|
return output
|
|
|
|
def write_equals(self, other_dict):
|
|
"""Compare two dicts for write equality."""
|
|
if not len(other_dict._value_store) == len(self._value_store):
|
|
return False
|
|
|
|
for key in self._value_store:
|
|
our_writes = [
|
|
access.value_written
|
|
for access in self._history_store[key]
|
|
if access.type == AccessType.WRITE
|
|
]
|
|
their_writes = [
|
|
access.value_written
|
|
for access in other_dict._history_store[key]
|
|
if access.type == AccessType.WRITE
|
|
]
|
|
if not our_writes == their_writes:
|
|
return False
|
|
return True
|
|
|
|
def to_storable_dict(self):
|
|
"""Convert to a dict for storage, overrides Historic."""
|
|
storable_dict = {}
|
|
for key, value in self._value_store.items():
|
|
storable_dict[key] = storable_dict_from_historic(value)
|
|
return {"object_class": HistoryDict.REGISTERED_NAME, "object_value": storable_dict}
|
|
|
|
@staticmethod
|
|
def from_storable_dict(raw_dict):
|
|
"""Convert from a dict for storage, overrides Historic."""
|
|
return HistoryDict(raw_dict=raw_dict["object_value"])
|
|
|
|
@staticmethod
|
|
def from_python_obj(obj):
|
|
"""Convert from a python object, overrides Historic."""
|
|
if not isinstance(obj, dict):
|
|
raise ValueError("HistoryDict can only be converted from dict python objects.")
|
|
history_dict = HistoryDict()
|
|
for key, value in obj.items():
|
|
history_dict[key] = make_historic(value)
|
|
return history_dict
|
|
|
|
def accept_write(self, key):
|
|
"""Record subscribee's write. Overrides Historic."""
|
|
self._record_write(key, self._value_store[key])
|
|
super(HistoryDict, self).accept_write(key)
|
|
|
|
def copy(self):
|
|
"""
|
|
Shallow-copy the value store, deep-copy history.
|
|
|
|
Don't record writes here.
|
|
"""
|
|
history_dict = HistoryDict()
|
|
history_dict._global_time = self._global_time
|
|
history_dict._history_store = copy.deepcopy(self._history_store)
|
|
for key, value in self.items():
|
|
history_dict[key] = make_historic(value)
|
|
return history_dict
|
|
|
|
def __getitem__(self, key):
|
|
# We don't return a deep copy because we rely on objects to alert us
|
|
# when modified.
|
|
return self._value_store[key]
|
|
|
|
def __setitem__(self, key, value):
|
|
# Implicitly convert dictionaries to HistoricDicts to avoid users having to manually wrap dictionaries.
|
|
# This should only used when assigning a dictionary directly to a key in HistoryDict and not a standalone
|
|
# variable, as its history will not be recorded.
|
|
if isinstance(value, dict):
|
|
value = make_historic(value)
|
|
|
|
if not isinstance(value, ALLOWED_TYPES):
|
|
raise ValueError(
|
|
f"HistoryDict cannot store type {type(value)}."
|
|
" Please use a different type or create a Historic wrapper."
|
|
)
|
|
self._record_write(key, value)
|
|
self._value_store[key] = value
|
|
if isinstance(value, HistoryDict):
|
|
value.subscribe(self, key)
|
|
|
|
self.notify_subscriber_write()
|
|
|
|
def __delitem__(self, key):
|
|
self._record_delete(key)
|
|
if isinstance(self._value_store[key], Historic):
|
|
self._value_store[key].unsubscribe(self)
|
|
del self._value_store[key]
|
|
|
|
self.notify_subscriber_write()
|
|
|
|
def __iter__(self):
|
|
return iter(self._value_store)
|
|
|
|
def __len__(self):
|
|
return len(self._value_store)
|
|
|
|
def __str__(self):
|
|
# Dict's str doesn't recursively convert subordinate HistoryDicts
|
|
pairs = []
|
|
for key, value in self._value_store.items():
|
|
if isinstance(value, str):
|
|
pairs.append(f"'{key}': '{str(value)}'")
|
|
else:
|
|
pairs.append(f"'{key}': {str(value)}")
|
|
return "{" + ", ".join(pairs) + "}"
|
|
|
|
def __repr__(self):
|
|
# eval(repr(self)) isn't valid, but this is at least useful for debugging.
|
|
return f"{self.__class__.__name__}({repr(self._value_store)})"
|
|
|
|
def _record_write(self, key, value):
|
|
written = None
|
|
if type(value) in ALLOWED_TYPES and value is not Historic:
|
|
written = value
|
|
|
|
cur_access = Access(
|
|
type=AccessType.WRITE,
|
|
location=_get_location(),
|
|
value_written=written,
|
|
time=self._global_time,
|
|
)
|
|
self._history_store[key].append(cur_access)
|
|
self._global_time += 1
|
|
|
|
def _record_delete(self, key):
|
|
cur_access = Access(
|
|
type=AccessType.DELETE,
|
|
location=_get_location(),
|
|
value_written=None,
|
|
time=self._global_time,
|
|
)
|
|
self._history_store[key].append(cur_access)
|
|
self._global_time += 1
|
|
|
|
def _retrieve_last_value(self, key):
|
|
for access in reversed(self._history_store[key]):
|
|
if access.type == AccessType.WRITE:
|
|
if isinstance(access.value_written, dict):
|
|
return historic_from_stored_dict(access.value_written)
|
|
else:
|
|
return copy.deepcopy(access.value_written)
|
|
elif access.type == AccessType.DELETE:
|
|
break
|
|
return TOMBSTONE
|
|
|
|
|
|
# Represents a value that was deleted (or was never created).
|
|
TOMBSTONE = object()
|
|
|
|
|
|
class AccessType(Enum):
|
|
"""Class representing the operation performed in an accesss."""
|
|
|
|
READ = 0 # Reads are not recorded here.
|
|
WRITE = 1
|
|
DELETE = 2
|
|
|
|
|
|
@dataclass
|
|
class Access:
|
|
"""Class representing an access to store in the dict's history."""
|
|
|
|
type: "AccessType"
|
|
time: int
|
|
location: ["traceback.FrameSummary"] = field(default_factory=list)
|
|
value_written: "typing.Any" = None
|
|
|
|
def as_dict(self):
|
|
"""Convert this class into a dict (accounting for AccessType)."""
|
|
self_dict = asdict(self)
|
|
self_dict["type"] = self_dict["type"].name
|
|
self_dict["value_written"] = copy.deepcopy(self.value_written)
|
|
return self_dict
|
|
|
|
@staticmethod
|
|
def from_dict(raw_dict):
|
|
"""Retrieve this class from a dict (accounting for AccessType)."""
|
|
return Access(
|
|
type=AccessType[raw_dict["type"]],
|
|
time=raw_dict["time"],
|
|
location=raw_dict["location"] if "location" in raw_dict else list(),
|
|
value_written=copy.deepcopy(raw_dict["value_written"])
|
|
if "value_written" in raw_dict
|
|
else None,
|
|
)
|
|
|
|
|
|
def _get_location():
|
|
"""Return the location as a string, accounting for this function and the parent in the stack."""
|
|
return "".join(traceback.format_stack(limit=STACK_LIMIT + 2)[:-2])
|
|
|
|
|
|
class PipeLiteral(str):
|
|
"""Construct with a string to create a pipe literal for yaml representation."""
|
|
|
|
pass
|
|
|
|
|
|
def pipe_literal_representer(dumper, data):
|
|
"""Create a representer for pipe literals, used internally for pyyaml."""
|
|
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
|
|
|
|
|
|
yaml.add_representer(PipeLiteral, pipe_literal_representer)
|