# /utils/json_processing_utils.py
"""Helper module used to process recursive JSON data received from APIs of an unknown type and structure.
Classes:
PathUtils:
Utility class used to prepare path strings and lists of path components consistently for processing.
KeyDiscoverer:
Helper class for identifying JSON paths and terminal keys containing nested data elements.
KeyFilter:
Helper class used to identify and filter nested dictionaries based on path length and pattern matching.
RecursiveJsonProcessor:
Front-end facing utility function used by the `scholar_flux.data.RecursiveDataProcessor` to process, filter,
and flatten JSON formatted data.
JsonRecordData:
Helper class used as a container to hold extracted path/data components for further processing.
JsonNormalizer:
Helper class used by the `RecursiveJsonProcessor` to flatten the inputted JSON record into a
non-nested dictionary
Example Use:
>>> from scholar_flux.utils import RecursiveJsonProcessor
>>> from pprint import pp
>>> data = {
"authors": {"principle_investigator": "Dr. Smith", "assistant": "Jane Doe"},
"doi": "10.1234/example.doi",
"title": "Sample Study",
"abstract": ["This is a sample abstract.", "keywords: 'sample', 'abstract'"],
"genre": {"subspecialty": "Neuroscience"},
"journal": {"topic": "Sleep Research"},
}
# joins fields with nested components using a newline character - retains full paths leading to each value
>>> processor = RecursiveJsonProcessor(object_delimiter = ' ', use_full_path = True)
# processes and flattens the JSON dict using the defined helper classes under the hood
>>> result = processor.process_and_flatten(data)
# prints the result in a format that is easier to view from the CLI
>>> pp(result)
# OUTPUT: {'authors.principle_investigator': 'Dr. Smith',
'authors.assistant': 'Jane Doe',
'doi': '10.1234/example.doi',
'title': 'Sample Study',
'abstract': "This is a sample abstract. keywords: 'sample', 'abstract'",
'genre.subspecialty': 'Neuroscience',
'journal.topic': 'Sleep Research'}
"""
from typing import Dict, List, Tuple, Any, Optional
from itertools import chain
from dataclasses import dataclass
import re
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)
[docs]
class PathUtils:
"""Helper class used to perform string/list manipulations for paths that can be represented in either form,
requiring conversion from one type to the other in specific JSON path processing scenarios."""
[docs]
@staticmethod
def path_name(level_names: List[Any]) -> str:
"""Generate a string representation of the path based on the provided level names.
Args:
level_names (List[Any]): A list of names representing the path levels.
Returns:
str: A string representation of the path.
"""
logger.debug(f"Generating path name for levels: {level_names}")
if not level_names:
return ""
for name in reversed(level_names):
if not isinstance(name, int):
logger.debug(f"Found non-integer name: {name}")
return str(name)
path_str = PathUtils.path_str(level_names)
return path_str
[docs]
@staticmethod
def path_str(level_names: List[Any]) -> str:
"""Join the level names into a single string separated by underscores.
Args:
level_names (List[Any]): A list of names representing the path levels.
Returns:
str: A single string with level names joined by underscores.
"""
path_str = ".".join(map(str, level_names))
return path_str
[docs]
@staticmethod
def remove_path_indices(path: List[Any]) -> List[Any]:
"""Remove integer indices from the path to get a list of key names.
Args:
path (List[Any]): The original path containing both keys and indices.
Returns:
List[Any]: A path with only the key names.
"""
key_path = [k for k in path if not isinstance(k, int) and k not in ("value",)]
return key_path
[docs]
@staticmethod
def constant_path_indices(path: List[Any], constant: str = "i") -> List[Any]:
"""Replace integer indices with constants in the provided path.
Args:
path (List[Any]): The original path containing both keys and indices.
constant (str): A value to replace a numeric value with.
Returns:
List[Any]: A path with only the key names.
"""
constant = constant or "i"
key_path = [constant if isinstance(k, int) else k for k in path]
return key_path
[docs]
@staticmethod
def group_path_assignments(path: List[Any]) -> Optional[str]:
"""Group the path assignments into a single string, excluding indices.
Args:
path (List[Any]): The original path containing both keys and indices.
Returns:
Optional[str]: A single string representing the grouped path, or None if the path is empty.
"""
key_path = PathUtils.constant_path_indices(path)
if key_path:
grouped_path = PathUtils.path_str(key_path)
return grouped_path
logger.debug("No valid keys found in path")
return None
[docs]
class KeyFilter:
"""Helper class used to create a simple filter that allows for the identification of terminal keys associated with
data in a JSON structure and the paths that lead to each terminal key."""
[docs]
@staticmethod
def filter_keys(
discovered_keys: Dict[str, List[str]],
prefix: Optional[str] = None,
min_length: Optional[int] = None,
substring: Optional[str] = None,
pattern: Optional[str] = None,
include_matches: bool = True,
match_any: bool = True,
) -> Dict[str, List[str]]:
"""A method used to create a function that matches key-value pairs based on the specified criteria.
For example, filtering can be configured to identify keys based on prefix, minimum path length, and path
substring/pattern matching with conditional match inclusion/exclusion.
"""
def matches_criteria(key: str, paths: List[str]) -> bool:
"""Helper function that, when configured via `filter_keys` allows for the identification of keys and paths
that match a specific criteria."""
matches = []
if prefix:
matches.append(key.startswith(prefix))
if min_length is not None:
matches.append(any(len(path.split(".")) >= min_length for path in paths))
if substring:
matches.append(any(substring in path for path in paths))
if pattern:
regex_pattern = re.compile(pattern)
matches.append(any(regex_pattern.fullmatch(node) for path in paths for node in path.split(".")))
fn = any if match_any else all
return fn(matches)
return {key: paths for key, paths in discovered_keys.items() if matches_criteria(key, paths) is include_matches}
[docs]
class KeyDiscoverer:
"""Helper class used to discover terminal keys containing data within nested JSON data structures and identify the
paths used to arrive at each key.
Attributes:
_discovered_keys (dict[str, list]): Defines the complete list of all keys that can be found in a dictionary
and the path that needs to be traversed to arrive at that key
_terminal_paths (dict[str, bool]): Creates a dictionary that indicates whether the currently added path is
terminal within the JSON data structure
"""
[docs]
def __init__(self, records: Optional[List[Dict]] = None):
"""Initializes the KeyDiscoverer and identifies terminal key/path pairs within the JSON data structure."""
self.records = records or []
self._discovered_keys, self._terminal_paths = self._discover_keys()
def _discover_keys(self) -> Tuple[Dict[str, List[str]], Dict[str, bool]]:
"""Discovers all keys within the provided records recursively."""
discovered_keys: dict[str, list] = defaultdict(list)
terminal_paths: dict[str, bool] = {}
for record in self.records:
self._discover_keys_recursive(record, discovered_keys, terminal_paths, [])
return discovered_keys, terminal_paths
def _is_terminal(self, value: Any) -> bool:
"""Determines if the given value is terminal (not a nested dictionary or a list with nested dictionaries)."""
if isinstance(value, dict):
return False
if isinstance(value, list):
# Check recursively if there are any dictionaries within the list
return all(self._is_terminal(item) for item in value)
return True
def _discover_keys_recursive(
self,
record: Any,
discovered_keys: Dict[str, List[str]],
terminal_paths: Dict[str, bool],
current_path: List[str],
) -> None:
"""Recursively traverses records to discover keys, their paths, and terminal status."""
if isinstance(record, dict):
for key, value in record.items():
new_path = current_path + [key]
path_str = PathUtils.path_str(new_path)
if path_str not in discovered_keys[key]:
discovered_keys[key].append(path_str)
self._discover_keys_recursive(value, discovered_keys, terminal_paths, new_path)
terminal_paths[path_str] = self._is_terminal(value)
elif isinstance(record, list):
for index, item in enumerate(record):
new_path = current_path + [str(index)]
self._discover_keys_recursive(item, discovered_keys, terminal_paths, new_path)
[docs]
def get_all_keys(self) -> Dict[str, List[str]]:
"""Returns all discovered keys and their paths."""
return self._discovered_keys
[docs]
def get_terminal_keys(self) -> Dict[str, List[str]]:
"""Returns keys and their terminal paths (paths that don't contain nested dictionaries)."""
terminal_keys = defaultdict(list)
for path, is_terminal in self._terminal_paths.items():
if is_terminal:
key = path.split(".")[-1]
terminal_keys[key].append(path)
return terminal_keys
[docs]
def get_terminal_paths(self) -> List[str]:
"""Returns paths indicating whether they are terminal (don't contain nested dictionaries)."""
return [path for (path, is_terminal) in self._terminal_paths.items() if is_terminal]
[docs]
def get_keys_with_path(self, key: str) -> List[str]:
"""Returns all paths associated with a specific key."""
return self._discovered_keys.get(key, [])
[docs]
def filter_keys(
self,
prefix: Optional[str] = None,
min_length: Optional[int] = None,
substring: Optional[str] = None,
) -> Dict[str, List[str]]:
"""Helper method that filters a range of keys based on the specified criteria."""
return KeyFilter.filter_keys(self._discovered_keys, prefix, min_length, substring)
def __repr__(self) -> str:
"""Helper method for displaying a human-readable representation of the KeyDiscoverer."""
class_name = self.__class__.__name__
return (
f"{class_name}(records=len({len(self.records)}), "
f"_discovered_keys=len({len(self._discovered_keys)}), "
f"_terminal_paths=len({len(self._terminal_paths)}))"
)
[docs]
@dataclass
class JsonRecordData:
"""Helper class used as a container to record the paths, data, and names associated with each terminal path.
Args:
path (list[str | int]): The path associated with the terminal data point where nested terminal values can be found
data (dict[str, Any]): The nested terminal value at the end of a path
"""
path: List[str | int]
data: Dict[str, Any]
[docs]
class RecursiveJsonProcessor:
"""An implementation of a recursive JSON dictionary processor that is used to process and identify nested components
such as paths, terminal key names, and the data at each terminal path.
This utility of the RecursiveJsonProcessor is for flattening dictionary records into flattened representations where
its keys represent the terminal paths at each node and its values represent the data found at each terminal path.
"""
[docs]
def __init__(
self,
json_dict: Optional[Dict] = None,
object_delimiter: Optional[str] = "; ",
normalizing_delimiter: Optional[str] = None,
use_full_path: Optional[bool] = False,
):
"""Initialize the RecursiveJsonProcessor with a JSON dictionary and a delimiter for joining list elements.
Args:
json_dict (Dict): The input JSON dictionary to be parsed.
object_delimiter (str): The delimiter used to join elements max depth list objects. Default is "; ".
normalizing_delimiter (str): The delimiter used to join elements across multiple keys when normalizing. Default is "\n\n".
"""
self.json_dict = json_dict
self.normalizing_delimiter = normalizing_delimiter
self.object_delimiter = object_delimiter
self.key_discoverer = (
KeyDiscoverer([json_dict] if not isinstance(json_dict, list) else json_dict) if json_dict else None
)
self.use_full_path = use_full_path or False
self.extracted_record_data_list: list[JsonRecordData] = []
[docs]
def combine_normalized(self, normalized_field_value: Optional[list | str]) -> list | str | None:
"""Combines lists of nested data (strings, ints, None, etc.) into a single string separated by the
normalizing_delimiter.
If a delimiter isn't specified or if the value is None, it is returned as is without modification.
"""
if isinstance(normalized_field_value, str):
return normalized_field_value
if self.normalizing_delimiter is not None and isinstance(normalized_field_value, list):
return (
self.normalizing_delimiter.join([f"{value}" for value in normalized_field_value if value is not None])
or None
)
return self.unlist(normalized_field_value)
[docs]
@staticmethod
def unlist(current_data: Optional[Dict | List]) -> Optional[Any]:
"""Flattens a dictionary or list if it contains a single element that is a dictionary.
Args:
current_data: A dictionary or list to be flattened if it contains a single dictionary element.
Returns:
Optional[Dict|List]: The flattened dictionary if the input meets the flattening condition, otherwise returns the input unchanged.
"""
if isinstance(current_data, list) and len(current_data) == 1:
return current_data[0]
return current_data
[docs]
def process_dictionary(self, obj: Optional[Dict] = None):
"""Create a new json dictionary that contains information about the relative paths of each field that can be
found within the current json_dict."""
self.json_dict = obj or self.json_dict
if not self.json_dict:
raise ValueError("Json Dictionary not specified")
self.extracted_record_data_list.clear()
self.process_level(self.json_dict)
return self
[docs]
def process_level(self, obj: Any, level_name: Optional[List[Any]] = None) -> List[Any]:
"""Helper method for processing a level within a dictionary.
This method is recursively called to process nested components
"""
level_name = level_name if level_name is not None else []
if isinstance(obj, list):
if any(isinstance(v_i, (list, dict)) for v_i in obj):
return list(chain.from_iterable(self.process_level(v_i, level_name + [i]) for i, v_i in enumerate(obj)))
joined_obj = self.object_delimiter.join(map(str, obj)) if self.object_delimiter is not None else tuple(obj)
return self.process_level(joined_obj, level_name)
elif isinstance(obj, dict):
return list(chain.from_iterable(self.process_level(v, level_name + [k]) for k, v in obj.items()))
else:
obj = list(obj) if isinstance(obj, tuple) else obj
obj_info = JsonRecordData(data=obj, path=level_name)
self.extracted_record_data_list.append(obj_info)
return [obj_info]
[docs]
def flatten(self) -> Optional[Dict[str, List[Any] | str | None]]:
"""Flatten the extracted JSON dictionary from a nested structure into a simpler structure.
Returns:
Optional[Dict[str, List[Any]]]: A dictionary with flattened paths as keys and lists of values.
"""
if self.extracted_record_data_list:
normalizer = JsonNormalizer(
self.extracted_record_data_list,
use_full_path=self.use_full_path,
)
normalized_json = normalizer.normalize_extracted()
normalized_json = {
data_key: self.combine_normalized(field_value) for data_key, field_value in normalized_json.items()
}
return normalized_json
logger.debug("No data extracted, returning None")
return None
[docs]
def process_and_flatten(
self, obj: Optional[Dict] = None, exclude_keys: Optional[List[str]] = None
) -> Optional[Dict[str, List[Any] | str | None]]:
"""Process the dictionary, filter extracted paths, and then flatten the result.
Args:
exclude_keys (Optional[List[str]]): List of keys to exclude from the flattened result.
Returns:
Optional[Dict[str, List[Any]]]: A dictionary with flattened paths as keys and lists of values.
"""
self.process_dictionary(obj)
if exclude_keys:
self.filter_extracted(exclude_keys)
return self.flatten()
def __repr__(self) -> str:
"""Helper method for displaying a human-readable representation of the RecursiveJsonProcessor."""
class_name = self.__class__.__name__
return f"{class_name}(object_delimiter={self.object_delimiter}, simplifier={self.normalizing_delimiter}, use_full_path={self.use_full_path})"
[docs]
class JsonNormalizer:
"""Helper class that flattens and normalizes the retrieved list of JsonRecordData into singular flattened
dictionary."""
[docs]
def __init__(
self,
json_record_data_list: List[JsonRecordData],
use_full_path: bool = False,
):
"""Initialize the JsonNormalizer with extracted JSON data and a delimiter.
Args:
extracted_record_data_list (List[JsonRecordData]): The list of extracted JSON data.
delimiter (str): The delimiter used to join elements in lists.
use_full_path (str): Indicates whether to use the full nested json path or the smallest unique path available
"""
self.json_record_data_list = json_record_data_list
self.use_full_path = use_full_path or False
[docs]
def get_unique_key(
self,
current_key_str: str,
current_group: List[str],
unique_mappings_dict: Dict[str, List[str]],
) -> str:
"""Generate a unique key for the current data entry.
Args:
current_key_str (str): The string representation of the current path.
current_group (List[str]): The list of keys in the current path.
unique_mappings_dict (Dict[str, List[str]]): A dictionary tracking unique keys.
Returns:
str: A unique key for the current data entry.
"""
logger.debug(f"Generating unique key for: {current_key_str}")
found_key = next(
(data_key for data_key, key_str in unique_mappings_dict.items() if current_key_str in key_str),
None,
)
if found_key:
return found_key
return self.create_unique_key(current_group, current_key_str, unique_mappings_dict)
[docs]
def create_unique_key(
self,
current_group: List[str],
current_key_str: str,
unique_mappings_dict: Dict[str, List[str]],
) -> str:
"""Create a unique key for the current data entry if a simple key is not sufficient.
Args:
current_group (List[str]): The list of keys in the current path.
current_key_str (str): The string representation of the current path.
unique_mappings_dict (Dict[str, List[str]]): A dictionary tracking unique keys.
Returns:
str: A unique key for the current data entry.
"""
idx = 1 if not self.use_full_path else len(current_group)
while idx <= len(current_group):
current_data_key_test = ".".join(current_group[-idx:])
if current_data_key_test not in unique_mappings_dict:
unique_mappings_dict[current_data_key_test].append(current_key_str)
logger.debug(f"Created unique key: {current_key_str} => {current_data_key_test}")
return current_data_key_test
idx += 1
idx = 1
base_key = current_group[-1]
current_data_key_test = f"{base_key}.{idx}"
while current_data_key_test in unique_mappings_dict:
idx += 1
current_data_key_test = f"{base_key}.{idx}"
unique_mappings_dict[current_data_key_test].append(current_key_str)
logger.debug(f"Created unique key: {current_key_str} => {current_data_key_test}")
return current_data_key_test
__all__ = [
"PathUtils",
"KeyDiscoverer",
"KeyFilter",
"RecursiveJsonProcessor",
"JsonRecordData",
"JsonNormalizer",
]