# /data/data_processor.py
"""The scholar_flux.data.data_processor implements a DataProcessor based on the schema required of the ABCDataProcessor
for processing the records and/or metadata extracted from a response. The data processor implements manual nested key
retrieval by using the list of record_keys that point to the paths of fields to extract from the passed list of nested
JSON dictionary records.
The data processor can be used to filter records based on conditions and extract nested key-value pairs within each
record to ensure that relevant records and fields from records are retained
"""
from typing import Any, Optional
from scholar_flux.utils import get_nested_data, as_list_1d, unlist_1d, nested_key_exists
from scholar_flux.data import ABCDataProcessor
from scholar_flux.exceptions import DataProcessingException, DataValidationException
import logging
logger = logging.getLogger(__name__)
[docs]
class DataProcessor(ABCDataProcessor):
"""Initialize the DataProcessor with explicit extraction paths and options. The DataProcessor performs the selective
extraction of specific fields from each record within a page (list) of JSON (dictionary) records and assumes that
the paths to extract are known beforehand.
Args:
record_keys: Keys to extract, as a dict of output_key to path, or a list of paths.
ignore_keys: List of keys to ignore during processing.
keep_keys: List of keys; records containing at least one of these keys are retained during processing.
value_delimiter: Delimiter for joining multiple values.
regex: Whether to use regex for ignore filtering.
Examples
>>> from scholar_flux.data import DataProcessor
>>> data = [{'id':1, 'school':{'department':'NYU Department of Mathematics'}},
>>> {'id':2, 'school':{'department':'GSU Department of History'}},
>>> {'id':3, 'school':{'organization':'Pharmaceutical Research Team'}}]
# creating a basic processor
>>> data_processor = DataProcessor(record_keys = [['id'], ['school', 'department'], ['school', 'organization']]) # instantiating the class
### The process_page method can then be referenced using the processor as a callable:
>>> result = data_processor(data) # recursively flattens and processes by default
>>> print(result)
# OUTPUT: [{'id': 1, 'school.department': 'NYU Department of Mathematics', 'school.organization': None},
# {'id': 2, 'school.department': 'GSU Department of History', 'school.organization': None},
# {'id': 3, 'school.department': None, 'school.organization': 'Pharmaceutical Research Team'}]
"""
[docs]
def __init__(
    self,
    record_keys: Optional[dict[str | int, Any] | list[list[str | int]]] = None,
    ignore_keys: Optional[list[str]] = None,
    keep_keys: Optional[list[str]] = None,
    value_delimiter: Optional[str] = "; ",
    regex: Optional[bool] = True,
) -> None:
    """Set up the processor's extraction paths, record filters, and flattening options.

    Args:
        record_keys: Paths to extract, given either as a dict of output_key to path or as a list of paths.
            ``None`` is stored as an empty mapping.
        ignore_keys: List of keys to ignore during processing. Stored as an empty list when omitted.
        keep_keys: List of keys used to retain records during processing. Stored as an empty list when omitted.
        value_delimiter: Delimiter for joining multiple values.
        regex: Whether to use regex for key filtering. Any falsy value is stored as ``False``.
    """
    super().__init__()

    # All inputs are validated up front, before any attribute is assigned.
    self._validate_inputs(ignore_keys, keep_keys, regex, record_keys=record_keys, value_delimiter=value_delimiter)

    # Normalize the user-supplied paths into the {output_key: path} form used during processing.
    prepared_keys = self._prepare_record_keys(record_keys)
    self.record_keys: dict[str | int, list[str | int]] = prepared_keys or {}

    self.value_delimiter = value_delimiter
    self.regex: bool = regex or False
    self.keep_keys: list[str] = keep_keys or []
    self.ignore_keys: list[str] = ignore_keys or []
@staticmethod
def _prepare_record_keys(
    record_keys: Optional[dict[str | int, Any] | list[list[str | int]]],
) -> Optional[dict[str | int, list[str | int]]]:
    """Convert the ``record_keys`` input into a standardized dict of output key to path.

    Each key is the final key/column name for a nested path. Its value is a list containing each step
    leading up to the final element/node in the path. Accepts either a list of paths or a dict of
    output_key to path.

    Args:
        record_keys (Optional[dict[str | int, Any] | list[list[str | int]]]): Key/path combinations indicating
            the necessary paths to extract and, if a dictionary, the key to rename the path with.

    Returns:
        Optional[dict[str | int, list[str | int]]]: ``None`` when ``record_keys`` is ``None``; otherwise a
        dict mapping each output key to its path. For list input, the key is the path's steps joined with
        ``"."`` and empty-list paths are dropped. For dict input, a path that normalizes to a falsy value
        falls back to ``[key]``.

    Raises:
        DataValidationException: If ``record_keys`` is neither ``None``, a list, nor a dict, or if
            normalizing a path raises ``TypeError``, ``AttributeError``, or ``ValueError``.
    """
    if record_keys is None:
        return None

    try:
        if isinstance(record_keys, list):
            record_keys_dict: dict[str | int, list[str | int]] = {}
            for record_key_path in record_keys:
                if record_key_path == []:
                    continue
                # Normalize each path once and reuse it for both the joined key and the stored value.
                path = as_list_1d(record_key_path)
                record_keys_dict[".".join(f"{step}" for step in path)] = path
            return record_keys_dict

        if isinstance(record_keys, dict):
            # The dictionary value is the full path to the element.
            # If the path is empty, it defaults to the key itself.
            return {key: (as_list_1d(path) or [key]) for key, path in record_keys.items()}

        # Report the offending type (not the value), as the message states.
        raise TypeError(
            "Expected a dictionary of string to path mappings or a list of lists containing paths. "
            f"Received type ({type(record_keys).__name__})"
        )
    except (TypeError, AttributeError, ValueError) as e:
        raise DataValidationException(
            f"The record_keys attribute could not be prepared. Check the inputs: {e}"
        ) from e
[docs]
def process_record(self, record_dict: dict[str | int, Any]) -> dict[str, Any]:
"""Processes a record dictionary to extract record data and article content, creating a processed record
dictionary with an abstract field.
Args:
- record_dict: The dictionary containing the record data.
Returns:
- dict: A processed record dictionary with record keys processed and an abstract field created from the article content.
"""
# retrieve a dict containing the fields for the current record
if not record_dict:
logger.debug("A record is empty: skipping,,,")
# Simplified record data processing using dictionary comprehension
processed_record_dict = (
{key: self.extract_key(record_dict, path[-1], path[:-1]) for key, path in self.record_keys.items()}
if self.record_keys
else {}
)
return self.collapse_fields(processed_record_dict)
[docs]
def collapse_fields(self, processed_record_dict: dict) -> dict[str, list[str | int] | str | int]:
"""Helper method for joining lists of data into a singular string for flattening."""
if processed_record_dict and self.value_delimiter is not None:
return {
k: (
self.value_delimiter.join(str(i) for i in field_item)
if field_item is not None and len(field_item) > 1
else unlist_1d(field_item)
)
for k, field_item in processed_record_dict.items()
}
return {k: unlist_1d(v) for k, v in processed_record_dict.items()}
[docs]
def process_page(
    self,
    parsed_records: list[dict[str | int, Any]],
    ignore_keys: Optional[list[str]] = None,
    keep_keys: Optional[list[str]] = None,
    regex: Optional[bool] = None,
) -> list[dict]:
    """Core method of the data processor: filter and process a list of dictionary records based on the
    configuration of the current DataProcessor.

    Args:
        parsed_records (list[dict[str | int, Any]]): The records to process and/or filter.
        ignore_keys (Optional[list[str]]): Optional override identifying records to ignore based on matching
            keys. A falsy value (``None`` or an empty list) falls back to ``self.ignore_keys``.
        keep_keys (Optional[list[str]]): Optional override identifying records to keep based on matching
            keys. A falsy value (``None`` or an empty list) falls back to ``self.keep_keys``.
        regex (Optional[bool]): Whether to filter records using regular expressions. ``None`` falls back to
            ``self.regex``.

    Returns:
        list[dict]: The processed form of every record that passed both the keep and ignore filters.

    Raises:
        DataProcessingException: If any error occurs while filtering or processing the records.
    """
    keep_keys = keep_keys or self.keep_keys
    ignore_keys = ignore_keys or self.ignore_keys
    regex = regex if regex is not None else self.regex

    # Re-validate because the per-call overrides may differ from what was checked at construction.
    self._validate_inputs(
        ignore_keys, keep_keys, regex, record_keys=self.record_keys, value_delimiter=self.value_delimiter
    )

    try:
        # record_filter returns None when no keys are supplied, so the identity checks below let
        # records through whenever a filter is not configured.
        processed_record_dict_list = [
            self.process_record(record_dict)
            for record_dict in parsed_records
            if self.record_filter(record_dict, keep_keys, regex) is not False
            and self.record_filter(record_dict, ignore_keys, regex) is not True
        ]
    except Exception as e:
        # Chain the original error so the root cause stays visible in the traceback.
        raise DataProcessingException(f"An unexpected error occurred during data processing: {e}") from e

    # Use the module-level logger (not the root logger) so the message carries this module's name.
    logger.info("total included records - %d", len(processed_record_dict_list))
    return processed_record_dict_list
[docs]
def record_filter(
self, record_dict: dict[str | int, Any], record_keys: Optional[list[str]] = None, regex: Optional[bool] = None
) -> Optional[bool]:
"""Helper method that filters records using regex pattern matching, checking if any of the keys provided in the
function call exist."""
# return true by default if no filters are provided
if not record_keys:
return None
use_regex = regex if regex is not None else False
# search for the presence or absence of a specific key segment in the code
logger.debug(f"Finding field key matches within processing data: {record_keys}")
return any(key for key in record_keys if key and nested_key_exists(record_dict, key, regex=use_regex))
# Public API of this module: only DataProcessor is exported by a star-import.
__all__ = ["DataProcessor"]