# /utils/paths/path_node_map.py
"""The scholar_flux.utils.paths.path_node_map module implements the PathNodeMap that is used to record terminal path-
value combinations that enables more efficient mapping, retrieval, and updates to terminal path node combinations."""
from __future__ import annotations
import copy
from typing import Optional, Union, Set, Generator, MutableMapping, Mapping, Sequence
from collections import UserDict
from scholar_flux.exceptions.path_exceptions import (
InvalidProcessingPathError,
PathNodeMapError,
)
from types import GeneratorType
from scholar_flux.utils.paths import ProcessingPath, PathNode, PathProcessingCache
from scholar_flux.utils import unlist_1d
import logging
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# The logger's threshold is set to WARNING here, so the debug messages emitted throughout this
# module are suppressed unless the level is changed elsewhere.
logger.setLevel(logging.WARNING)
class PathNodeMap(UserDict[ProcessingPath, PathNode]):
"""A dictionary-like class that maps Processing paths to PathNode objects.

Keys are ProcessingPath instances; string keys are coerced to ProcessingPath objects on lookup and deletion.
Inserting a node removes any stored ancestors of its path, so only terminal paths remain in the map.
When caching is enabled, a PathProcessingCache is kept alongside the data and used for prefix-based filtering.
"""
# Cache setting applied when `use_cache` is not passed to the constructor.
DEFAULT_USE_CACHE: bool = True
def __init__(
    self,
    *nodes: Union[
        PathNode,
        Generator[PathNode, None, None],
        tuple[PathNode],
        list[PathNode],
        set[PathNode],
        dict[str, PathNode],
        dict[ProcessingPath, PathNode],
    ],
    use_cache: Optional[bool] = None,
    allow_terminal: Optional[bool] = False,
    overwrite: Optional[bool] = True,
    **path_nodes: Mapping[str | ProcessingPath, PathNode],
) -> None:
    """Initializes the PathNodeMap instance.

    Args:
        *nodes: PathNode objects, or a single collection/mapping of PathNode objects, used to populate the map.
        use_cache (Optional[bool]): Whether the path cache is maintained and used for filtering.
            Falls back to `DEFAULT_USE_CACHE` when None.
        allow_terminal (Optional[bool]): Flag stored on the instance as a boolean. None is treated as False.
        overwrite (Optional[bool]): Whether existing paths may be overwritten. None is treated as False.
        **path_nodes: PathNode objects passed by keyword, using the path as the argument name.
    """
    super().__init__()
    logger.debug("Initializing PathNodeMap instance")
    self.use_cache: bool = use_cache if use_cache is not None else self.DEFAULT_USE_CACHE
    # Stored as a plain bool. A trailing comma previously turned this value into the
    # one-element tuple `(False,)`, which is always truthy.
    self.allow_terminal: bool = bool(allow_terminal)
    self.overwrite: bool = bool(overwrite)
    self._cache: PathProcessingCache = PathProcessingCache()
    if nodes or path_nodes:
        self.update(*nodes, **path_nodes, overwrite=self.overwrite)
def __contains__(self, key: object) -> bool:
    """Reports whether a path (or the path of a node) is stored in the map.

    Args:
        key (Union[str, ProcessingPath, PathNode]): A path, a string that is converted to a path,
            or a node whose `path` attribute is checked.

    Returns:
        bool: True if a value is stored under the resolved path, False otherwise (including when key is None).

    Raises:
        InvalidProcessingPathError: If the key cannot be resolved to a ProcessingPath.
    """
    if key is None:
        return False

    # Nodes are looked up by the path they carry.
    candidate = key.path if isinstance(key, PathNode) else key
    if isinstance(candidate, str):
        candidate = ProcessingPath.with_inferred_delimiter(candidate)

    if isinstance(candidate, ProcessingPath):
        return self.data.get(candidate) is not None

    raise InvalidProcessingPathError(
        f"Unexpected value type observed: {type(candidate)}: Expected a ProcessingPath/string"
    )
def __setitem__(
    self,
    key: ProcessingPath,
    value: PathNode,
) -> None:
    """Stores a node under the given path after validating the pair.

    The instance-level `overwrite` flag decides whether an existing path may be replaced.

    Args:
        key (ProcessingPath): The key (Processing path) to set.
        value (PathNode): The value (PathNode instance) to associate with the key.

    Raises:
        PathNodeMapError: If validation of the key, the node, or their combination fails.
    """
    validated_key = self._validate_input(key, value, overwrite=self.overwrite)
    # Stored ancestors of the new node's path are dropped before the node is inserted.
    self._remove_nonterminal_nodes(value.path)
    super().__setitem__(validated_key, value)
    if not self.use_cache:
        return
    self._cache.lazy_add(validated_key)
def __delitem__(self, key: Union[str, ProcessingPath]) -> None:
    """Removes the entry stored under the given path.

    Args:
        key (Union[str, ProcessingPath]): The path to delete. Strings are converted to paths first.

    Raises:
        InvalidProcessingPathError: If the key is neither a string nor a ProcessingPath.
        PathNodeMapError: If the key does not exist in the PathNodeMap.
    """
    path = self._validate_path(key)
    if path in self:
        super().__delitem__(path)
        # Keep the cache in step with the stored data.
        if self.use_cache:
            self._cache.lazy_remove(path)
        return
    raise PathNodeMapError(f'Key "{path}" not found in the PathNodeMap.')
@property
def nodes(self) -> list[PathNode]:
    """Returns the nodes (values) currently stored in the map as a new list."""
    return [*self.data.values()]
@property
def paths(self) -> list[ProcessingPath]:
    """Returns the paths (keys) currently stored in the map as a new list."""
    return [*self.data.keys()]
def filter(
    self,
    prefix: ProcessingPath | str | int,
    min_depth: Optional[int] = None,
    max_depth: Optional[int] = None,
    from_cache: Optional[bool] = None,
) -> dict[ProcessingPath, PathNode]:
    """Returns the stored path/node pairs that match the given prefix.

    Args:
        prefix (ProcessingPath | str | int): The prefix to search for. Values that are not already
            a ProcessingPath are converted with `ProcessingPath.to_processing_path`.
        min_depth (Optional[int]): The minimum depth to search for. Default is None.
        max_depth (Optional[int]): The maximum depth to search for. Default is None.
        from_cache (Optional[bool]): Whether to filter through the cache. Defaults to the map's `use_cache` setting.

    Returns:
        dict[ProcessingPath, PathNode]: The matching paths and their nodes.

    Raises:
        PathNodeMapError: If an error occurs while filtering the PathNodeMap.
    """
    cache_enabled = self.use_cache if from_cache is None else from_cache
    if not isinstance(prefix, ProcessingPath):
        prefix = ProcessingPath.to_processing_path(prefix)
    try:
        if cache_enabled:
            return self._cache_filter(prefix, min_depth, max_depth)
        return self._filter(prefix, min_depth, max_depth)
    except Exception as e:
        raise PathNodeMapError(f"Error filtering paths with prefix {prefix}: {e}") from e
def _filter(
    self,
    prefix: ProcessingPath,
    min_depth: Optional[int] = None,
    max_depth: Optional[int] = None,
) -> dict[ProcessingPath, PathNode]:
    """Filter the PathNodeMap for paths with the given prefix by scanning the stored data directly.

    A stored path matches when it equals the prefix or has the prefix as an ancestor, and its depth
    lies within the requested bounds.

    Args:
        prefix (ProcessingPath): The prefix to search for.
        min_depth (Optional[int]): The minimum depth to search for. Default is None.
        max_depth (Optional[int]): The maximum depth to search for. Default is None.

    Returns:
        dict[ProcessingPath, PathNode]: The matching paths and their nodes.

    Raises:
        PathNodeMapError: If the depth bounds or prefix are invalid, or if filtering fails.
    """
    try:
        if (min_depth is not None and min_depth < 0) or (max_depth is not None and max_depth < 1):
            raise ValueError(
                f"Minimum and Maximum depth must be None or greater than 0 or 1, respectively. Received: min={min_depth}, max={max_depth}"
            )
        if not isinstance(prefix, (str, ProcessingPath)):
            raise InvalidProcessingPathError(
                f"Key must be a ProcessingPath. Received: {prefix} - type={type(prefix)}"
            )
        # The prefix test is parenthesized so the depth bounds apply to an exact match too.
        # Without the parentheses, `and` bound tighter than `or`, and a path equal to the
        # prefix was returned regardless of `min_depth` and `max_depth`.
        return {
            path: node
            for path, node in self.data.items()
            if (min_depth is None or min_depth <= path.depth)
            and (max_depth is None or path.depth <= max_depth)
            and (path.has_ancestor(prefix) or path == prefix)
        }
    except Exception as e:
        raise PathNodeMapError(
            f"Error filtering paths with prefix {prefix} at max_depth {max_depth}: {e}"
        ) from e
def _remove_nonterminal_nodes(self, path: ProcessingPath) -> None:
    """Removes stored ancestors of the given path, since they are no longer terminal.

    Every ancestor returned by `path.get_ancestors()` that is present in the map is removed from
    the stored data and, when caching is enabled, from the cache as well.

    Args:
        path (ProcessingPath): The path whose ancestors should be removed from the map.

    Raises:
        PathNodeMapError: If an error occurs while searching for or removing ancestor paths.
    """
    try:
        removed_nodes = []
        for ancestor_path in path.get_ancestors():
            if ancestor_path is None:
                continue
            if self.data.pop(ancestor_path, None) is None:
                continue
            # Mirror `__delitem__`: an entry dropped from the data is also dropped from the
            # cache, so cache-based filtering does not return paths that no longer exist.
            if self.use_cache:
                self._cache.lazy_remove(ancestor_path)
            removed_nodes.append(ancestor_path)
        if removed_nodes:
            logger.debug(f"Removed {len(removed_nodes)} nodes that are no longer terminal: {removed_nodes}")
    except Exception as e:
        raise PathNodeMapError(f"Error searching for and removing ancestor paths for the path: {path}") from e
def _cache_filter(
    self,
    prefix: ProcessingPath,
    min_depth: Optional[int] = None,
    max_depth: Optional[int] = None,
) -> dict[ProcessingPath, PathNode]:
    """Filters the PathNodeMap for paths with the given prefix by querying the cache.

    The cache supplies the matching paths; the nodes are then read from the stored data.

    Args:
        prefix (ProcessingPath): The prefix to search for.
        min_depth (Optional[int]): The minimum depth to search for. Default is None.
        max_depth (Optional[int]): The maximum depth to search for. Default is None.

    Returns:
        dict[ProcessingPath, PathNode]: The matching paths and their nodes.

    Raises:
        PathNodeMapError: If caching is disabled or an error occurs while filtering.
    """
    try:
        if not self.use_cache:
            raise PathNodeMapError("Cannot filter without cache. Please enable cache during initialization.")
        matched_paths = self._cache.filter(prefix, min_depth=min_depth, max_depth=max_depth)
        matches = {}
        for matched_path in matched_paths:
            matches[matched_path] = self.data[matched_path]
        return matches
    except Exception as e:
        raise PathNodeMapError(f"Error filtering paths with prefix {prefix} at max_depth {max_depth}: {e}") from e
def _validate_key_value_pair(self, processing_path: ProcessingPath, node: PathNode) -> None:
    """Checks that a key and the node it would be stored under are consistent with each other.

    A key of depth 1 only needs to share its name with the node's path. Any deeper key must equal the
    node's path exactly. The node is also rejected if the map already holds paths deeper than the
    node's own path that match it as a prefix.

    Args:
        processing_path (ProcessingPath): The key to compare against the path carried by the node.
        node (PathNode): The PathNode instance containing the full path to compare.

    Raises:
        PathNodeMapError: If the key is not a ProcessingPath, the name/path constraint is violated,
            or deeper paths already exist under the node's path.
    """
    if not isinstance(processing_path, ProcessingPath):
        raise PathNodeMapError(f"Invalid path path: {processing_path}. Must be a ProcessingPath instance.")

    if processing_path.depth != 1:
        # Keys deeper than one component must match the node's full path exactly.
        if processing_path != node.path:
            raise PathNodeMapError(
                f"Invalid path: The key provided as a path: {processing_path} does not match the path within the provided node: {node}"
            )
    elif processing_path.get_name() != node.path.get_name():
        raise PathNodeMapError(
            f"Invalid path path name: The name of the current node {processing_path} does not match the name of the last component of the path within the provided node: {node})"
        )

    descendant_nodes = self.filter(node.path, min_depth=node.path.depth + 1)
    if descendant_nodes:
        raise PathNodeMapError(
            f"Unable to insert node at path ({node.path}): There are a total of {len(descendant_nodes)} nodes containing the path of the current node as a prefix."
        )
def node_exists(self, node: Union[PathNode, ProcessingPath]) -> bool:
    """Reports whether a value is stored under the given path, or under the path of the given node.

    Raises:
        KeyError: If the argument is neither a PathNode nor a ProcessingPath.
    """
    if isinstance(node, PathNode):
        return self.data.get(node.path) is not None
    if isinstance(node, ProcessingPath):
        return self.data.get(node) is not None
    raise KeyError(f"Key must be node or path. Received '{type(node)}'")
def _validate_new_node_path(self, node: Union[PathNode, ProcessingPath], overwrite: Optional[bool] = None):
    """Checks whether the node's path is already taken and whether replacing it is permitted.

    Args:
        node (Union[PathNode, ProcessingPath]): The node or path about to be inserted.
        overwrite (Optional[bool]): Whether an existing entry may be replaced. Defaults to the map's `overwrite` flag.

    Raises:
        PathNodeMapError: If the path already exists and overwriting is not permitted.
    """
    allow_overwrite = self.overwrite if overwrite is None else overwrite
    if not self.node_exists(node):
        return
    if allow_overwrite:
        logger.debug(f"The node at '{node}' will be overwritten")
        return
    raise PathNodeMapError(f"A path and node at '{node}' already exists in the Map")
def _validate_node(self, node: PathNode, overwrite: Optional[bool] = None):
    """Runs the node-level checks that must pass before a node is inserted.

    Args:
        node (PathNode): The PathNode instance to validate.
        overwrite (Optional[bool]): Forwarded to the check for an already existing path.

    Raises:
        PathNodeMapError: If any of the checks raises.
    """
    try:
        # The return value is not inspected here; presumably `is_valid_node` raises
        # for invalid input (confirm against PathNode).
        PathNode.is_valid_node(node)
        self._validate_new_node_path(node, overwrite=overwrite)
        logger.debug(f"Validated node: {node}")
    except Exception as validation_error:
        raise PathNodeMapError(
            f"Error validating constraints on node insertion: {validation_error}"
        ) from validation_error
@classmethod
def _keep_terminal_paths(
    cls,
    path_list: Union[
        list[ProcessingPath],
        Set[ProcessingPath],
        Generator[ProcessingPath, None, None],
    ],
) -> Set[ProcessingPath]:
    """Filter a collection of paths to keep only terminal paths.

    A path is dropped when it is an ancestor of another path in the collection; duplicates are kept once.

    Args:
        path_list (Union[list, Set, Generator]): The paths to filter.

    Returns:
        Set[ProcessingPath]: A set of terminal paths.
    """
    # Reverse sort on the depth-first key; assumed to place deeper paths before their
    # ancestors, which the prefix bookkeeping below relies on.
    sorted_path_list = sorted(
        path_list,
        key=lambda path: path._to_alphanum(depth_first=True),
        reverse=True,
    )
    terminal_paths: Set[ProcessingPath] = set()
    known_prefixes: Set[ProcessingPath] = set()
    for path in sorted_path_list:
        if path in known_prefixes:
            logger.warning(f"The path '{path}' is non-terminal in the list of node to add. Removing...")
            continue
        if path in terminal_paths:
            logger.warning(f"The path '{path}' is duplicated. Removing and retaining the last inputted entry...")
            continue
        terminal_paths.add(path)
        # Ancestors are recorded for every kept path, including the deepest ones. They were
        # previously skipped for paths at the maximum depth, so prefixes of those paths were
        # wrongly kept as terminal.
        known_prefixes.update(ancestor for ancestor in path.get_ancestors() if ancestor is not None)
    return terminal_paths
@staticmethod
def _transform_key(key: Union[str, ProcessingPath], delimiter: str) -> ProcessingPath:
    """Coerces a key into a ProcessingPath if it is not one already.

    Args:
        key (Union[str, ProcessingPath]): The key to convert. ProcessingPath instances are returned unchanged.
        delimiter (str): The delimiter used to separate components in the path.

    Returns:
        ProcessingPath: The key as a ProcessingPath.
    """
    if isinstance(key, ProcessingPath):
        return key
    converted = ProcessingPath.to_processing_path(key, component_types=None, delimiter=delimiter)
    if key is not converted:
        logger.debug(f"converted {key} --> {converted}")
    return converted
def _validate_input(
    self,
    path: Union[str, ProcessingPath],
    node: PathNode,
    overwrite: Optional[bool] = None,
) -> ProcessingPath:
    """Validates a key-value pair and returns the key as a ProcessingPath when the pair is valid.

    Args:
        path (Union[str, ProcessingPath]): The key, formatted as a string or a ProcessingPath instance.
        node (PathNode): The PathNode instance to validate.
        overwrite (Optional[bool]): Forwarded to the node validation step.

    Returns:
        ProcessingPath: The validated key, converted using the delimiter of the node's path.

    Raises:
        PathNodeMapError: If the key, the node, or the combination of both is invalid.
    """
    try:
        self._validate_node(node, overwrite)
        validated_path = self._transform_key(path, delimiter=node.path.delimiter)
        self._validate_key_value_pair(validated_path, node)
    except Exception as e:
        raise PathNodeMapError(f"Error validating path ({path}) and node ({node}): {e}") from e
    return validated_path
# def _extract_node(self, *nodes) -> Optional[PathNode]:
# """
# Attempts to extract a node from arguments of arbitrary lengths.
# If there is more than one node, this method will return None with
# the aim of deferring processing multiple nodes to other helper methods
# """
# if isinstance(nodes, PathNode):
# return nodes
# if isinstance(nodes, (list, tuple)) and len(nodes) == 1:
# node = nodes[0]
# return node if isinstance(node, PathNode) else None
# return None
@classmethod
def _format_nodes_as_dict(cls, *nodes, **path_nodes) -> Union[
    PathNodeMap,
    dict[ProcessingPath, PathNode],
]:
    """Normalizes positional and keyword node arguments into a single mapping of nodes.

    Keyword arguments are formatted first; positional input is formatted afterwards and merged on top.
    Both are passed through `cls.format_mapping`, which is not defined in the visible part of this
    class (NOTE(review): confirm where it is provided).

    Args:
        *nodes: PathNode objects, or one set/sequence/generator/mapping of PathNode objects.
        **path_nodes: PathNode objects passed by keyword.

    Returns:
        The merged mapping of formatted nodes.

    Raises:
        PathNodeMapError: If the positional input is not a PathNode or a sequence/mapping containing PathNodes.
    """
    node_dict = cls.format_mapping(path_nodes) if path_nodes else {}
    if not nodes:
        return node_dict

    # A single positional argument that is not itself a node is treated as the container of nodes.
    is_lone_container = len(nodes) == 1 and not isinstance(nodes[0], PathNode)
    candidate: tuple | MutableMapping | list | set | Generator = nodes[0] if is_lone_container else nodes

    values_verified = True
    collected: Optional[MutableMapping]
    if isinstance(candidate, PathNode):
        collected = {candidate.path: candidate}
    elif isinstance(candidate, (set, Sequence, GeneratorType)):
        collected = {node.path: node for node in candidate if PathNode.is_valid_node(node)}
    elif isinstance(candidate, MutableMapping):
        # Mappings are taken as given, so their values still need to be checked below.
        collected = candidate
        values_verified = False
    else:
        collected = None

    if collected is not None and (
        values_verified
        or isinstance(collected, PathNodeMap)
        or all(isinstance(node, PathNode) for node in collected.values())
    ):
        return node_dict | cls.format_mapping(collected)

    raise PathNodeMapError(
        "Could not format the input as a dictionary of nodes: Expected the input to be a "
        f"PathNode or sequence/mapping containing PathNodes. Instead received {type(unlist_1d(nodes))}"
    )
def update(  # type: ignore[override]
    self,
    *args,
    overwrite: Optional[bool] = None,
    **kwargs: Mapping[str | ProcessingPath, PathNode],
) -> None:
    """Adds new path/node pairs to the PathNodeMap instance.

    Args:
        *args: A PathNodeMap, a dictionary, or other supported input holding the nodes to add.
        overwrite (Optional[bool]): Flag indicating whether to overwrite existing values if the key already exists.
        **kwargs: Path nodes using the path as the argument name.
    """
    logger.debug("Updating PathNodeMap instance")
    formatted_nodes = self._format_nodes_as_dict(*args, **kwargs)
    self._update(formatted_nodes, overwrite)
    logger.debug("Updated successfully")
def _update(
    self,
    node_dict: Union[
        PathNodeMap,
        dict[ProcessingPath, PathNode],
    ],
    overwrite: Optional[bool] = None,
) -> None:
    """Helper method for updating the map from an already formatted mapping of nodes.

    `overwrite` is applied as a temporary override of the map's `overwrite` flag, because item
    assignment reads that flag. The previous value is restored afterwards.

    Args:
        node_dict: The formatted mapping of paths to nodes to insert.
        overwrite (Optional[bool]): Temporary overwrite setting. None keeps the map's current setting.

    Raises:
        PathNodeMapError: If inserting any of the nodes fails.
    """
    previous_overwrite = self.overwrite
    try:
        if overwrite is not None:
            self.overwrite = overwrite
        super().update(node_dict)
    except Exception as e:
        raise PathNodeMapError(f"An error occurred during updating: {e}") from e
    finally:
        # Restore the map's own setting. The override value was previously written back here,
        # which made a per-call override permanent.
        self.overwrite = previous_overwrite
def get(  # type: ignore[override]
    self, key: Union[str, ProcessingPath], default: Optional[PathNode] = None
) -> Optional[PathNode]:
    """Gets an item from the PathNodeMap instance, returning `default` when no value is available.

    Args:
        key (Union[str, ProcessingPath]): The key (Processing path). Strings are coerced to a ProcessingPath,
            and a PathNode is looked up by its path.
        default (Optional[PathNode]): The value returned when the key is None or not stored in the map.

    Returns:
        Optional[PathNode]: The stored node, or `default` if there is none.

    Raises:
        InvalidProcessingPathError: If the key is not None and cannot be resolved to a ProcessingPath.
    """
    if key is None:
        return default
    if isinstance(key, PathNode):
        key = key.path
    key = self._validate_path(key)
    # `default` is forwarded; it was previously dropped, so a miss always returned None.
    return super().get(key, default)
@property
def record_indices(self) -> list[int]:
    """Returns the sorted, de-duplicated `record_index` values of all nodes in the current map.

    Note: This assumes that all paths within the current map are derived from a list of records where every
    path's first element denotes its initial position in a list with nested json components.

    Returns:
        list[int]: A list containing integers denoting individual records found in each path
    """
    unique_indices = {node.record_index for node in self.nodes}
    return sorted(unique_indices)
def get_node(self, key: Union[str, ProcessingPath], default: Optional[PathNode] = None) -> Optional[PathNode]:
    """Retrieves a path node by delegating to `get` with the same key and default."""
    node = self.get(key, default)
    return node
def __getitem__(self, key: Union[str, ProcessingPath]) -> PathNode:
    """Returns the node stored under the given path.

    Args:
        key (Union[str, ProcessingPath]): The key (Processing path). If string, coerces to a ProcessingPath.

    Returns:
        PathNode: The value (PathNode instance).

    Raises:
        InvalidProcessingPathError: If the key is neither a string nor a ProcessingPath.
        KeyError: If no node is stored under the resolved path.
    """
    return self.data[self._validate_path(key)]
@staticmethod
def _validate_path(key: object) -> ProcessingPath:
    """Coerces strings into processing paths; ProcessingPath objects are returned unchanged.

    Args:
        key (Union[str, ProcessingPath]): A path in string/ProcessingPath form used for retrieving,
            searching, deleting objects, etc.

    Returns:
        ProcessingPath: The key itself if it is already a ProcessingPath; otherwise the result of
            `ProcessingPath.with_inferred_delimiter` applied to the string.

    Raises:
        InvalidProcessingPathError: If the value is anything other than a string/path object.
    """
    path = ProcessingPath.with_inferred_delimiter(key) if isinstance(key, str) else key
    if not isinstance(path, ProcessingPath):
        # Stray ")" removed so this message matches the one raised by `__contains__`.
        raise InvalidProcessingPathError(
            f"Unexpected value type observed: {type(path)}: Expected a ProcessingPath/string"
        )
    return path
def add(self, node: PathNode, overwrite: Optional[bool] = None, inplace: bool = True) -> Optional[PathNodeMap]:
    """Add a node to the PathNodeMap instance.

    Args:
        node (PathNode): The node to add.
        overwrite (Optional[bool]): Flag indicating whether to overwrite existing values if the key
            already exists. None keeps the map's current `overwrite` setting.
        inplace (bool): Whether to add the node to this map or to a deep copy of it. Default is True.

    Returns:
        Optional[PathNodeMap]: A new PathNodeMap containing the node if `inplace` is False, otherwise None.

    Raises:
        PathNodeMapError: If any error occurs while adding the node.
    """
    previous_overwrite = self.overwrite
    try:
        if not inplace:
            path_node_map = copy.deepcopy(self)
            path_node_map.add(node, overwrite=overwrite, inplace=True)
            return path_node_map

        # Item assignment reads `self.overwrite`, so a per-call override is applied temporarily.
        # Previously the argument was accepted but never used for in-place additions.
        if overwrite is not None:
            self.overwrite = overwrite

        if PathNode.is_valid_node(node):
            logger.debug(f"Adding node: '{node}'")
            self[node.path] = node
    except Exception as e:
        raise PathNodeMapError(f"Error adding nodes to PathNodeMap: {e}") from e
    finally:
        self.overwrite = previous_overwrite
    return None
def remove(self, node: Union[ProcessingPath, PathNode, str], inplace: bool = True) -> Optional[PathNodeMap]:
    """Remove the specified path or node from the PathNodeMap instance.

    Args:
        node (Union[ProcessingPath, PathNode, str]): The path or node to remove.
        inplace (bool): Whether to remove the path in-place or return a new PathNodeMap instance. Default is True.

    Returns:
        Optional[PathNodeMap]: A new PathNodeMap instance with the specified path removed if `inplace`
            is False, otherwise None.

    Raises:
        PathNodeMapError: If any error occurs while removing.
    """
    try:
        if not inplace:
            path_node_map = copy.deepcopy(self)
            # The copy is modified in place. Passing `inplace=False` here recursed without end.
            path_node_map.remove(node, inplace=True)
            return path_node_map

        if not isinstance(node, (str, ProcessingPath, PathNode)):
            raise PathNodeMapError(f"Invalid type for node: {type(node)}. Must be a ProcessingPath or a PathNode.")

        path = node.path if isinstance(node, PathNode) else ProcessingPath.to_processing_path(node)
        logger.debug(f"Removing node: '{node}'")
        # Delete through `__delitem__` so the cache is updated together with the stored data.
        del self[path]
    except Exception as e:
        raise PathNodeMapError(f"Error removing paths from PathNodeMap: {e}") from e
    return None
def __copy__(self) -> PathNodeMap:
    """Create a shallow copy of the current map.

    The copy gets its own dictionary of path-node pairs (the paths and nodes themselves are shared)
    and its own cache, so adding to or removing from the copy does not alter the original map.

    Returns:
        PathNodeMap: A new map with the same attributes and path-node combinations as the current map.

    Raises:
        PathNodeMapError: If the copy cannot be created.
    """
    try:
        path_node_map = self.__class__.__new__(self.__class__)
        path_node_map.__dict__.update(self.__dict__)
        # Copying `__dict__` alone leaves `data` and `_cache` shared with the original,
        # so both are replaced with independent objects.
        path_node_map.data = self.data.copy()
        path_node_map._cache = PathProcessingCache()
        if path_node_map.use_cache:
            for path in path_node_map.data:
                path_node_map._cache.lazy_add(path)
        return path_node_map
    except Exception as e:
        logger.exception(f'Error copying map "{self}": {e}')
        raise PathNodeMapError(f"Error copying map: {e}") from e
# Names exported by `from <this module> import *`.
__all__ = ["PathNodeMap"]