# Source code for scholar_flux.data.base_extractor
# /data/base_extractor
"""The scholar_flux.data.base_extractor implements the core processes used to extract data from parsed responses.
The `BaseDataExtractor` implements the methods and functionality that are used when the structure of the parsed response
and paths for records and metadata are already known. The `BaseDataExtractor` serves as the base for later extension
with the `scholar_flux.data.data_extractor.DataExtractor` to dynamically identify records and metadata paths when the
structure of the response is not provided.
"""
from typing import Any, Optional, Union
from typing_extensions import Self
from scholar_flux.exceptions import DataExtractionException
from scholar_flux.utils import get_nested_data, try_int, try_dict, as_list_1d, unlist_1d, PathUtils
from scholar_flux.utils.record_types import RecordList, MetadataType
from scholar_flux.utils.repr_utils import generate_repr
import logging
logger = logging.getLogger(__name__)
[docs]
class BaseDataExtractor:
    """Base DataExtractor that extracts records and metadata from parsed responses at known paths.

    Implements the minimum components necessary to extract records and metadata from parsed responses when the
    location of records and metadata is known beforehand.

    Attributes:
        record_path: The list of path elements leading to the list of records, or None when not provided.
        metadata_path: A list of paths, or a dictionary mapping metadata names to paths, identifying metadata fields.
    """

    def __init__(
        self,
        record_path: Optional[list] = None,
        metadata_path: Optional[list[list] | dict[str, list]] = None,
    ) -> None:
        """Initialize the DataExtractor with metadata and records to extract separately.

        If record path or metadata_path are specified, then the data extractor will attempt to retrieve the
        metadata and records at the provided paths. Note that, as metadata_paths can be associated with multiple
        keys, starting from the outside dictionary, we may have to specify a dictionary containing keys denoting
        metadata variables and their paths as a list of values indicating how to retrieve the value. The path can
        also be given by a list of lists describing how to retrieve the last element.

        While the encouraged type for `record_path` is a list of strings that each represent each nested path
        element to be traversed to arrive at a value for a field, a delimited string can also be used with the
        default delimiter being `scholar_flux.utils.PathStr.DELIMITER`. Similarly, a list or dictionary of path
        strings can also be used as shorthand for the individual metadata fields containing relevant metadata
        values.

        Args:
            record_path (Optional[List[str]]):
                Custom path to find records in the parsed data. Contains a list of strings and rarely integers
                indexes indicating how to recursively find the list of records.
            metadata_path (List[List[str]] | Optional[Dict[str, List[str]]]):
                Identifies the paths in a dictionary associated with metadata as opposed to records. This can be a
                list of paths where each element is a list describing how to arrive at a terminal element.

        Raises:
            DataExtractionException: If the normalized paths fail validation.
        """
        # A missing (or empty) metadata path is normalized to an empty dictionary
        self.metadata_path = self._prepare_metadata_path(metadata_path or {})
        # Reuse the dedicated helper rather than duplicating its string-splitting logic inline
        self.record_path = self._prepare_record_path(record_path)
        self._validate_inputs()

    @staticmethod
    def _prepare_metadata_path(
        metadata_path: list[list] | list[str] | dict[str, list] | dict[str, Any],
    ) -> list[list[str]] | Optional[dict[str, list[str]]]:
        """Helper method for splitting metadata paths with nested elements that are represented as strings.

        Each string element is split with `PathUtils.path_split`; non-string elements are kept unchanged.

        Args:
            metadata_path (list[list] | list[str] | dict[str, list] | dict[str, Any]):
                A metadata path to split if represented as a list or dictionary containing string paths.

        Returns:
            List[List[str]] | Dict[str, List[str]]:
                The metadata path that identifies metadata fields within a parsed response. Inputs that are neither
                a list nor a dictionary are returned unchanged so that later validation can reject them.
        """
        if isinstance(metadata_path, list):
            return [
                PathUtils.path_split(path_element) if isinstance(path_element, str) else path_element
                for path_element in metadata_path
            ]
        if isinstance(metadata_path, dict):
            return {
                key: PathUtils.path_split(path_element) if isinstance(path_element, str) else path_element
                for key, path_element in metadata_path.items()
            }
        return metadata_path

    @staticmethod
    def _prepare_record_path(record_path: Optional[list[str] | str]) -> Optional[list[str]]:
        """Helper method for splitting a record path that is represented as a string.

        Args:
            record_path (Optional[List[str] | str]):
                A record path to split with `PathUtils.path_split` if represented as a string.

        Returns:
            Optional[List[str]]:
                The formatted record path representing the keys that must be traversed to arrive at response
                records. Non-string inputs (including None) are returned unchanged.
        """
        return PathUtils.path_split(record_path) if isinstance(record_path, str) else record_path

    def _validate_inputs(self) -> None:
        """Validate the attributes of the current extractor once they have been set.

        This base implementation validates `record_path` and `metadata_path` by delegating to `_validate_paths`.

        Raises:
            DataExtractionException: Indicates an invalid input and identifies which value is invalid.
        """
        self._validate_paths(self.record_path, self.metadata_path)
        return None

    @classmethod
    def _validate_paths(
        cls,
        record_path: Optional[list] = None,
        metadata_path: Optional[list[list] | dict[str, list]] = None,
    ) -> None:
        """Validate the path inputs provided to the DataExtractor before they are used for extraction.

        Args:
            record_path (Optional[List[str | int]]): The path where a list of records are located.
            metadata_path (Optional[List[List[str]] | Dict[str, List[str]]]):
                The list or dictionary of paths where metadata fields are located. For dictionaries, both the keys
                and the path values are checked.

        Raises:
            DataExtractionException: Indicates an invalid input and identifies which value is invalid.
        """
        try:
            if record_path is not None:
                if not isinstance(record_path, list):
                    raise TypeError(f"A list is required for a record path. Received: {type(record_path)}")
                if not all(isinstance(path, (str, int)) for path in record_path):
                    # TypeError rather than KeyError: str(KeyError) is the repr of its message, which would
                    # add stray quotes to the wrapped error text below
                    raise TypeError(
                        f"At least one path in the provided record path is not an integer or string: {record_path}"
                    )
            if metadata_path is not None:
                if not isinstance(metadata_path, (list, dict)):
                    raise TypeError(
                        f"The provided metadata path override is not a list or dictionary: {type(metadata_path)}"
                    )
                if isinstance(metadata_path, dict):
                    # Iterating a dict yields only its keys, while the paths themselves are the values:
                    # check both so that invalid dictionary paths are not silently accepted
                    path_elements = [*metadata_path.keys(), *metadata_path.values()]
                else:
                    path_elements = metadata_path
                if not all(isinstance(path, (str, int, list)) for path in path_elements):
                    raise TypeError(
                        f"At least one path in the provided metadata path override is not a list, integer, or string: {metadata_path}"
                    )
        except (KeyError, TypeError) as e:
            raise DataExtractionException(
                f"Error initializing the DataExtractor: At least one of the inputs are invalid. {e}"
            ) from e
        return None

    def extract_metadata(self, parsed_page_dict: dict[str, Any]) -> MetadataType:
        """Extract metadata from the parsed page dictionary.

        Args:
            parsed_page_dict (Dict): The dictionary containing the page data to be parsed.

        Returns:
            Dict: The extracted metadata, keyed by the last element of each path (or of each dictionary key).
                An empty dictionary is returned when no metadata paths are configured. When a KeyError occurs, it
                is logged and whatever was collected so far (normally an empty dictionary) is returned.

        Raises:
            DataExtractionException: If any exception other than a KeyError occurs during extraction.
        """
        if not self.metadata_path:
            logger.info("Metadata paths are empty: skipping metadata extraction")
            return {}

        metadata = {}
        try:
            if isinstance(self.metadata_path, list):
                # converts a list into a dictionary to ensure compatibility with the current method
                # as_list_1d ensures that, if the current path is not in a list, it is coerced into a list
                metadata_path = {as_list_1d(path)[-1]: as_list_1d(path) for path in self.metadata_path}
            else:
                # ensures that all paths are lists and nests the path in a list otherwise
                metadata_path = {as_list_1d(key)[-1]: as_list_1d(path) for key, path in self.metadata_path.items()}

            # attempts to retrieve the path from the dictionary of metadata paths
            metadata = {key: try_int(get_nested_data(parsed_page_dict, path)) for key, path in metadata_path.items()}

            missing_keys = [str(k) for k, v in metadata.items() if v is None]
            if missing_keys:
                logger.warning(f"The following metadata keys are missing or None: {', '.join(missing_keys)}")
        except KeyError as e:
            # Deliberately best-effort: a missing key is logged rather than raised
            logger.error(f"Error extracting metadata due to missing key: {e}")
        except Exception as e:
            msg = f"An unexpected error occurred during metadata extraction due to the following exception: {e}"
            logger.error(msg)
            # Chain explicitly so the original exception is recorded as the cause
            raise DataExtractionException(msg) from e
        return metadata

    def extract_records(self, parsed_page_dict: dict) -> Optional[RecordList]:
        """Extract records from parsed data as a list of dicts.

        Args:
            parsed_page_dict (Dict): The dictionary containing the page data to be parsed.

        Returns:
            Optional[RecordList]: The list found at `record_path`. None is returned when no record path is set,
                when nothing is found at the path, or when the value at the path is not a list.

        Raises:
            DataExtractionException: If an unexpected exception occurs during extraction.
        """
        try:
            nested_data = get_nested_data(parsed_page_dict, self.record_path) if self.record_path else None

            # An empty list is still a list, so it is returned as-is rather than converted to None
            if isinstance(nested_data, list):
                return nested_data

            if not nested_data:
                logger.debug(f"No records extracted from path {self.record_path}")
                return None

            logger.debug(f"Expected a list at path {self.record_path}. Instead received {type(nested_data)}")
            return None
        except Exception as e:
            msg = f"An unexpected error occurred during record extraction due to the following exception: {e}"
            logger.error(msg)
            # Chain explicitly so the original exception is recorded as the cause
            raise DataExtractionException(msg) from e

    @classmethod
    def _prepare_page(cls, parsed_page: Union[list[dict], dict]) -> dict:
        """Prepare the parsed data for metadata and record extraction by coercing it into a dictionary.

        Lists are first passed through `unlist_1d`; anything that is still not a dictionary is converted with
        `try_dict`, and the keys of the converted result are coerced to strings.

        Args:
            parsed_page (List[Dict] | Dict):
                The list or dictionary containing the page data and metadata to be extracted.

        Returns:
            Dict: A dictionary containing the metadata and records to extract.

        Raises:
            DataExtractionException: If the parsed page cannot be converted into a dictionary.
        """
        if isinstance(parsed_page, list):
            parsed_page = unlist_1d(parsed_page)

        if not isinstance(parsed_page, dict):
            parsed_page_dict = try_dict(parsed_page)
            if parsed_page_dict is None:
                # Report the type (as the message states) instead of dumping the entire page contents
                raise DataExtractionException(
                    f"Error converting parsed_page_dict of type {type(parsed_page)} to a dictionary"
                )
            parsed_page_dict = {str(k): v for k, v in parsed_page_dict.items()}
            parsed_page = parsed_page_dict
        return parsed_page

    def extract(self, parsed_page: Union[list[dict], dict]) -> tuple[Optional[RecordList], Optional[MetadataType]]:
        """Extract both records and metadata from the parsed page dictionary.

        Args:
            parsed_page (Union[list[dict], dict]): The page data and metadata to be extracted.

        Returns:
            tuple[Optional[RecordList], Optional[MetadataType]]:
                A tuple containing the list of records and the metadata dictionary.

        Raises:
            DataExtractionException: If the page cannot be coerced into a dictionary or extraction fails.
        """
        parsed_page = self._prepare_page(parsed_page)
        records = self.extract_records(parsed_page)
        metadata = self.extract_metadata(parsed_page)
        return records, metadata

    def __call__(self, parsed_page: Union[list[dict], dict]) -> tuple[Optional[RecordList], Optional[MetadataType]]:
        """Helper method enabling users to call the extractor as a function to extract both records and metadata.

        Args:
            parsed_page (List[Dict] | Dict): The page data and metadata to be extracted.

        Returns:
            tuple[Optional[RecordList], Optional[MetadataType]]:
                A tuple containing the list of records and the metadata dictionary.
        """
        return self.extract(parsed_page)

    def structure(self, flatten: bool = False, show_value_attributes: bool = True) -> str:
        """Base method for showing the structure of the current Data Extractor.

        This method reveals the configuration settings of the extractor that will be used to extract records and
        metadata.

        Args:
            flatten (bool): Forwarded to `generate_repr`.
            show_value_attributes (bool): Forwarded to `generate_repr`.

        Returns:
            str: The current structure of the BaseDataExtractor or its subclass.
        """
        return generate_repr(self, flatten=flatten, show_value_attributes=show_value_attributes)

    def __repr__(self) -> str:
        """Base method for identifying the current implementation of the BaseDataExtractor.

        Subclasses can override this for more specific descriptions of attributes and defaults. Useful for showing
        the options being used for extracting metadata and records from the parsed json/data dictionaries from the
        api response.

        Returns:
            str: The representation of the current object.
        """
        return self.structure()

    @classmethod
    def update(cls, data_extractor: Self, **data_extractor_kwargs: Any) -> Self:
        """Helper method for creating a new BaseDataExtractor instance, replacing only the specified components.

        Args:
            data_extractor (Self): A previously created BaseDataExtractor instance.
            **data_extractor_kwargs:
                Keyword arguments used to replace components of the BaseDataExtractor. Only `record_path` and
                `metadata_path` are read here; unspecified fields are taken from the previous instance.

        Returns:
            BaseDataExtractor: A new data extractor instance with the specified parameter updates.

        Raises:
            TypeError: If `data_extractor` is not a BaseDataExtractor or subclass instance.
        """
        if not isinstance(data_extractor, BaseDataExtractor):
            raise TypeError(
                "Expected a BaseDataExtractor or subclass to perform parameter updates. Received type "
                f"{type(data_extractor)}"
            )
        return cls(
            record_path=data_extractor_kwargs.get("record_path", data_extractor.record_path),
            metadata_path=data_extractor_kwargs.get("metadata_path", data_extractor.metadata_path),
        )
__all__ = ["BaseDataExtractor"]