# /api/models/responses.py
"""The scholar_flux.api.models.responses module contains the core response types used to indicate whether the retrieval
and processing of API responses was successful or unsuccessful. Each class uses pydantic to ensure type-validated
responses while ensuring flexibility in how responses can be used and applied.
Classes:
ProcessedResponse:
Indicates whether an API was successfully retrieved, parsed, and processed. This model is designed to
facilitate the inspection of intermediate results and retrieval of extracted response records.
ErrorResponse:
Indicates that an error occurred somewhere in the retrieval or processing of an API response. This
class is designed to allow inspection of error messages and failure results to aid in debugging in case
of unexpected scenarios.
NonResponse:
Inherits from ErrorResponse and is designed to indicate that an error occurred in the preparation of a
request or the sending/retrieval of a response.
"""
from typing import Optional, Dict, List, Any, MutableMapping
from scholar_flux.exceptions import InvalidResponseReconstructionException
from typing_extensions import Self
from pydantic import BaseModel, field_serializer, field_validator
from scholar_flux.api.models.reconstructed_response import ReconstructedResponse
from scholar_flux.utils.helpers import generate_iso_timestamp, parse_iso_timestamp, format_iso_timestamp
from scholar_flux.utils import CacheDataEncoder, generate_repr
from scholar_flux.utils.response_protocol import ResponseProtocol
from scholar_flux.api.validators import validate_url
from datetime import datetime
from http.client import responses
from scholar_flux.utils import try_int
from json import JSONDecodeError
import json
import logging
import requests
logger = logging.getLogger(__name__)
[docs]
class APIResponse(BaseModel):
"""A Response wrapper for responses of different types that allows consistency when using several possible backends.
The purpose of this class is to serve as the base for managing responses received from scholarly APIs while
processing each component in a predictable, reproducible manner,
This class uses pydantic's data validation and serialization/deserialization methods to aid caching and includes
properties that refer back to the original response for displaying valid response codes, URLs, etc.
All future processing/error-based responses classes inherit from and build off of this class.
Args:
cache_key (Optional[str]): A string for recording cache keys for use in later steps of the response
orchestration involving processing, cache storage, and cache retrieval
response (Any): A response or response-like object to be validated and used/re-used in later caching
and response processing/orchestration steps.
created_at (Optional[str]): A value indicating the time in which a response or response-like object was created.
Example:
>>> from scholar_flux.api import APIResponse
# Using keyword arguments to build a basic APIResponse data container:
>>> response = APIResponse.from_response(
>>> cache_key = 'test-response',
>>> status_code = 200,
>>> content=b'success',
>>> url='https://example.com',
>>> headers={'Content-Type': 'application/text'}
>>> )
>>> response
# OUTPUT: APIResponse(cache_key='test-response', response = ReconstructedResponse(
# status_code=200, reason='OK', headers={'Content-Type': 'application/text'},
# text='success', url='https://example.com'
#)
>>> assert response.status == 'OK' and response.text == 'success' and response.url == 'https://example.com'
# OUTPUT: True
>>> assert response.validate_response()
# OUTPUT: True
"""
cache_key: Optional[str] = None
response: Optional[Any] = None
created_at: Optional[str] = None
[docs]
@field_validator("created_at", mode="before")
def validate_iso_timestamp(cls, v: Optional[str | datetime]) -> Optional[str]:
"""Helper method for validating and ensuring that the timestamp accurately follows an iso 8601 format."""
if not v:
return None
if isinstance(v, str):
if not parse_iso_timestamp(v):
logger.warning(f"Expected a parsed timestamp but received an unparseable value: {v}")
return None
elif isinstance(v, datetime):
v = format_iso_timestamp(v)
else:
logger.warning(f"Expected an iso8601-formatted datetime, Received type ({type(v)})")
return None
return v
@property
def status_code(self) -> Optional[int]:
"""Helper property for retrieving a status code from the APIResponse.
Returns:
Optional[int]: The status code associated with the response (if available)
"""
try:
status_code = getattr(self.response, "status_code", None)
return status_code if isinstance(status_code, int) else try_int(status_code)
except (ValueError, AttributeError):
return None
@property
def reason(self) -> Optional[str]:
"""Uses the underlying reason attribute on the response object, if available, to create a human readable status
description.
Returns:
Optional[str]: The status description associated with the response.
"""
reason = getattr(self.response, "reason", None)
reason = reason if reason else responses.get(self.status_code or -1)
if isinstance(reason, str):
return reason
return None
@property
def status(self) -> Optional[str]:
"""Helper property for retrieving a human-readable status description APIResponse.
Returns:
Optional[int]: The status description associated with the response (if available).
"""
return self.reason or getattr(self.response, "status", None) or responses.get(self.status_code or -1)
@property
def headers(self) -> Optional[MutableMapping[str, str]]:
"""Return headers from the underlying response, if available and valid.
Returns:
MutableMapping[str, str]: A dictionary of headers from the response
"""
if self.response is not None:
headers = getattr(self.response, "headers", None)
if isinstance(headers, (dict, MutableMapping)):
return dict(headers)
logger.warning("The current APIResponse does not have a valid response header")
return None
@property
def content(self) -> Optional[bytes]:
"""Return content from the underlying response, if available and valid.
Returns:
(bytes): The bytes from the original response content
"""
if self.response is not None:
content = getattr(self.response, "content", None)
if isinstance(content, str):
return content.encode("utf-8")
if isinstance(content, bytes):
return content
logger.warning("The current APIResponse does not have a valid response content attribute")
return None
@property
def text(self) -> Optional[str]:
"""Attempts to retrieve the response text by first decoding the bytes of the its content. If not available, this
property attempts to directly reference the text attribute directly.
Returns:
Optional[str]: A text string if the text is available in the correct format, otherwise None
"""
if self.response is not None:
#
text = self.content.decode("utf-8") if self.content is not None else getattr(self.response, "text", None)
if isinstance(text, str):
return text
logger.warning("The current APIResponse does not have a valid response text attribute")
return None
@property
def url(self) -> Optional[str]:
"""Return URL from the underlying response, if available and valid.
Returns:
str: A string of the original URL if available. Accounts for objects that
that indicate the original url when converted as a string
"""
url = getattr(self.response, "url", None)
if url:
url_string = url if isinstance(url, str) else str(url)
return url_string if validate_url(url_string) else None
return None
[docs]
def validate_response(self) -> bool:
"""Helper method for determining whether the response attribute is truly a response. If the response isn't a
requests response, we use duck-typing to determine whether the response attribute, itself, has the expected
attributes of a response by using properties for checking types vs None (if the attribute isn't the expected
type)
Returns:
bool: An indicator of whether the current APIResponse.response attribute is
actually a response
"""
if isinstance(self.response, requests.Response):
return True
return self._is_response_like(self)
@classmethod
def _is_response_like(cls, response: Any) -> bool:
"""Helper method for validating whether each of the core components of a response are populated with the correct
response types or are instead missing.
The following properties that refer back to the original response should be available:
1. status_code: (int)
2. reason: string
3. headers: dictionary
4. content: bytes
5. url: string or URL-like field
"""
if not isinstance(response, ResponseProtocol):
return False
# e.g. status code, reason, headers, content, ir;
response_like = all(
getattr(response, attribute, None) is not None for attribute in ReconstructedResponse.fields()
)
return response_like
[docs]
@classmethod
def from_response(
cls,
response: Optional[Any] = None,
cache_key: Optional[str] = None,
auto_created_at: Optional[bool] = None,
**kwargs,
) -> Self:
"""Construct an APIResponse from a response object or from keyword arguments.
If response is not a valid response object, builds a minimal response-like object from kwargs.
"""
model_kwargs = {field: kwargs.pop(field, None) for field in cls.model_fields if field in kwargs}
response = (
ReconstructedResponse.build(response, **kwargs) if not isinstance(response, requests.Response) else response
)
if auto_created_at is True and not model_kwargs.get("created_at"):
model_kwargs["created_at"] = generate_iso_timestamp()
return cls(response=response, cache_key=cache_key, **model_kwargs)
[docs]
@field_serializer("response", when_used="json")
def encode_response(self, response: Any) -> Optional[Dict[str, Any] | List[Any]]:
"""Helper method for serializing a response into a json format. Accounts for special cases such as
CaseInsensitiveDict fields that are otherwise unserializable.
From this step, pydantic can safely use json internally to dump the encoded response fields
"""
if isinstance(response, (requests.Response, ReconstructedResponse)) or self._is_response_like(response):
return self._encode_response(response)
return None
[docs]
@classmethod
def serialize_response(cls, response: requests.Response | ResponseProtocol) -> Optional[str]:
"""Helper method for serializing a response into a json format. The response object is first converted into a
serialized string and subsequently dumped after ensuring that the field is serializable.
Args:
response (Response, ResponseProtocol)
"""
try:
encoded_response = cls._encode_response(response)
if encoded_response:
return json.dumps(encoded_response)
except (InvalidResponseReconstructionException, TypeError, AttributeError, UnicodeEncodeError) as e:
logger.error(
f"Could not encode the value of type {type(response)} into a serialized json object "
f"due to an error: {e}"
)
return None
@classmethod
def _encode_response(cls, response: requests.Response | ResponseProtocol) -> Dict[str, Any]:
"""Helper method for encoding a response using a ReconstructedResponse to store the core fields for responses
and response-like objects.
Elements from the response are first extracted from the response object using the ReconstructedResponse data
model. After extracting the fields from the model as a dictionary, the fields are subsequently encoded using
the scholar_flux.utils.CacheDataEncoder that ensures all fields are encodable.
Afterward, the dictionary can safely be serialized via json.dumps.
Note that fields such as CaseInsensitiveDicts and other MutableMappings are converted to dictionaries
to support the process of encoding each field.
Args:
response: A response or response-like object whose core fields are be encoded
Returns:
Dict[str, Any]: A dictionary formatted in a way that enables core fields to be encoded
using json.dumps function from the json module in the standard library that
serializes dictionaries into strings.
"""
reconstructed_response = ReconstructedResponse.build(response)
response_dictionary = CacheDataEncoder.encode(reconstructed_response.asdict())
return response_dictionary
@classmethod
def _decode_response(cls, encoded_response_dict: Dict[str, Any], **kwargs) -> Optional[ReconstructedResponse]:
"""Helper method for decoding a dictionary of encoded fields that were previously encoded using
_encode_response. This class approximately creates the previous response object by creating a
ReconstructedResponse that retains core fields from the original response to support the orchestration of
response processing and caching.
Args:
encoded_response_dict (Dict[str, Any]):
Contains a list of all encoded dictionary-based elements of the original response or response-like
object.
**kwargs:
Any keyword-based overrides to use when building a request from the decoded response dictionary
when the same values in the decoded_response are otherwise missing
Returns:
Optional[ReconstructedResponse]:
Creates a reconstructed response with from the original encoded fields.
"""
field_set = set(ReconstructedResponse.fields())
response_dict = (
encoded_response_dict.get("response")
if not field_set.intersection(encoded_response_dict)
and isinstance(encoded_response_dict, dict)
and "response" in encoded_response_dict
else encoded_response_dict
)
decoded_response = CacheDataEncoder.decode(response_dict) or {}
decoded_response.update(
{field: value for field, value in kwargs.items() if decoded_response.get(field) is None}
)
return ReconstructedResponse.build(**decoded_response)
[docs]
@classmethod
def from_serialized_response(cls, response: Optional[Any] = None, **kwargs) -> Optional[ReconstructedResponse]:
"""Helper method for creating a new APIresponse from the original dumped object. This method Accounts for lack
of ease of serialization of responses by decoding the response dictionary that was loaded from a string using
json.loads from the json module in the standard library.
If the response input is still a serialized string, this method will manually load the response dict with
the `APIresponse._deserialize_response_dict` class method before further processing.
Args:
response (Any): A prospective response value to load into the API Response.
Returns:
Optional[ReconstructedResponse]: A reconstructed response object, if possible. Otherwise returns None
"""
if isinstance(response, str):
response = cls._deserialize_response_dict(response)
if isinstance(response, dict):
return cls._decode_response(response, **kwargs)
elif kwargs:
return ReconstructedResponse.build(**kwargs)
return None
[docs]
@classmethod
def as_reconstructed_response(cls, response: Any) -> ReconstructedResponse:
"""Classmethod designed to create a reconstructed response from an original response object. This method coerces
response attributes into a reconstructed response that retains the original content, status code, headers, URL,
reason, etc.
Returns:
ReconstructedResponse: A minimal response object that contains the core attributes needed to support
other processes in the scholar_flux module such as response parsing and caching.
"""
if isinstance(response, APIResponse):
response = response.response
return ReconstructedResponse.build(response)
def __eq__(self, other: Any) -> bool:
"""Helper method for validating whether responses are equal. Elements of the same type are considered a
necessary quality for processing components to be considered equal.
Args:
other (Any): An object to compare against the current APIResponse object/subclass
Returns:
bool: True if the value is equal to the current APIResponse object, otherwise False
"""
# accounting for subclasses:
if not isinstance(other, self.__class__):
return False
return self.model_dump(exclude={"created_at"}) == other.model_dump(exclude={"created_at"})
@classmethod
def _deserialize_response_dict(cls, serialized_response_dict: str) -> Optional[dict]:
"""Helper method for deserializing the dumped model json.
Attempts to load json data from a string if possible. Otherwise returns None
"""
try:
deserialized_dict = json.loads(serialized_response_dict)
return deserialized_dict
except (JSONDecodeError, TypeError) as e:
logger.warning(f"Could not decode the response argument from a string to JSON object: {e}")
return None
[docs]
def raise_for_status(self):
"""Uses an underlying response object to validate the status code associated with the request.
If the attribute isn't a response or reconstructed response, the code will coerce the class into a response
object to verify the status code for the request URL and response.
"""
if self.response is not None and isinstance(self.response, (requests.Response, ReconstructedResponse)):
self.response.raise_for_status()
else:
self.as_reconstructed_response(self.response).raise_for_status()
def __repr__(self) -> str:
"""Helper method for generating a simple representation of the current API Response."""
return generate_repr(
self,
exclude={
"created_at",
},
)
[docs]
class ErrorResponse(APIResponse):
"""Returned when something goes wrong, but we don’t want to throw immediately—just hand back failure details.
The class is formatted for compatibility with the ProcessedResponse,
"""
message: Optional[str] = None
error: Optional[str] = None
[docs]
@classmethod
def from_error(
cls,
message: str,
error: Exception,
cache_key: Optional[str] = None,
response: Optional[requests.Response | ResponseProtocol] = None,
) -> Self:
"""Creates and logs the processing error if one occurs during response processing.
Args:
response (Response): Raw API response.
cache_key (Optional[str]): Cache key for storing results.
Returns:
ErrorResponse: A Dataclass Object that contains the error response data
and background information on what precipitated the error.
"""
creation_timestamp = generate_iso_timestamp()
return cls(
cache_key=cache_key,
response=response.response if isinstance(response, APIResponse) else response,
message=message,
error=type(error).__name__,
created_at=creation_timestamp,
)
@property
def parsed_response(self) -> None:
"""Provided for type hinting + compatibility."""
return None
@property
def extracted_records(self) -> None:
"""Provided for type hinting + compatibility."""
return None
@property
def processed_records(self) -> None:
"""Provided for type hinting + compatibility."""
return None
@property
def metadata(self) -> None:
"""Provided for type hinting + compatibility."""
return None
@property
def data(self) -> None:
"""Provided for type hinting + compatibility."""
return self.processed_records
def __repr__(self) -> str:
"""Helper method for creating a string representation of the underlying ErrorResponse."""
return f"ErrorResponse(status_code={self.status_code}, error={self.error}, " f"message={self.message!r})"
def __len__(self) -> int:
"""Helper method added for compatibility with the use-case of the ProcessedResponse.
Always returns 0, indicating that no records were successfully processed.
"""
return 0
def __bool__(self):
"""Indicates that the underlying response was not successfully processed or contained an error code."""
return False
[docs]
class NonResponse(ErrorResponse):
"""Response class used to indicate that an error occurred in the preparation of a request or in the retrieval of a
response object from an API.
This class is used to signify the error that occurred within the search process using a similar interface as the
other scholar_flux Response dataclasses.
"""
response: None = None
def __repr__(self) -> str:
"""Helper method for creating a string representation of the underlying ErrorResponse."""
return f"NonResponse(error={self.error}, " f"message={self.message!r})"
[docs]
class ProcessedResponse(APIResponse):
"""Helper class for returning a ProcessedResponse object that contains information on the original, cached, or
reconstructed_response received and processed after retrieval from an API in addition to the cache key. This object
also allows storage of intermediate steps including:
1) parsed responses 2) extracted records and metadata 3) processed records (aliased as data) 4) any
additional messages An error field is provided for compatibility with the ErrorResponse class.
"""
parsed_response: Optional[Any] = None
extracted_records: Optional[List[Any]] = None
processed_records: Optional[List[Dict[Any, Any]]] = None
metadata: Optional[Any] = None
message: Optional[str] = None
@property
def data(self) -> Optional[List[Dict[Any, Any]]]:
"""Alias to the processed_records attribute that holds a list of dictionaries, when available."""
return self.processed_records
@property
def error(self) -> None:
"""Provided for type hinting + compatibility."""
return None
def __repr__(self) -> str:
"""Helper method for creating a simple representation of the ProcessedResponse."""
return (
f"ProcessedResponse(len={len(self.processed_records or [])}, "
f"cache_key={self.cache_key!r}, "
f"metadata={'{'+str(self.metadata)[1:40]+'...'+'}' if isinstance(self.metadata, (dict, list, str)) and self.metadata else self.metadata!r})"
)
def __len__(self) -> int:
"""Indicates the overall length of the processed data field as processed in the last step after filtering."""
return len(self.processed_records or [])
def __bool__(self) -> bool:
"""Returns true to indicate that processing was successful, independent of the number of processed records."""
return True
__all__ = ["APIResponse", "ProcessedResponse", "ErrorResponse", "NonResponse"]