import enum import struct from . import jkrcomp __all__ = [ "JKRArchiveException", "JKRArchive", "JKRArchiveFile", "JKRPreloadType", "create_new_archive", "from_archive_buffer", "from_archive_file", "write_archive_file", "write_archive_buffer" ] # ---------------------------------------------------------------------------------------------------------------------- # Exception for JKRArchive-related actions. # ---------------------------------------------------------------------------------------------------------------------- class JKRArchiveException(Exception): """ Signals that an error occurred during any JKRArchive-related action. """ pass # ---------------------------------------------------------------------------------------------------------------------- # Declarations of folder node and directory entry structures. These low-level classes will not be visibly exported by # the module. Instead, JKRArchive handles these structures automatically. The module offers a high-level file accessor # for file directories instead. Some helper functions to read and write data can be found here as well. # ---------------------------------------------------------------------------------------------------------------------- def _file_name_to_hash_(file_name: str) -> int: file_hash = 0 for ch in file_name.encode("ascii"): file_hash = (file_hash * 3) + ch return file_hash & 0xFFFF def _calc_node_identifier_(dir_name: str, is_root: bool) -> int: # Root node uses "ROOT" as identifier if is_root: return 0x524F4F54 enc_upper = dir_name.upper().encode("ascii") len_enc_name = len(enc_upper) identifier = 0 for i in range(4): identifier <<= 8 if i >= len_enc_name: identifier += 0x20 else: identifier += enc_upper[i] return identifier class SNodeEntry: __STRUCT_BE__ = struct.Struct(">2I2HI") __STRUCT_LE__ = struct.Struct("<2I2HI") def __init__(self): self._identifier_ = 0 # Updated by "SNodeEntry._pack_". Currently not used by the module. self._off_name_ = 0 # Updated by "JKRArchive._pack_". self._hash_ = 0 # Updated by "SNodeEntry._pack_". Currently not used by the module. self._num_files_ = 0 # Updated by "SNodeEntry._pack_". self._idx_files_start_ = 0 # Updated by "JKRArchive._fix_nodes_and_directories_". self._archive_ = None # The archive to which this node belongs. self._name_ = "" # The node's folder name. self._dirs_ = list() # The directories held by this node. def _unpack_(self, data, off: int, is_big_endian: bool): strct = self.__STRUCT_BE__ if is_big_endian else self.__STRUCT_LE__ self._identifier_, self._off_name_, self._hash_, self._num_files_, self._idx_files_start_ = strct.unpack_from(data, off) def _pack_(self, data, off: int, is_big_endian: bool): strct = self.__STRUCT_BE__ if is_big_endian else self.__STRUCT_LE__ self._identifier_ = _calc_node_identifier_(self._name_, self._archive_._root_ == self) self._hash_ = _file_name_to_hash_(self._name_) self._num_files_ = len(self._dirs_) strct.pack_into(data, off, self._identifier_, self._off_name_, self._hash_, self._num_files_, self._idx_files_start_) class SDirEntry: __STRUCT_BE__ = struct.Struct(">2H4I") __STRUCT_LE__ = struct.Struct("<2H4I") def __init__(self): self._index_ = 0xFFFF # Updated whenever a file gets created, deleted, etc. self._hash_ = 0 # Updated by "SDirEntry._pack_". Currently not used by the module. self._attributes_ = 0 # Updated by "SDirEntry._pack_". self._off_name_ = 0 # Updated by "JKRArchive._pack_". self._off_file_data_ = 0 # Updated by "JKRArchive._pack_" (files) / "._fix_node_and_directories_" (folders). self._len_file_data_ = 0 # Updated by "SDirEntry._pack_". self._unk10_ = 0 # Currently not used by the module. self._archive_ = None # The archive to which this directory belongs. self._name_ = "" # The directory's name. self._parent_ = None # The parent node to which this directory belongs. self._node_ = None # The node that this directory represents (only for folders). self._file_ = None # The file accessor that this directory represents (only for files). def _unpack_(self, data, off: int, is_big_endian: bool): strct = self.__STRUCT_BE__ if is_big_endian else self.__STRUCT_LE__ self._index_, self._hash_, self._attributes_, self._off_file_data_, self._len_file_data_, self._unk10_ = strct.unpack_from(data, off) self._off_name_ = self._attributes_ & 0x00FFFFFF self._attributes_ = self._attributes_ >> 24 def _pack_(self, data, off: int, is_big_endian: bool): strct = self.__STRUCT_BE__ if is_big_endian else self.__STRUCT_LE__ # File if self._file_ is not None: self._attributes_ = DirAttr.FILE self._len_file_data_ = len(self._file_.data) if self._file_.compression == jkrcomp.JKRCompression.SZS: self._attributes_ |= DirAttr.COMPRESSED | DirAttr.USE_SZS elif self._file_.compression == jkrcomp.JKRCompression.SZP: self._attributes_ |= DirAttr.COMPRESSED if self._file_.preload == JKRPreloadType.MRAM: self._attributes_ |= DirAttr.PRELOAD_MRAM elif self._file_.preload == JKRPreloadType.ARAM: self._attributes_ |= DirAttr.PRELOAD_ARAM elif self._file_.preload == JKRPreloadType.DVD: self._attributes_ |= DirAttr.PRELOAD_DVD # Folder else: self._attributes_ = DirAttr.FOLDER self._index_ = 0xFFFF self._len_file_data_ = 0x10 self._hash_ = _file_name_to_hash_(self._name_) attr = (self._attributes_ << 24) | self._off_name_ strct.pack_into(data, off, self._index_, self._hash_, attr, self._off_file_data_, self._len_file_data_, self._unk10_) @property def is_file(self): return self._file_ is not None @property def is_folder(self): return self._file_ is None and self._name_ not in [".", ".."] @property def is_shortcut(self): return self._file_ is None and self._name_ in [".", ".."] class DirAttr(enum.IntFlag): FILE = 1 FOLDER = 2 COMPRESSED = 4 _8 = 8 PRELOAD_MRAM = 16 PRELOAD_ARAM = 32 PRELOAD_DVD = 64 USE_SZS = 128 def _read_string_(data, offset: int) -> str: end = offset while end < len(data) - 1 and data[end] != 0: end += 1 return data[offset:end + 1].decode("ascii").strip("\0") def _get_aligned_size_(val: int) -> int: a = val & 31 if a: return val + (32 - a) return val def _get_alignment_(data) -> bytes: pad_len = len(data) & 31 return bytes((32 - pad_len) if pad_len else 0) # ---------------------------------------------------------------------------------------------------------------------- # Declarations of high-level file accessor and enumerations. The file accessor "wraps" a file directory from an archive. # These will be visibly exported by the module. # ---------------------------------------------------------------------------------------------------------------------- class JKRPreloadType(enum.Enum): MRAM = 0 ARAM = 1 DVD = 2 class JKRArchiveFile: def __init__(self): self.data = None self.preload = JKRPreloadType.MRAM self._dir_ = None def __repr__(self): return self._dir_._name_ @property def name(self) -> str: return self._dir_._name_ @property def archive(self): return self._dir_._archive_ @property def index(self) -> int: return self._dir_._index_ @property def compression(self) -> jkrcomp.JKRCompression: return jkrcomp.check_compression(self.data) # ---------------------------------------------------------------------------------------------------------------------- # JKRArchive implementation according to the RARC / ResourceArchive format # ---------------------------------------------------------------------------------------------------------------------- class JKRArchive: __STRUCT_HEADER_BE__ = struct.Struct(">8I") __STRUCT_HEADER_LE__ = struct.Struct("<8I") __STRUCT_INFO_BE__ = struct.Struct(">6IH?") __STRUCT_INFO_LE__ = struct.Struct("<6IH?") __MAGIC__ = 0x52415243 # "RARC" hexspeak def __init__(self): self._root_ = None # The archive's root node. self._nodes_ = list() # All of the folder nodes. self._dirs_ = list() # Collection of all directory entries. self._next_file_id_ = 0 # The next file ID to be used. self._sync_file_ids_ = True # Synchronizes file IDs with dir entry index. # Lookup maps for fast file and node access self._lookup_files_ = dict() self._lookup_nodes_ = dict() # Temporary data storage; used during saving process self._mram_files_ = list() self._aram_files_ = list() self._dvd_files_ = list() # ------------------------------------------------------------------------------------------------------------------ # Packing and unpacking # ------------------------------------------------------------------------------------------------------------------ def _unpack_(self, data, is_big_endian: bool): # Attempt to decompress buffer first data = jkrcomp.decompress(data) if struct.unpack_from(">I" if is_big_endian else " bytearray: # Sort files into respective preload memory groups for dir_entry in self._dirs_: if dir_entry._file_ is not None: if dir_entry._file_.preload == JKRPreloadType.MRAM: self._mram_files_.append(dir_entry) elif dir_entry._file_.preload == JKRPreloadType.ARAM: self._aram_files_.append(dir_entry) elif dir_entry._file_.preload == JKRPreloadType.DVD: self._dvd_files_.append(dir_entry) # Gather information about the archive and prepare output buffer num_nodes = len(self._nodes_) off_nodes = 0x20 num_dirs = len(self._dirs_) off_dirs = _get_aligned_size_(off_nodes + num_nodes * 0x10) total_file_size = _get_aligned_size_(0x20 + off_dirs + num_dirs * 0x14) off_strings = total_file_size - 0x20 buffer = bytearray(total_file_size) # Collect strings; pool always starts out like this string_pool = bytearray() string_pool += f".\0..\0{self._root_._name_}\0".encode("ascii") self._root_._off_name_ = 5 def collect_strings(folder_node: SNodeEntry): nonlocal string_pool for dir_entry in folder_node._dirs_: if dir_entry._name_ == ".": dir_entry._off_name_ = 0 elif dir_entry._name_ == "..": dir_entry._off_name_ = 2 else: dir_entry._off_name_ = len(string_pool) string_pool += (dir_entry._name_ + "\0").encode("ascii") if dir_entry._file_ is None and dir_entry._name_ not in [".", ".."]: dir_entry._node_._off_name_ = dir_entry._off_name_ collect_strings(dir_entry._node_) collect_strings(self._root_) string_pool += _get_alignment_(string_pool) buffer += string_pool len_strings = len(string_pool) del string_pool # Write folder nodes off_tmp = 0x40 for folder_node in self._nodes_: folder_node._pack_(buffer, off_tmp, is_big_endian) off_tmp += 0x10 # Write file data data_start = len(buffer) def write_file_data(files: list) -> int: nonlocal buffer categorized_data_start = len(buffer) for dir_entry in files: dir_entry._off_file_data_ = len(buffer) - data_start buffer += dir_entry._file_.data buffer += _get_alignment_(dir_entry._file_.data) files.clear() return len(buffer) - categorized_data_start mram_size = write_file_data(self._mram_files_) aram_size = write_file_data(self._aram_files_) dvd_size = write_file_data(self._dvd_files_) # Write dir entries off_tmp = 0x20 + off_dirs for dir_entry in self._dirs_: dir_entry._pack_(buffer, off_tmp, is_big_endian) off_tmp += 0x14 # Write header and info block strct_header = self.__STRUCT_HEADER_BE__ if is_big_endian else self.__STRUCT_HEADER_LE__ strct_info = self.__STRUCT_INFO_BE__ if is_big_endian else self.__STRUCT_INFO_LE__ total_file_size = len(buffer) strct_header.pack_into(buffer, 0, self.__MAGIC__, total_file_size, 0x20, data_start - 0x20, total_file_size - data_start, mram_size, aram_size, dvd_size) strct_info.pack_into(buffer, 0x20, num_nodes, off_nodes, num_dirs, off_dirs, len_strings, off_strings, self._next_file_id_, self._sync_file_ids_) return buffer # ------------------------------------------------------------------------------------------------------------------ # Helper functions for structure # ------------------------------------------------------------------------------------------------------------------ def _initialize_lookup_(self): path = self._root_._name_.lower() self._lookup_nodes_[path] = self._root_ self._initialize_lookup_node_(self._root_, path) def _initialize_lookup_node_(self, folder_node: SNodeEntry, current_path: str): for dir_entry in folder_node._dirs_: if dir_entry.is_file: path = current_path + "/" + dir_entry._name_.lower() self._lookup_files_[path] = dir_entry._file_ elif dir_entry.is_folder: next_node = dir_entry._node_ path = current_path + "/" + next_node._name_.lower() self._lookup_nodes_[path] = next_node self._initialize_lookup_node_(next_node, path) def _fix_nodes_and_directories_(self): self._dirs_.clear() self._fix_node_and_directories_(self._root_) self._recalculate_file_indices_() def _fix_node_and_directories_(self, folder_node: SNodeEntry): # Put the shortcut directories ("." and "..") at the end of the node's directory list folders = list() shortcuts = list() for dir_entry in list(folder_node._dirs_): if dir_entry.is_shortcut: shortcuts.append(dir_entry) elif dir_entry.is_folder: folders.append(dir_entry) for subdir in shortcuts: node_index = self._nodes_.index(subdir._node_) if subdir._node_ is not None else 0xFFFFFFFF subdir._off_file_data_ = node_index folder_node._dirs_.remove(subdir) folder_node._dirs_.append(subdir) # Update the node's directory span folder_node._idx_files_start_ = len(self._dirs_) self._dirs_ += folder_node._dirs_ # Handle subfolders as well for subdir in folders: node_index = self._nodes_.index(subdir._node_) subdir._off_file_data_ = node_index self._fix_node_and_directories_(subdir._node_) def _recalculate_file_indices_(self): if self._sync_file_ids_: self._next_file_id_ = len(self._dirs_) for i, dir_entry in enumerate(self._dirs_): if dir_entry.is_file: dir_entry._index_ = i else: file_id = 0 for dir_entry in self._dirs_: if dir_entry.is_file: dir_entry._index_ = file_id file_id += 1 self._next_file_id_ = file_id def _create_dir_entry_(self, name: str, attr: int, node: SNodeEntry, parent_node: SNodeEntry) -> SDirEntry: dir_entry = SDirEntry() dir_entry._name_ = name dir_entry._attributes_ = attr dir_entry._node_ = node dir_entry._parent_ = parent_node dir_entry._archive_ = self parent_node._dirs_.append(dir_entry) return dir_entry def _create_root_(self, root_name: str): if self._root_: raise JKRArchiveException("Fatal! Root already exists!") root_node = SNodeEntry() root_node._name_ = root_name root_node._archive_ = self self._root_ = root_node self._nodes_.append(root_node) self._create_dir_entry_(".", DirAttr.FOLDER, root_node, root_node) self._create_dir_entry_("..", DirAttr.FOLDER, None, root_node) self._lookup_nodes_[root_name.lower()] = root_node self._fix_nodes_and_directories_() # ------------------------------------------------------------------------------------------------------------------ # High-level operations # ------------------------------------------------------------------------------------------------------------------ @property def sync_file_ids(self) -> bool: return self._sync_file_ids_ @sync_file_ids.setter def sync_file_ids(self, val: bool): self._sync_file_ids_ = val self._recalculate_file_indices_() @property def root_name(self) -> str: return self._root_._name_ def directory_exists(self, file_path: str) -> bool: key = file_path.lower() return key in self._lookup_files_ or key in self._lookup_nodes_ def list_files(self, folder_path: str) -> list: key = folder_path.lower() if key not in self._lookup_nodes_: raise JKRArchiveException(f"The folder {folder_path} does not exist!") return list(de._file_ for de in filter(lambda de: de.is_file, self._lookup_nodes_[key]._dirs_)) def list_folders(self, folder_path: str) -> list: key = folder_path.lower() if key not in self._lookup_nodes_: raise JKRArchiveException(f"The folder {folder_path} does not exist!") return list(de._name_ for de in filter(lambda de: de.is_folder, self._lookup_nodes_[key]._dirs_)) def get_file(self, file_path: str) -> JKRArchiveFile: key = file_path.lower() if key not in self._lookup_files_: raise JKRArchiveException(f"The file {file_path} does not exist!") return self._lookup_files_[key] def create_folder(self, folder_path: str): # Check if directory already exists full_key = folder_path.lower() if full_key in self._lookup_nodes_ or full_key in self._lookup_files_: raise JKRArchiveException(f"The directory {folder_path} already exists!") # Get parent folder first split_path = folder_path.rsplit("/", 1) if len(split_path) == 1: return None folder_path, folder_name = split_path folder_path_key = folder_path.lower() if folder_path_key not in self._lookup_nodes_: raise JKRArchiveException("Cannot create folder. Archive does not contain the folder " + folder_path) folder_node = self._lookup_nodes_[folder_path_key] # Create new node and directories new_folder_node = SNodeEntry() new_folder_node._name_ = folder_name new_folder_node._archive_ = self self._nodes_.append(new_folder_node) self._create_dir_entry_(folder_name, DirAttr.FOLDER, new_folder_node, folder_node) self._create_dir_entry_(".", DirAttr.FOLDER, new_folder_node, new_folder_node) self._create_dir_entry_("..", DirAttr.FOLDER, folder_node, new_folder_node) self._lookup_nodes_[full_key] = new_folder_node self._fix_nodes_and_directories_() def create_file(self, file_path: str, data=bytearray(), preload: JKRPreloadType = JKRPreloadType.MRAM) -> JKRArchiveFile: # Check if directory already exists full_key = file_path.lower() if full_key in self._lookup_nodes_ or full_key in self._lookup_files_: raise JKRArchiveException(f"The directory {file_path} already exists!") # Get parent folder first split_path = file_path.rsplit("/", 1) if len(split_path) == 1: return None folder_path, file_name = split_path folder_path_key = folder_path.lower() if folder_path_key not in self._lookup_nodes_: raise JKRArchiveException("Cannot create file. Archive does not contain the folder " + folder_path) folder_node = self._lookup_nodes_[folder_path_key] # Create new directory and file entries dir_entry = self._create_dir_entry_(file_name, DirAttr.FILE, None, folder_node) new_file = JKRArchiveFile() new_file.data = data new_file.preload = preload new_file._dir_ = dir_entry dir_entry._file_ = new_file self._lookup_files_[full_key] = new_file self._fix_nodes_and_directories_() return new_file def remove_file(self, file_path: str): key = file_path.lower() if key not in self._lookup_files_: raise JKRArchiveException(f"The file {file_path} does not exist!") file_access = self._lookup_files_[key] dir_entry = file_access._dir_ parent_node = dir_entry._parent_ # Detach file and clear access parent_node._dirs_.remove(dir_entry) dir_entry._parent_ = None dir_entry._archive_ = None dir_entry._file_ = None file_access.data = None file_access._dir_ = None self._lookup_files_.pop(key) self._dirs_.remove(dir_entry) self._recalculate_file_indices_() def remove_folder(self, folder_path: str): raise NotImplementedError def __repr__(self): return self._print_(self._root_, 0) def _print_(self, folder_node: SNodeEntry, depth: int): indent = " " * depth result = indent + folder_node._name_ + "\n" folders = list() files = list() for dir_entry in folder_node._dirs_: if dir_entry.is_file: files.append(dir_entry._file_) elif dir_entry.is_folder: folders.append(dir_entry._node_) for folder_node in folders: result += self._print_(folder_node, depth + 1) for file_access in files: result += f" {indent}{file_access.name}\n" return result # ---------------------------------------------------------------------------------------------------------------------- # Helper I/O and creation functions # ---------------------------------------------------------------------------------------------------------------------- def create_new_archive(root_name: str, sync_file_ids: bool = True): jkrarc = JKRArchive() jkrarc._sync_file_ids_ = sync_file_ids jkrarc._create_root_(root_name) return jkrarc def from_archive_buffer(buffer, big_endian: bool = True) -> JKRArchive: jkrarc = JKRArchive() jkrarc._unpack_(buffer, big_endian) return jkrarc def from_archive_file(file_path: str, big_endian: bool = True) -> JKRArchive: jkrarc = JKRArchive() with open(file_path, "rb") as f: jkrarc._unpack_(f.read(), big_endian) return jkrarc def write_archive_buffer(jkrarc: JKRArchive, big_endian: bool = True, compression: jkrcomp.JKRCompression = jkrcomp.JKRCompression.NONE, level: int = 7): return jkrcomp.compress(jkrarc._pack_(big_endian), compression, level) def write_archive_file(jkrarc: JKRArchive, file_path: str, big_endian: bool = True, compression: jkrcomp.JKRCompression = jkrcomp.JKRCompression.NONE, level: int = 7): buffer = jkrcomp.compress(jkrarc._pack_(big_endian), compression, level) with open(file_path, "wb") as f: f.write(buffer) f.flush()