Source code for scholar_flux.data.path_data_processor

# /data/path_data_processor.py
"""The scholar_flux.data.recursive_data_processor implements the PathDataProcessor that uses a custom path processing
implementation to dynamically flatten and format JSON records to retrieve nested-key value pairs.

Similar to the RecursiveDataProcessor, the PathDataProcessor can be used to dynamically filter, process, and flatten
nested paths while formatting the output based on its specification.

"""

from typing import Any, Optional, Union
from scholar_flux.utils import PathNodeIndex, ProcessingPath, PathDiscoverer, as_list_1d, generate_repr
from scholar_flux.data.abc_processor import ABCDataProcessor
from scholar_flux.exceptions import DataProcessingException, DataValidationException
import threading

import re
import logging


logger = logging.getLogger(__name__)


class PathDataProcessor(ABCDataProcessor):
    """The PathDataProcessor uses a custom implementation of Trie-based processing to abstract nested key-value
    combinations into path-node pairs where the path defines the full range of nested keys that need to be traversed
    to arrive at each terminal field within each individual record.

    This implementation automatically and dynamically flattens and filters a single page of records (a list of
    dictionary-based records) extracted from a response at a time to return the processed record data.

    Example:
        >>> from scholar_flux.data import PathDataProcessor
        >>> path_data_processor = PathDataProcessor()  # instantiating the class
        >>> data = [{'id':1, 'a':{'b':'c'}}, {'id':2, 'b':{'f':'e'}}, {'id':2, 'c':{'h':'g'}}]
        ### The process_page method can then be referenced using the processor as a callable:
        >>> result = path_data_processor(data)  # recursively flattens and processes by default
        >>> print(result)
        # OUTPUT: [{'id': '1', 'a.b': 'c'}, {'id': '2', 'b.f': 'e'}, {'id': '2', 'c.h': 'g'}]

    """
    def __init__(
        self,
        json_data: Union[dict, Optional[list[dict]]] = None,
        value_delimiter: Optional[str] = "; ",
        ignore_keys: Optional[list] = None,
        keep_keys: Optional[list[str]] = None,
        regex: Optional[bool] = True,
        use_cache: Optional[bool] = True,
    ) -> None:
        """Initializes the data processor with JSON data and optional parameters for processing."""
        super().__init__()
        self._validate_inputs(ignore_keys, keep_keys, regex, value_delimiter=value_delimiter)
        self.value_delimiter = value_delimiter
        self.regex = regex
        self.ignore_keys = ignore_keys or None
        self.keep_keys = keep_keys or None
        self.use_cache = use_cache or False
        self.path_node_index = PathNodeIndex(use_cache=self.use_cache)
        self.json_data = json_data
        self.lock = threading.Lock()
        self.load_data(json_data)
    @property
    def cached(self) -> bool:
        """Property indicating whether the underlying path node index uses a cache of weak references to nodes."""
        return self.path_node_index.node_map.use_cache
    def load_data(self, json_data: Optional[dict | list[dict]] = None) -> bool:
        """Attempts to load a data dictionary or list, contingent on it having at least one non-missing record to
        load from.

        If `json_data` is missing or the json input is equal to the current `json_data` attribute, then the
        `json_data` attribute will not be updated from the json input.

        Args:
            json_data (Optional[dict | list[dict]]): The json data to be loaded as an attribute

        Returns:
            bool: Indicates whether the data was successfully loaded (True) or not (False)

        """
        if not json_data and not self.json_data:
            return False
        try:
            if json_data and json_data != self.json_data:
                logger.debug("Updating JSON data")
                self.json_data = json_data

            logger.debug("Discovering paths")
            discovered_paths = PathDiscoverer(self.json_data).discover_path_elements(inplace=False)

            logger.debug("Creating a node index")
            self.path_node_index = PathNodeIndex.from_path_mappings(
                discovered_paths or {}, chain_map=True, use_cache=self.use_cache
            )
            logger.debug("JSON data loaded")
            return True
        except DataValidationException as e:
            raise DataValidationException(
                f"The JSON data of type {type(self.json_data)} could not be successfully "
                f"processed and loaded into an index: {e}"
            )
    def process_record(
        self,
        record_index: int,
        keep_keys: Optional[list] = None,
        ignore_keys: Optional[list] = None,
        regex=None,
    ) -> None:
        """Processes a record dictionary to extract record data and article content, creating a processed record
        dictionary with an abstract field.

        Determines whether or not to retain a specific record at the index.

        """
        logger.debug("Processing next record...")
        record_idx_prefix = ProcessingPath(str(record_index))
        indexed_nodes = self.path_node_index.node_map.filter(record_idx_prefix)

        if not indexed_nodes:
            logger.warning(f"A record is not associated with the following index: {record_index}")
            return None

        if any(
            [
                (keep_keys and not self.record_filter(indexed_nodes, keep_keys, regex=regex)),
                self.record_filter(indexed_nodes, ignore_keys, regex=regex),
            ]
        ):
            for path in indexed_nodes:
                self.path_node_index.node_map.remove(path)

        return None
    def process_page(
        self,
        parsed_records: Optional[list[dict]] = None,
        keep_keys: Optional[list[str]] = None,
        ignore_keys: Optional[list[str]] = None,
        combine_keys: bool = True,
        regex: Optional[bool] = None,
    ) -> list[dict]:
        """Processes each individual record dict from the JSON data."""
        self._validate_inputs(ignore_keys, keep_keys, regex, value_delimiter=self.value_delimiter)
        try:
            if parsed_records is not None:
                logger.debug("Processing next page..")
                self.load_data(parsed_records)
            elif self.json_data:
                logger.debug("Processing existing page..")
            else:
                raise ValueError("JSON Data has not been loaded successfully")

            if self.path_node_index is None:
                raise ValueError("JSON data could not be loaded into the processing path index successfully")

            keep_keys = keep_keys or self.keep_keys
            ignore_keys = ignore_keys or self.ignore_keys

            # Process each record in the JSON data
            for record_index in self.path_node_index.record_indices:
                self.process_record(
                    record_index,
                    keep_keys=keep_keys,
                    ignore_keys=ignore_keys,
                    regex=regex,
                )

            if combine_keys:
                self.path_node_index.combine_keys()

            processed_data = self.path_node_index.simplify_to_rows(object_delimiter=self.value_delimiter)
            return processed_data
        except DataProcessingException as e:
            raise DataProcessingException(f"An error occurred during the data processing: {e}")
    def record_filter(
        self,
        record_dict: dict[ProcessingPath, Any],
        record_keys: Optional[list[str]] = None,
        regex: Optional[bool] = None,
    ) -> bool:
        """Indicates whether a record contains a path (key) that determines whether the record as a whole should be
        retained or dropped."""
        if not record_keys:
            return False

        regex = regex if regex is not None else self.regex
        use_regex = regex if regex is not None else False

        record_pattern = "|".join(record_keys if use_regex else map(re.escape, as_list_1d(record_keys)))
        contains_record_pattern = (
            any(re.search(record_pattern, path.to_string()) for path in record_dict) if record_pattern else None
        )
        return bool(contains_record_pattern)
    def discover_keys(self) -> Optional[dict[str, Any]]:
        """Discovers all keys within the JSON data."""
        return {str(node.path): node for node in self.path_node_index.nodes}
    def structure(self, flatten: bool = False, show_value_attributes: bool = False) -> str:
        """Method for showing the structure of the current PathDataProcessor and identifying the current
        configuration.

        Useful for showing the options being used to process the API response records.

        """
        return generate_repr(
            self, flatten=flatten, show_value_attributes=show_value_attributes, exclude={"json_data", "use_cache"}
        )
    def __call__(self, *args, **kwargs) -> list[dict]:
        """Convenience method that calls process_page while holding a lock so that only a single page is processed
        at a time.

        Useful in a threading context where multiple SearchCoordinators may be using the same PathDataProcessor.

        """
        with self.lock:
            return self.process_page(*args, **kwargs)
__all__ = ["PathDataProcessor"]