Source code for scholar_flux.data.pass_through_data_processor

# /data/pass_through_data_processor.py
"""The scholar_flux.data.pass_through_data_processor implements a PassThroughDataProcessor based on the schema required
of the ABCDataProcessor for processing the records and/or metadata extracted from a response.

The pass through data processor is designed for simplicity, allowing end-users to return extracted records as is and
also filter records based on conditions and extract nested key-value pairs within each record if specified.

"""
from typing import Any, Optional
from scholar_flux.utils import nested_key_exists

from scholar_flux.data import ABCDataProcessor
from scholar_flux.exceptions import DataProcessingException

import logging

logger = logging.getLogger(__name__)


[docs] class PassThroughDataProcessor(ABCDataProcessor): """A basic data processor that retains all valid records without modification unless a specific filter for JSON keys are specified. Unlike the DataProcessor, this specific implementation will not flatten records. Instead all filtered and selected records will retain their original nested structure. """
[docs] def __init__( self, ignore_keys: Optional[list[str]] = None, keep_keys: Optional[list[str]] = None, regex: Optional[bool] = True, ) -> None: """Initialize the PassThroughDataProcessor with explicit extraction paths and options. Args: ignore_keys: List of keys to ignore during processing. keep: List of keys that records should contain during processing. value_delimiter: Delimiter for joining multiple values. regex: Whether to use regex for ignore filtering. """ super().__init__() self._validate_inputs(ignore_keys, keep_keys, regex) self.ignore_keys: list[str] = ignore_keys or [] self.keep_keys: list[str] = keep_keys or [] self.regex: bool = regex if regex is not None else False
[docs] def process_record(self, record_dict: dict[str | int, Any]) -> dict[str | int, Any]: """A no-op method retained for to maintain a similar interface as other DataProcessor implementations. Args: - record_dict: The dictionary containing the record data. Returns: - dict: The original processed dictionary """ return record_dict or {}
[docs] def process_page( self, parsed_records: list[dict[str | int, Any]], ignore_keys: Optional[list[str]] = None, keep_keys: Optional[list[str]] = None, regex: Optional[bool] = None, ) -> list[dict]: """Processes and returns each record as is if filtering the final list of records by key is not enabled.""" keep_keys = keep_keys or self.keep_keys ignore_keys = ignore_keys or self.ignore_keys regex = regex if regex is not None else self.regex self._validate_inputs(ignore_keys, keep_keys, regex) try: # processes each individual record dict processed_record_dict_list = [ self.process_record(record_dict) for record_dict in parsed_records if self.record_filter(record_dict, keep_keys, regex) is not False and self.record_filter(record_dict, ignore_keys, regex) is not True ] logging.info(f"total included records - {len(processed_record_dict_list)}") # return the list of processed record dicts return processed_record_dict_list except Exception as e: raise DataProcessingException(f"An unexpected error occurred during data processing: {e}")
[docs] def record_filter( self, record_dict: dict[str | int, Any], record_keys: Optional[list[str]] = None, regex: Optional[bool] = None ) -> Optional[bool]: """Helper method that filters records using regex pattern matching, checking if any of the keys provided in the function call exist.""" # return true by default if no filters are provided if not record_keys: return None use_regex = regex if regex is not None else False # search for the presence or absence of a specific key segment in the code logger.debug(f"Finding field key matches within processing data: {record_keys}") return any(key for key in record_keys if key and nested_key_exists(record_dict, key, regex=use_regex))
__all__ = ["PassThroughDataProcessor"]