Source code for scholar_flux.api.normalization.normalizing_field_map

# scholar_flux.api.normalization.normalizing_field_map.py
"""The `scholar_flux.api.normalization.normalizing_field_map` module implements the `NormalizingFieldMap` for complex
record normalization scenarios.

The `NormalizingFieldMap` class builds on the `BaseFieldMap`, using a `NormalizingDataProcessor` to handle nested
field traversal and scenarios where the same field is named differently across records from the same API provider.
The `NormalizingFieldMap` can be subclassed with specialized field names for validated normalization.

"""
from pydantic import PrivateAttr
from typing import Any, Mapping, Optional, Sequence
from functools import cached_property
from scholar_flux.api.normalization.base_field_map import BaseFieldMap
from scholar_flux.data.data_processor import DataProcessor
from scholar_flux.data.normalizing_data_processor import NormalizingDataProcessor
from scholar_flux.exceptions import RecordNormalizationException, DataProcessingException
from scholar_flux.utils.record_types import RecordType, RecordList, NormalizedRecordType, NormalizedRecordList
import logging

# Module-level logger: NormalizingFieldMap logs each error message here before raising RecordNormalizationException.
logger = logging.getLogger(__name__)


class NormalizingFieldMap(BaseFieldMap):
    """A field map that recursively finds and retrieves nested JSON elements from records.

    This implementation builds upon the original `BaseFieldMap`, delegating record traversal to a
    `NormalizingDataProcessor` that performs automated index processing and path-guided traversal.

    During normalization, the `NormalizingFieldMap.fields` property returns all subclassed field mappings as a
    flattened dictionary (excluding private fields prefixed with underscores). Both simple and nested API-specific
    field names are matched and mapped to universal field names.

    A universal field may also be mapped to a list of record keys. The first key is the primary lookup, and each
    subsequent key acts as a fallback. The first non-`None` value, taken in list order, is kept for the field.

    Any changes to the instance configuration are automatically detected during normalization by comparing the
    `_cached_fields` snapshot to the current `fields` property.

    Examples:
        >>> from scholar_flux.api.normalization.normalizing_field_map import NormalizingFieldMap
        >>> field_map = NormalizingFieldMap(provider_name = None, api_specific_fields=dict(title = 'article_title', record_id='ID'))
        >>> expected_result = field_map.fields | {'provider_name':'core', 'title': 'Decomposition of Political Tactics', 'record_id': 196}
        >>> result = field_map.apply(dict(provider_name='core', ID=196, article_title='Decomposition of Political Tactics'))
        >>> cached_fields = field_map._cached_fields
        >>> print(result == expected_result)
        >>> result2 = field_map.apply(dict(provider_name='core', ID=196, article_title='Decomposition of Political Tactics'))
        >>> assert cached_fields is field_map._cached_fields
        >>> assert result is not result2

    """

    # Core identifiers
    provider_name: str = ""
    _processor: NormalizingDataProcessor = PrivateAttr(default_factory=NormalizingDataProcessor)

    @cached_property
    def _cached_fields(self) -> dict[str, Any]:
        """A cached snapshot of the field mappings used by the current map.

        This cached private property is assigned the value of the `fields` property the first time it is accessed.
        It records the mapping of API-specific field names to the common set of field names that was last used to
        configure the data processor. That common set normalizes both universal records common to a domain and
        API-specific records.

        The `processor` property compares this snapshot against the current `fields` property. On a mismatch, it
        regenerates the processor's record keys before mapping API-specific parameters to the universal set of
        fields.

        **Note**: Because the comparison is made against the live `fields` property, this also accounts for cases
        where the end-user changes individual fields of the current NormalizingFieldMap directly.

        """
        return self.fields

    def _refresh_cached_fields(self) -> None:
        """Invalidates the `_cached_fields` snapshot so that the next access re-reads `fields`."""
        # `cached_property` stores its value in the instance `__dict__`; deleting it forces recomputation.
        if "_cached_fields" in self.__dict__:
            del self._cached_fields

    @property
    def processor(self) -> NormalizingDataProcessor:
        """Returns the NormalizingDataProcessor, refreshing its record keys from the current fields when needed.

        The processor is not recreated. Its record keys are updated in place from the current field mappings in two
        cases:
        - the processor has no record keys yet, or
        - `fields` no longer matches the `_cached_fields` snapshot, because the configuration changed.

        """
        if not self._processor.record_keys or self.fields != self._cached_fields:
            self._update_record_keys()
        return self._processor

    @processor.setter
    def processor(self, processor: NormalizingDataProcessor) -> None:
        """Validates and assigns the data processor used during normalization.

        Args:
            processor: The processor to use. Any `DataProcessor` instance is accepted.

        Raises:
            RecordNormalizationException: If `processor` is not a `DataProcessor`.

        """
        if not isinstance(processor, DataProcessor):
            err = f"Expected a DataProcessor, but received a variable of {type(processor)}"
            logger.error(err)
            raise RecordNormalizationException(err)
        self._processor = processor

    def _update_record_keys(self) -> None:
        """Updates the record keys of the NormalizingDataProcessor using the current dictionary of field mappings.

        Each kind of field value is handled as follows:
        - Non-empty string values are mapped directly, except for `provider_name`.
        - List values are expanded into one entry per record key. Index 0 is stored under the field name itself,
          and later indices are stored under keys generated by `_index_key`. `_resolve_fallbacks` collapses these
          suffixed entries back into the field after processing.

        After the update, the `_cached_fields` snapshot is invalidated so that it reflects the new configuration.

        """
        processing_fields = {
            field: record_key
            for field, record_key in self.fields.items()
            if record_key and isinstance(record_key, str) and field != "provider_name"
        }
        processing_fields = processing_fields | {
            self._index_key(field, i): record_key
            for field, record_key_list in self.fields.items()
            if isinstance(record_key_list, list)
            for i, record_key in enumerate(record_key_list)
        }
        # If no provider name is configured (None or an empty string), map the universal `provider_name` field to
        # the record's own `provider_name` key instead, so that the value comes from each record.
        if not self.provider_name:
            processing_fields["provider_name"] = "provider_name"
        self._processor.update_record_keys(processing_fields)
        self._refresh_cached_fields()

    def normalize_record(
        self, record: RecordType, keep_api_specific_fields: Optional[bool | Sequence[str]] = True
    ) -> NormalizedRecordType:
        """Maps the API-specific fields of a single processed record to a normalized set of field names.

        Args:
            record: The record to normalize. `None` yields an empty dictionary.
            keep_api_specific_fields: Forwarded to `filter_api_specific_fields` to control which API-specific
                fields are retained in the output.

        Returns:
            The normalized record after fallback resolution and post-processing.

        Raises:
            RecordNormalizationException: If `record` is not a `dict`.

        """
        if record is None:
            return {}
        if not isinstance(record, dict):
            err = f"Expected record to be of type `dict`, but received a variable of {type(record)}"
            logger.error(err)
            raise RecordNormalizationException(err)
        normalized_record = self.processor.process_record(record)
        post_processed_record = self._post_process(normalized_record)
        return self.filter_api_specific_fields(post_processed_record, keep_api_specific_fields)

    def normalize_records(
        self, records: RecordType | RecordList, keep_api_specific_fields: Optional[bool | Sequence[str]] = True
    ) -> NormalizedRecordList:
        """Maps API-specific fields within a processed record list to create a new, normalized record list.

        Args:
            records: A list of records, or a single record mapping (which is treated as a one-element list).
                `None` yields an empty list.
            keep_api_specific_fields: Forwarded to `filter_api_specific_fields` for each normalized record.

        Returns:
            A new list containing one normalized record per input record.

        Raises:
            RecordNormalizationException: If `records` is neither a list nor a mapping, or if the data processor
                raises a `DataProcessingException`. In the latter case, the original exception is chained as the
                cause.

        """
        if records is None:
            return []
        # A single mapping is wrapped in a list, so from here on `record_list` is never a Mapping itself.
        record_list = [records] if isinstance(records, Mapping) else records
        if not isinstance(record_list, list):
            err = f"Expected the record list to be of type `list`, but received a variable of {type(record_list)}"
            logger.error(err)
            raise RecordNormalizationException(err)
        try:
            normalized_record_list = self.processor(record_list)
        except DataProcessingException as e:
            err = f"Encountered an error during the data processing step of record normalization: {e}"
            logger.error(err)
            # Chain the cause so the original processing traceback is preserved for debugging.
            raise RecordNormalizationException(err) from e
        return [
            self.filter_api_specific_fields(
                self._post_process(normalized_record), keep_api_specific_fields=keep_api_specific_fields
            )
            for normalized_record in normalized_record_list
        ]

    def _post_process(self, record: NormalizedRecordType) -> NormalizedRecordType:
        """Resolves fallback fields, then applies `_add_defaults`.

        Override in subclasses for provider-specific transformations.

        """
        return self._add_defaults(self._resolve_fallbacks(record))

    @classmethod
    def _index_key(cls, field: str, index: int, suffix: str = "_fallback_") -> str:
        """Adds a simple index to the current field name to distinguish it for later retrieval.

        Index 0 returns `field` unchanged; any other index returns `f"{field}{suffix}{index}"`.

        """
        if index == 0:
            return field
        return f"{field}{suffix}{index}"

    def _resolve_fallbacks(self, record: NormalizedRecordType) -> NormalizedRecordType:
        """Resolves universal fields that map to lists of record keys that may vary depending on the record type.

        For each list-valued field, the suffixed fallback entries (indices 1..n-1) are removed from a copy of the
        record. The first non-`None` fallback fills the field when its primary value is `None`. The input record
        is not mutated.

        """
        result = record.copy()
        for field, record_key_list in self.fields.items():
            if not isinstance(record_key_list, list):
                continue
            # Index 0 is already stored under `field` itself; only the suffixed fallback entries need resolving.
            for index in range(1, len(record_key_list)):
                # Always pop, so that intermediate fallback keys never leak into the normalized output.
                fallback_value = result.pop(self._index_key(field, index), None)
                if fallback_value is not None and result.get(field) is None:
                    result[field] = fallback_value
        return result
# Public API of this module: only NormalizingFieldMap is exported by `from ... import *`.
__all__ = ["NormalizingFieldMap"]