# /utils/paths/path_simplification.py
"""The scholar_flux.utils.paths.path_simplification module implements the PathSimplifier for flattening JSON records.
This simplifier is used in the later path processing steps to coerce a nested JSON structure represented by a
PathNodeIndex into a singular list of dictionaries.
The PathSimplifier will return the full paths where each nested JSON value can be found, if allowed. Otherwise, the
PathSimplifier will attempt to shorten the names in the final dictionary of paths up to the user-specified nested
key (component) length while preventing name collisions from occurring.
"""
from __future__ import annotations
import logging
from typing import Optional, List, Dict, Union, Any, Set
from collections import defaultdict
from scholar_flux.exceptions.path_exceptions import PathSimplificationError
from scholar_flux.utils import unlist_1d
from scholar_flux.utils.paths import ProcessingPath, PathNode
from dataclasses import dataclass, field
# Module-level logger named after this module (``__name__``); no handlers or levels are configured here.
logger = logging.getLogger(__name__)
@dataclass
class PathSimplifier:
    """A utility class for simplifying and managing Processing Paths.

    The simplifier keeps a running dictionary of ``path group -> simplified name`` so that
    repeated calls reuse earlier names and newly generated names do not collide with them.

    Args:
        delimiter (str): The delimiter to use when building a ProcessingPath from a plain string.
        non_informative (Optional[List[str]]): A list of non-informative components to remove from paths.

    Attributes:
        delimiter (str): The delimiter used to separate components in the path.
        non_informative (List[str]): A list of non-informative components to be removed during simplification.
        name_mappings (Dict[ProcessingPath, str]): Maps each path group to the string form of its unique,
            simplified name. Used both as the result of simplification and to detect name collisions.
    """

    delimiter: str = ProcessingPath.DEFAULT_DELIMITER
    non_informative: list[str] = field(default_factory=list)
    name_mappings: Dict[ProcessingPath, str] = field(default_factory=dict)

    def _is_name_taken(self, name: Union[ProcessingPath, str]) -> bool:
        """Return True if ``name`` is already used as a simplified name in ``name_mappings``.

        ``simplify_paths`` stores names as strings, so the string form is checked in addition to the
        object itself; this keeps every collision check in the class consistent.

        Args:
            name (Union[ProcessingPath, str]): The candidate name to look up.

        Returns:
            bool: True when the candidate (or its string form) is among the mapped names.
        """
        taken_names = self.name_mappings.values()
        return name in taken_names or str(name) in taken_names

    def _generate_base_name(self, path: ProcessingPath, max_components: int) -> ProcessingPath:
        """Generate a base name from the Processing Path.

        Delegates to ``path.get_name(max_components=...)`` after validating the input.

        Args:
            path (ProcessingPath): The ProcessingPath object representing the path components.
            max_components (int): The maximum length of informative components to use in the name.

        Returns:
            ProcessingPath: A Processing Path for the base name generated from the path.

        Raises:
            PathSimplificationError: If ``path`` is not a ProcessingPath of at least depth 1, or if an
                error occurs during name generation.
        """
        if not isinstance(path, ProcessingPath) or path.depth < 1:
            raise PathSimplificationError(
                f"The provided path must be a ProcessingPath object of at least depth 1. Received: {path}"
            )
        try:
            return path.get_name(max_components=max_components)
        except Exception as e:
            raise PathSimplificationError(f"Error generating base name for path {path}: {e}") from e

    def _handle_collision(self, original_name: ProcessingPath) -> ProcessingPath:
        """Handle name collisions by appending a unique numbered component to the original name.

        Candidates are built as ``original_name / "_<counter>"`` and the counter is increased until
        the candidate that will actually be returned is not yet present in ``name_mappings``.

        Args:
            original_name (ProcessingPath): The original name that caused a collision.

        Returns:
            ProcessingPath: A unique name generated by appending a number to the original name.

        Raises:
            PathSimplificationError: If an error occurs while handling the collision.
        """
        try:
            counter = 1
            candidate = original_name / f"_{counter}"
            # Check the exact candidate being returned: a separately formatted string may not match
            # the string form produced by the `/` operator, which would let duplicates slip through.
            while self._is_name_taken(candidate):
                counter += 1
                candidate = original_name / f"_{counter}"
            return candidate
        except Exception as e:
            raise PathSimplificationError(f"Error handling name collision for {original_name}: {e}") from e

    def generate_unique_name(
        self,
        path: ProcessingPath,
        max_components: Optional[int],
        remove_noninformative: bool = False,
    ) -> ProcessingPath:
        """Generate a unique name for the given Processing Path.

        Args:
            path (ProcessingPath): The ProcessingPath object representing the path components.
            max_components (Optional[int]): The maximum number of components to use in the name.
                When None, the (optionally cleaned) path itself is used as the candidate name.
            remove_noninformative (bool): Whether to remove non-informative components.

        Returns:
            ProcessingPath: A unique ProcessingPath name.

        Raises:
            PathSimplificationError: If ``path`` is not a ProcessingPath or an error occurs during
                name generation.
        """
        if not isinstance(path, ProcessingPath):
            raise PathSimplificationError("The provided path must be a ProcessingPath object.")
        try:
            if remove_noninformative:
                path_fmt = path.remove(self.non_informative)
                # Only adopt the cleaned path when something was removed and something is left.
                if path_fmt.depth >= 1 and path_fmt.depth != path.depth:
                    logger.debug("Simplifying with formatted path: %s", path_fmt)
                    path = path_fmt

            if max_components is None:
                # Use the validated path: a cleaned path that was rejected above (e.g. depth 0)
                # must not become the name.
                candidate_name = path
            else:
                component_index = max_components
                total_components = len(path.components)
                candidate_name = self._generate_base_name(path, component_index)
                # Lengthen the name while it collides with an existing name, or while it carries
                # fewer informative components than requested and more components are available.
                while self._is_name_taken(candidate_name) or (
                    candidate_name.info_content(self.non_informative) < max_components
                    and component_index < total_components
                ):
                    component_index += 1
                    candidate_name = self._generate_base_name(path, component_index)
                    if component_index >= total_components:
                        break

            # The full path may still collide; fall back to a numbered suffix.
            if self._is_name_taken(candidate_name):
                candidate_name = self._handle_collision(candidate_name)
            return candidate_name
        except Exception as e:
            raise PathSimplificationError(f"Error generating unique name for path {path}: {e}") from e

    def simplify_paths(
        self,
        paths: Union[List[Union[ProcessingPath, str]], Set[Union[ProcessingPath, str]]],
        max_components: Optional[int],
        remove_noninformative: bool = False,
    ) -> Dict[ProcessingPath, str]:
        """Simplify paths by removing non-informative components and selecting the last 'max_components' informative
        components.

        Each path is converted to a ProcessingPath (using ``self.delimiter`` for plain strings) and reduced
        to its group via ``ProcessingPath.group()``. Groups that already have a mapping keep their name;
        otherwise a new unique name is generated and stored in ``name_mappings``.

        Args:
            paths (List[Union[ProcessingPath, str]]): List of path strings or ProcessingPaths to simplify.
            max_components (Optional[int]): The maximum desired number of informative components to retain
                in the simplified path. None keeps the full path.
            remove_noninformative (bool): Whether to remove non-informative components.

        Returns:
            Dict[ProcessingPath, str]: A dictionary mapping each path group to its simplified unique group
                name. This is the simplifier's own ``name_mappings`` dictionary, not a copy.

        Raises:
            PathSimplificationError: If an error occurs during path simplification.
        """
        try:
            for original_path in paths:
                path = (
                    original_path
                    if isinstance(original_path, ProcessingPath)
                    else ProcessingPath(original_path, delimiter=self.delimiter)
                )
                path_group = path.group()
                unique_group_name = self.name_mappings.get(path_group) or self.generate_unique_name(
                    path_group, max_components, remove_noninformative
                )
                self.name_mappings[path_group] = str(unique_group_name)
            return self.name_mappings
        except Exception as e:
            raise PathSimplificationError(f"Error simplifying paths {paths}: {e}") from e

    def simplify_to_row(
        self,
        terminal_nodes: List[PathNode] | Set[PathNode],
        collapse: Optional[str] = ";",
    ) -> Dict[str, Any]:
        """Simplify terminal nodes by mapping them to their corresponding unique names.

        Node values are grouped under the simplified name mapped to each node's ``path_group``. Nodes are
        processed in sorted order so that grouped values are deterministic.

        Args:
            terminal_nodes (List[PathNode]): A list of PathNode objects representing the terminal nodes.
            collapse (Optional[str]): The separator to use when collapsing multiple values into a single
                string. When falsy (None or ""), values are passed through ``unlist_1d`` instead.

        Returns:
            Dict[str, Any]: A dictionary mapping unique names to their corresponding values or collapsed strings.

        Raises:
            PathSimplificationError: If the inputs are empty, a node is not a PathNode, a node's path group
                has no mapping, or any other error occurs during simplification.
        """
        if not (terminal_nodes and self.name_mappings):
            raise PathSimplificationError(
                "A valid list of PathNodes and a non-empty name mappings dictionary is required for simplification."
            )
        try:
            nodes = list(terminal_nodes)
            # Validate before sorting so an invalid element is reported as such rather than as a
            # comparison failure raised from within sorted().
            for node in nodes:
                if not isinstance(node, PathNode):
                    raise PathSimplificationError(f"Invalid node object: {node}")

            row_dict: Dict[str, List[Any]] = defaultdict(list)
            for node in sorted(nodes):
                unique_name = self.name_mappings.get(node.path_group)
                if unique_name is None:
                    raise PathSimplificationError(f"Original path: {node.path} has no mapping.")
                row_dict[unique_name].append(node.value)

            return {
                name: (self._collapse(values, collapse) if collapse else unlist_1d(values))
                for name, values in row_dict.items()
            }
        except Exception as e:
            raise PathSimplificationError(f"Error simplifying terminal nodes {terminal_nodes}: {e}") from e

    @classmethod
    def _collapse(cls, obj: Any, delimiter: str) -> Any:
        """Helper method for collapsing an item or list of items into a joined string if possible.

        If the object, after ``unlist_1d``, is None, an empty string or an empty list, None is returned
        instead.

        Args:
            obj (Any): The object to flatten and collapse into a string.
            delimiter (str): The separator placed between the string forms of the items.

        Returns:
            Any: None for empty input, otherwise the items converted with ``str`` and joined by ``delimiter``.
        """
        unnested_obj = unlist_1d(obj)
        if unnested_obj is None or unnested_obj in ([], ""):
            return None

        # A bare string or a non-iterable scalar is a single item; joining it directly would split a
        # string into characters or fail outright.
        if isinstance(obj, str):
            items: Any = [obj]
        else:
            try:
                items = iter(obj)
            except TypeError:
                items = [obj]
        return delimiter.join(map(str, items))

    def get_mapped_paths(self) -> Dict[ProcessingPath, str]:
        """Get the current name mappings.

        Returns:
            Dict[ProcessingPath, str]: The dictionary of mappings from path groups to simplified names.
                This is the live ``name_mappings`` dictionary, not a copy.

        Example:
            simplifier = PathSimplifier()
            simplifier.simplify_paths(['a/b/c', 'a/b/d'], 2)
            simplifier.get_mapped_paths()

            The result holds one entry per path group; the exact keys and names depend on how
            ProcessingPath splits and names the given paths.
        """
        return self.name_mappings

    def clear_mappings(self) -> None:
        """Clear all existing path mappings.

        Example:
            simplifier = PathSimplifier()
            simplifier.simplify_paths(['a/b/c', 'a/b/d'], 2)
            simplifier.clear_mappings()
            simplifier.get_mapped_paths()

        Output:
            {}
        """
        self.name_mappings.clear()
# Public API: only PathSimplifier is exported via ``from <module> import *``.
__all__ = ["PathSimplifier"]