Source code for scholar_flux.data.recursive_data_processor

# /data/recursive_data_processor.py
"""The scholar_flux.data.recursive_data_processor module implements the RecursiveDataProcessor, which dynamically and
automatically retrieves nested key-value pairs from lists of dictionary records using recursion.

The data processor can be used to flatten and filter records based on conditions and extract nested data for each record
in the response.

"""
from typing import Any, Optional
from scholar_flux.utils import KeyDiscoverer, RecursiveJsonProcessor, KeyFilter
from scholar_flux.utils import nested_key_exists
from scholar_flux.data.abc_processor import ABCDataProcessor

from scholar_flux.exceptions import DataProcessingException, DataValidationException
from scholar_flux.utils.record_types import RecordType, RecordList

import threading
import logging

# Module-level logger keyed on this module's `__name__`; used for debug/info messages emitted while loading and
# processing records below.
logger = logging.getLogger(__name__)


class RecursiveDataProcessor(ABCDataProcessor):
    """Processes a list of raw page record dict data from the API response based on discovered record keys and
    flattens them into a list of dictionaries consisting of key value pairs that simplify the interpretation of the
    final flattened json structure.

    Example:
        >>> from scholar_flux.data import RecursiveDataProcessor
        >>> data = [{'id':1, 'a':{'b':'c'}}, {'id':2, 'b':{'f':'e'}}, {'id':2, 'c':{'h':'g'}}]
        # creating a processor that shortens each flattened path (the default is use_full_path=True)
        >>> recursive_data_processor = RecursiveDataProcessor(use_full_path=False) # instantiating the class
        ### The process_page method can then be referenced using the processor as a callable:
        >>> result = recursive_data_processor(data) # recursively flattens and processes the records
        >>> print(result)
        # OUTPUT: [{'id': '1', 'b': 'c'}, {'id': '2', 'f': 'e'}, {'id': '2', 'h': 'g'}]
        # To identify the full nested location of record (this is also the default behavior):
        >>> recursive_data_processor = RecursiveDataProcessor(use_full_path=True) # instantiating the class
        >>> result = recursive_data_processor(data) # recursively flattens and processes the records
        >>> print(result)
        # OUTPUT: [{'id': '1', 'a.b': 'c'}, {'id': '2', 'b.f': 'e'}, {'id': '2', 'c.h': 'g'}]
    """

    def __init__(
        self,
        json_data: Optional[list[dict]] = None,
        value_delimiter: Optional[str] = None,
        ignore_keys: Optional[list[str]] = None,
        keep_keys: Optional[list[str]] = None,
        regex: Optional[bool] = True,
        use_full_path: Optional[bool] = True,
    ) -> None:
        """Initializes the data processor with JSON data and optional parameters for processing.

        Args:
            json_data (Optional[list[dict]]):
                The json data set to process and flatten - a list of dictionaries is expected.
            value_delimiter (Optional[str]):
                The delimiter handed to the underlying RecursiveJsonProcessor as both its `normalizing_delimiter`
                and `object_delimiter`, used when joining values found at terminal paths.
            ignore_keys (Optional[list[str]]):
                Determines records that should be omitted based on whether each record contains a key or
                substring. (off by default)
            keep_keys (Optional[list[str]]):
                Indicates whether or not to keep a record if the key is present. (off by default)
            regex (Optional[bool]):
                Determines whether to use regex filtering for filtering records based on the presence or absence
                of specific keywords.
            use_full_path (Optional[bool]):
                Determines whether or not to keep the full path for the json record key. If False, the path is
                shortened, keeping the last key or set of keys while preventing name collisions.
        """
        super().__init__()
        self._validate_inputs(ignore_keys, keep_keys, regex)
        self.value_delimiter = value_delimiter
        self.ignore_keys = ignore_keys
        self.keep_keys = keep_keys
        self.regex = regex
        self.use_full_path = use_full_path
        # Placeholder discoverer; replaced by `load_data` whenever new records are loaded.
        self.key_discoverer: KeyDiscoverer = KeyDiscoverer([])
        self.recursive_processor = RecursiveJsonProcessor(
            normalizing_delimiter=self.value_delimiter,
            object_delimiter=self.value_delimiter,
            use_full_path=use_full_path,
        )
        # Initialize the backing attribute before load_data compares against it.
        self.json_data = None
        self.load_data(json_data)
        # Serializes calls made through `__call__`.
        self.lock = threading.Lock()

    @property
    def json_data(self) -> Optional[RecordList]:
        """A list of dictionary-based records to further process."""
        return self._json_data

    @json_data.setter
    def json_data(self, data: Optional[RecordType | RecordList]) -> None:
        """Validates and stores the records, wrapping a single dictionary record in a list."""
        if isinstance(data, dict):
            data = [data]
        self._validate_json_data(data)
        self._json_data = data

    def load_data(self, json_data: Optional[RecordType | RecordList] = None) -> bool:
        """Attempts to load a data dictionary or list of records, validating it via the `json_data` setter.

        A single dictionary record is treated as a one-element list. If `json_data` is missing, or is equal to the
        current `json_data` attribute, then neither the `json_data` attribute nor the key discoverer built from it
        is updated.

        Args:
            json_data (Optional[RecordType | RecordList]): The json data to be loaded as an attribute.

        Returns:
            bool:
                True if JSON data is available on the processor after the call (newly loaded or previously
                loaded); False if no data was provided and none had been loaded before.

        Raises:
            DataValidationException: If the data fails validation or the key discoverer cannot be built from it.
        """
        if json_data is None and self.json_data is None:
            return False

        # The setter stores a lone dict as `[dict]`, so normalize before comparing; otherwise an unchanged
        # single-record input never compares equal and is needlessly reloaded and rediscovered.
        candidate = [json_data] if isinstance(json_data, dict) else json_data

        try:
            if candidate is not None and candidate != self.json_data:
                if self.json_data is not None:
                    logger.debug("Updating JSON data...")
                self.json_data = candidate
                self.key_discoverer = KeyDiscoverer(self.json_data)
                logger.debug("JSON data loaded.")
            return True
        except DataValidationException:
            # Already the documented exception type; re-raise without wrapping it a second time.
            raise
        except Exception as e:
            raise DataValidationException(f"The JSON data could not be successfully loaded and processed: {e}") from e

    def discover_keys(self) -> Optional[dict[str, list[str]]]:
        """Discovers all keys within the JSON data, as reported by the current key discoverer."""
        return self.key_discoverer.get_all_keys()

    def process_record(self, record_dict: RecordType, **kwargs: Any) -> RecordType:
        """Processes and flattens a record dictionary, extracting record data and article content in the process.

        Args:
            record_dict (RecordType): The record to flatten.
            **kwargs: Forwarded to `RecursiveJsonProcessor.process_and_flatten` (e.g. `exclude_keys`).

        Returns:
            RecordType: The flattened record, or an empty dict when the record is empty or flattening yields nothing.
        """
        if not record_dict:
            return {}
        return self.recursive_processor.process_and_flatten(obj=record_dict, **kwargs) or {}

    def process_page(
        self,
        parsed_records: Optional[list[dict]] = None,
        keep_keys: Optional[list[str]] = None,
        ignore_keys: Optional[list[str]] = None,
        regex: Optional[bool] = None,
    ) -> list[dict]:
        """Processes each individual record dict from the JSON data.

        Args:
            parsed_records (Optional[list[dict]]):
                A new page of records to load before processing. When None, the previously loaded records are
                reprocessed.
            keep_keys (Optional[list[str]]):
                Overrides `self.keep_keys` when truthy. If set, only records containing at least one of these keys
                are kept.
            ignore_keys (Optional[list[str]]):
                Overrides `self.ignore_keys` when truthy. Records containing any of these keys are dropped, and the
                keys are also forwarded to the flattener as `exclude_keys`.
            regex (Optional[bool]): Overrides `self.regex` when not None.

        Returns:
            list[dict]: The flattened records that passed the key filters.

        Raises:
            DataProcessingException:
                On any failure. Exceptions that are not already DataProcessingExceptions (including the validation
                error raised when no data is loaded) are wrapped, with the original exception chained as the cause.
        """
        try:
            if parsed_records is not None:
                logger.debug("Processing next page...")
                self.load_data(parsed_records)
            elif self.json_data:
                logger.debug("Reprocessing last page...")

            if self.json_data is None:
                raise DataValidationException("A valid JSON dictionary could not be successfully loaded.")

            # Per-call arguments take precedence over the instance-level defaults.
            keep_keys = keep_keys or self.keep_keys
            ignore_keys = ignore_keys or self.ignore_keys
            regex = regex if regex is not None else self.regex
            self._validate_inputs(ignore_keys, keep_keys, regex)

            processed_json = (
                self.process_record(record_dict, exclude_keys=ignore_keys)
                for record_dict in self.json_data
                if (not keep_keys or self.record_filter(record_dict, keep_keys, regex))
                and not self.record_filter(record_dict, ignore_keys, regex)
            )
            # Defensive None check in case a subclass's process_record returns None.
            processed_data = [record_dict for record_dict in processed_json if record_dict is not None]
            logger.info("Total included records - %s", len(processed_data))

            return processed_data
        except DataProcessingException:
            raise
        except Exception as e:
            raise DataProcessingException(f"An unexpected error occurred during data processing: {e}") from e

    @classmethod
    def record_filter(
        cls,
        record_dict: RecordType,
        record_keys: Optional[list[str]] = None,
        regex: Optional[bool] = None,
    ) -> bool:
        """Indicates if the current record contains any of the keys.

        Args:
            record_dict (RecordType): The record to search.
            record_keys (Optional[list[str]]): Keys to look for; an empty or missing list never matches.
            regex (Optional[bool]): Whether keys are matched as regular expressions (treated as False when None).

        Returns:
            bool: True if at least one key is found anywhere in the (possibly nested) record.
        """
        if not record_keys:
            return False

        use_regex = regex if regex is not None else False
        logger.debug("Finding field key matches within processing data: %s", record_keys)
        return any(nested_key_exists(record_dict, key, regex=use_regex) for key in record_keys)

    def filter_keys(
        self,
        prefix: Optional[str] = None,
        min_length: Optional[int] = None,
        substring: Optional[str] = None,
        pattern: Optional[str] = None,
        include: bool = True,
        **kwargs: Any,
    ) -> dict[str, list[str]]:
        """Filters discovered keys based on specified criteria.

        The criteria are forwarded to `KeyFilter.filter_keys`, with `include` passed as `include_matches`.
        """
        return KeyFilter.filter_keys(
            self.key_discoverer.get_all_keys(),
            prefix=prefix,
            min_length=min_length,
            substring=substring,
            pattern=pattern,
            include_matches=include,
            **kwargs,
        )

    def __call__(self, *args: Any, **kwargs: Any) -> list[dict]:
        """Convenience method that applies a thread lock and processes a single page of JSON data via
        `.process_page()`.

        Useful in a threading context where multiple SearchCoordinators may be using the same
        `RecursiveDataProcessor`.
        """
        with self.lock:
            return self.process_page(*args, **kwargs)
# Public names exported by `from <this module> import *`.
__all__ = ["RecursiveDataProcessor"]