# Source code for scholar_flux.data.path_data_processor

# /data/path_data_processor.py
"""The scholar_flux.data.path_data_processor implements the PathDataProcessor that uses a custom path processing
implementation to dynamically flatten and format JSON records to retrieve nested-key value pairs.

Similar to the RecursiveDataProcessor, the PathDataProcessor can be used to dynamically filter, process, and flatten
nested paths while formatting the output based on its specification.

"""

from typing import Any, Optional
from scholar_flux.utils import PathNodeIndex, ProcessingPath, PathDiscoverer, as_list_1d, generate_repr
from scholar_flux.data.abc_processor import ABCDataProcessor
from scholar_flux.exceptions import DataProcessingException, DataValidationException
from scholar_flux.utils.record_types import RecordType, RecordList
import threading

import re
import logging


# Module-level logger, following the stdlib convention of one logger per module.
logger = logging.getLogger(__name__)


class PathDataProcessor(ABCDataProcessor):
    """Trie-based processor that flattens nested records into path-node pairs.

    Each terminal field of a record is addressed by the full sequence of nested
    keys (its "path") required to traverse to that field. Using this mapping,
    the processor automatically and dynamically flattens and filters a single
    page of records (a list of dictionary-based records) extracted from a
    response at a time, returning the processed record data.

    Example:
        >>> from scholar_flux.data import PathDataProcessor
        >>> path_data_processor = PathDataProcessor()  # instantiating the class
        >>> data = [{'id':1, 'a':{'b':'c'}}, {'id':2, 'b':{'f':'e'}}, {'id':2, 'c':{'h':'g'}}]
        ### The process_page method can then be referenced using the processor as a callable:
        >>> result = path_data_processor(data)  # recursively flattens and processes by default
        >>> print(result)
        # OUTPUT: [{'id': '1', 'a.b': 'c'}, {'id': '2', 'b.f': 'e'}, {'id': '2', 'c.h': 'g'}]

    """
[docs] def __init__( self, json_data: Optional[RecordType | RecordList] = None, value_delimiter: Optional[str] = None, ignore_keys: Optional[list] = None, keep_keys: Optional[list[str]] = None, regex: Optional[bool] = True, use_cache: Optional[bool] = True, ) -> None: """Initializes the data processor with JSON data and optional parameters for processing.""" super().__init__() self._validate_inputs(ignore_keys, keep_keys, regex, value_delimiter=value_delimiter) self.value_delimiter = value_delimiter self.regex = regex self.ignore_keys = ignore_keys or None self.keep_keys = keep_keys or None self.use_cache = use_cache or False self.path_node_index = PathNodeIndex(use_cache=self.use_cache) self.json_data = None self.lock = threading.Lock() self.load_data(json_data)
@property def cached(self) -> bool: """Property indicating whether the underlying path node index uses a cache of weakreferences to nodes.""" return self.path_node_index.node_map.use_cache @property def json_data(self) -> Optional[RecordList]: """A list of dictionary-based records to further process.""" return self._json_data @json_data.setter def json_data(self, data: Optional[RecordType | RecordList]) -> None: """A list of dictionary-based records to further process.""" if isinstance(data, dict): data = [data] self._validate_json_data(data) self._json_data = data
[docs] def load_data(self, json_data: Optional[RecordType | RecordList] = None) -> bool: """Attempts to load a data dictionary or list, contingent on the input having at least one non-missing record. If `json_data` is missing or the json input is equal to the current `json_data` attribute, then the `json_data` attribute will not be updated from the json input. Args: json_data (Optional[RecordType | RecordList]): The json data to be loaded as an attribute. Returns: bool: Indicates whether the data was successfully loaded (True) or not (False). """ if json_data is None and self.json_data is None: return False try: if json_data is not None and json_data != self.json_data: if self.json_data is not None: logger.debug("Updating JSON data...") self.json_data = json_data logger.debug("Discovering paths...") discovered_paths = PathDiscoverer(self.json_data).discover_path_elements(inplace=False) logger.debug("Creating a node index...") self.path_node_index = ( PathNodeIndex.from_path_mappings(discovered_paths or {}, chain_map=True, use_cache=self.use_cache) if self.json_data else PathNodeIndex() ) logger.debug("JSON data loaded.") return True except DataValidationException as e: raise DataValidationException( f"The JSON data could not be successfully loaded and processed into an index: {e}" )
[docs] def process_record( self, record_index: int, keep_keys: Optional[list] = None, ignore_keys: Optional[list] = None, regex: Optional[bool] = None, ) -> None: """Processes the current record dictionary, indicating if the record at the index should be retained or dropped. The full set of processed records is subsequently accessible via `processor.path_node_index.simplify_to_rows()`. """ logger.debug("Processing next record...") record_idx_prefix = ProcessingPath(str(record_index)) indexed_nodes = self.path_node_index.node_map.filter(record_idx_prefix) if not indexed_nodes: logger.warning(f"A record is not associated with the following index: {record_index}") return if any( [ (keep_keys and not self.record_filter(indexed_nodes, keep_keys, regex=regex)), self.record_filter(indexed_nodes, ignore_keys, regex=regex), ] ): for path in indexed_nodes: self.path_node_index.node_map.remove(path) return
[docs] def process_page( self, parsed_records: Optional[RecordType | RecordList] = None, keep_keys: Optional[list[str]] = None, ignore_keys: Optional[list[str]] = None, combine_keys: bool = True, regex: Optional[bool] = None, ) -> RecordList: """Processes each individual record dict from the JSON data.""" self._validate_inputs(ignore_keys, keep_keys, regex, value_delimiter=self.value_delimiter) try: if parsed_records is not None: logger.debug("Processing next page..") self.load_data(parsed_records) elif self.json_data: logger.debug("Processing existing page..") if self.json_data is None: raise DataValidationException( "JSON data could not be successfully loaded into the JSON processing index." ) keep_keys = keep_keys or self.keep_keys ignore_keys = ignore_keys or self.ignore_keys regex = regex if regex is not None else self.regex for record_index in self.path_node_index.record_indices: self.process_record( record_index, keep_keys=keep_keys, ignore_keys=ignore_keys, regex=regex, ) if combine_keys: self.path_node_index.combine_keys() # Process each record in the JSON data processed_data = self.path_node_index.simplify_to_rows(object_delimiter=self.value_delimiter) return processed_data except DataProcessingException: raise except Exception as e: raise DataProcessingException(f"An unexpected error occurred during data processing: {e}")
[docs] @classmethod def record_filter( cls, record_dict: dict[ProcessingPath, Any], record_keys: Optional[list[str]] = None, regex: Optional[bool] = None, ) -> bool: """Identifies whether a record contains a path (key), indicating whether the record should be retained.""" if not record_keys: return False use_regex = regex if regex is not None else False record_pattern = "|".join(record_keys if use_regex else map(re.escape, as_list_1d(record_keys))) contains_record_pattern = ( any(re.search(record_pattern, path.to_string()) for path in record_dict) if record_pattern else None ) return bool(contains_record_pattern)
[docs] def discover_keys(self) -> Optional[dict[str, Any]]: """Discovers all keys within the JSON data.""" return {str(node.path): node for node in self.path_node_index.nodes}
[docs] def structure(self, flatten: bool = False, show_value_attributes: bool = False) -> str: """Method for showing the structure of the current PathDataProcessor and identifying the current configuration. Useful for showing the options being used to process the api response records """ return generate_repr( self, flatten=flatten, show_value_attributes=show_value_attributes, exclude={"_json_data", "use_cache"} )
def __call__(self, *args: Any, **kwargs: Any) -> list[dict]: """Convenience method that applies a thread lock and processes a single page of JSON data via `.process_page()`. Useful in a threading context where multiple SearchCoordinators may be using the same PathDataProcessor. """ with self.lock: return self.process_page(*args, **kwargs)
# Explicitly declares the public API of this module.
__all__ = ["PathDataProcessor"]