# /data_storage/abc_storage.py
"""The scholar_flux.data_storage.abc_storage module implements the ABCStorage that defines the abstractions that are to
be implemented to create a scholar_flux compatible storage. The ABCStorage defines basic CRUD operations and convenience
methods used to perform operations on the entire range of cached records, or optionally, cached records specific to a
namespace.
scholar_flux implements the ABCStorage with subclasses for SQLite (through SQLAlchemy), Redis, MongoDB, and in-memory
caching, and the abstraction can be further extended to DuckDB and other backends supported by SQLAlchemy.
"""
from collections.abc import Iterator
from typing import Any, Optional
from typing_extensions import Self
from abc import ABC, abstractmethod
from contextlib import contextmanager
from scholar_flux.utils.repr_utils import generate_repr
from scholar_flux.utils.helpers import coerce_numeric
from scholar_flux.exceptions.storage_exceptions import CacheParameterValidationException
from scholar_flux import masker
import logging
# Module-level logger; ABCStorage's `_validate_prefix` and `_handle_storage_exception` helpers log errors through it.
logger = logging.getLogger(__name__)
[docs]
class ABCStorage(ABC):
"""The ABCStorage class provides the basic structure required to implement the data storage cache with customized
backend.
This subclass provides methods to check the cache, delete from the cache, update the cache with new data, and
retrieve data from the cache storage.
"""
[docs]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initializes the current storage implementation."""
self.namespace: Optional[str] = None
self.raise_on_error: bool = False
self.config: dict[str, Any] = {}
self.ttl: Any = None
def _initialize(self, *args: Any, **kwargs: Any) -> None:
"""Optional base method to implement for initializing/reinitializing connections."""
pass
[docs]
@classmethod
def get_default_config(cls) -> dict:
"""Get default configuration with current config_settings values."""
return {}
def __deepcopy__(self, memo: Optional[dict[int, Any]]) -> Self:
"""Future implementations of ABCStorage devices are unlikely to be deep-copied.
This method defines the error message that will be used by default upon failures.
"""
class_name = self.__class__.__name__
raise NotImplementedError(
f"{class_name} cannot be deep-copied. Use the .clone() method to create a new instance with "
"the same configuration."
)
[docs]
@abstractmethod
def retrieve(self, *args: Any, **kwargs: Any) -> Optional[Any]:
"""Core method for retrieving a page of records from the cache."""
raise NotImplementedError
[docs]
@abstractmethod
def retrieve_all(self, *args: Any, **kwargs: Any) -> Optional[dict[str, Any]]:
"""Core method for retrieving all pages of records from the cache."""
raise NotImplementedError
[docs]
@abstractmethod
def retrieve_keys(self, *args: Any, **kwargs: Any) -> Optional[list[str]]:
"""Core method for retrieving all keys from the cache."""
raise NotImplementedError
[docs]
@abstractmethod
def update(self, *args: Any, **kwargs: Any) -> None:
"""Core method for updating the cache with new records."""
raise NotImplementedError
[docs]
@abstractmethod
def delete(self, *args: Any, **kwargs: Any) -> Optional[bool]:
"""Core method for record deletion.
Should return True when successful, False otherwise, and `None` on error.
"""
raise NotImplementedError
[docs]
@abstractmethod
def delete_all(self, *args: Any, **kwargs: Any) -> None:
"""Core method for deleting all pages of records from the cache."""
raise NotImplementedError
[docs]
@abstractmethod
def verify_cache(self, *args: Any, **kwargs: Any) -> bool:
"""Core method for verifying the cache based on the key."""
raise NotImplementedError
[docs]
@classmethod
@abstractmethod
def is_available(cls, *args: Any, **kwargs: Any) -> bool:
"""Core method for verifying whether a storage/service is available."""
raise NotImplementedError
[docs]
@abstractmethod
def clone(self) -> Self:
"""Helper method for cloning the structure and configuration of future implementations."""
raise NotImplementedError
[docs]
@abstractmethod
def verify_connection(self) -> None:
"""Verifies that the storage is available for connection with initialized storage configuration settings."""
raise NotImplementedError()
[docs]
@classmethod
def ping(cls, *args: Any, **kwargs: Any) -> None:
"""Verifies that a connection to the storage implementation can be established successfully.
This is a no-op by default for storage backends that don't require external connections
(e.g., InMemoryStorage, NullStorage). Storage backends connecting to external services
(Redis, MongoDB, SQL) should override this method to perform actual connection checks.
Note:
The signature and arguments vary by storage implementation:
- Redis: ping(client: redis.Redis)
- MongoDB: ping(client: MongoClient)
- SQL: ping(engine: Engine)
- InMemory/Null: ping() (no-op, uses default)
"""
pass
def _prefix(self, key: str) -> str:
"""Prefixes a namespace to the given `key`:
This method is useful for when you are using a single redis/mongodb server
and also need to retrieve a subset of articles for a particular task.
Args:
key (str): The key to prefix with a namespace (Ex. CORE_PUBLICATIONS)
Returns:
str: The cache key prefixed with a namespace (Ex. f'CORE_PUBLICATIONS:{key}')
Raises:
KeyError: If the received key is not a valid, non-empty string
"""
if not key:
raise KeyError(f"No valid value provided for key {key}")
if not self.namespace:
return key
return f"{self.namespace}:{key}" if not key.startswith(f"{self.namespace}:") else key
@classmethod
def _validate_prefix(cls, key: Optional[str], required: bool = False) -> bool:
"""Helper method for validating the current namespace key.
Raises a KeyError if the key is not a string
"""
if (key is None or key == "") and not required:
return True
if key and isinstance(key, str):
return True
msg = f"A non-empty namespace string must be provided for the {cls.__name__}. " f"Received {type(key)}"
logger.error(msg)
raise KeyError(msg)
@classmethod
def _validate_ttl(cls, ttl: Optional[int | float | str] = None) -> Optional[int | float]:
"""Validates TTL values, converting them into floats when possible and not already a non-negative numeric value.
Args:
ttl (Optional[int | float | str]):
The value to return as a non-negative integer or float when possible. None values are returned as is.
Returns:
Optional[int | float]:
int: The original non-negative integer value if received
float: A non-negative float, either received as such or converted from a string value
None: TTLs are returned as None if None is received or the ttl is equal to -1 or -1.0
Raises:
CacheParameterValidationException: for values that are not None and cannot be converted into a non-negative
float. While negative numbers are invalid, `-1` is the exception, signifying that TTL-based cache expiration
should not be used.
"""
if ttl is None or isinstance(ttl, (float, int)) and ttl >= 0:
return ttl
ttl_numeric = coerce_numeric(ttl)
no_ttl = ttl == -1 or ttl_numeric == -1
if ttl_numeric is None or ttl_numeric < 0 and not no_ttl:
class_name = cls.__name__
raise CacheParameterValidationException(
f"The {class_name} expected the TTL to be a non-negative number, `None`, or -1 (no expiration), but "
f"received an invalid value ({ttl})"
)
return None if no_ttl else ttl_numeric
@classmethod
def _handle_storage_exception(
cls, exception: BaseException, operation_exception_type: Optional[type[BaseException]] = None, msg: str = ""
) -> None:
"""Helper method for logging errors and raising a new exception if needed.
Another exception is only raised if `operation_exception_type` is assigned an exception. If `None`,
an exception is not raised. An error will be logged regardless, however.
Args:
exception (BaseException): The exception instance raised from the last storage cache operation
operation_exception_type (Optional[type[BaseException]]): The exception to raise
msg (str): The error message to log and/or raise.
"""
error_message = msg or str(exception)
logger.error(error_message)
if operation_exception_type is not None:
raise operation_exception_type(error_message) from exception
[docs]
@contextmanager
def with_raise_on_error(self, value: bool = True) -> Iterator[None]:
"""Uses a context manager to temporarily modify the `raise_on_error` attribute for the context duration.
All storage backends that inherit from the `ABCStorage` will also inherit the `with_raise_on_error` context
manager. When used, this context manager temporarily sets the `raise_on_error` attribute to True or False for
the duration of a code block without permanently changing the storage subclass's configuration.
This context manager is most useful for briefly suppressing errors and in cache verification when errors
need to be logged and reported instead of silently indicating that a cache entry couldn't be found.
Args:
value (bool): A value to temporarily assign to `raise_on_error` for the context duration
Example:
>>> with storage.with_raise_on_error(True):
>>> # Any storage operation here will raise on error, regardless of the instance default
>>> storage.retrieve(key)
"""
original_value = self.raise_on_error
self.raise_on_error = value
try:
yield
finally:
self.raise_on_error = original_value
[docs]
@contextmanager
def with_namespace(self, value: str) -> Iterator[None]:
"""Uses a context manager to temporarily modify the `namespace` attribute for the context duration."""
original_value = self.namespace
self.namespace = value
try:
yield
finally:
self.namespace = original_value
[docs]
def structure(self, flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = True) -> str:
"""Helper method for quickly showing a representation of the overall structure of the current storage subclass.
The instance uses the generate_repr helper function to produce human-readable representations of the core
structure of the storage subclass with its defaults.
Args:
flatten (bool):
Flag indicating to flatten the string representation of the object into a single line when True and to
preserve a multiline representation of the storage when False (default).
show_value_attributes (bool):
Flag for hiding the internal attributes of nested attributes when True (arguments replaced with `...`)
and showing their default representation when False.
mask_values (bool):
Masks any potentially sensitive data shown in the representation when True (default) and shows the
representation without sensitive data masking when False.
Returns:
str: The structure of the current storage subclass as a string.
"""
representation = generate_repr(self, flatten=flatten, show_value_attributes=show_value_attributes)
return masker.mask_text(representation) if mask_values else representation
def __repr__(self) -> str:
"""Method for identifying the current implementation and subclasses of the BaseStorage.
Useful for showing the options being used to store and retrieve data stored as cache.
"""
return self.structure()
# Public API of this module: only the abstract storage base class is exported by `from ... import *`.
__all__ = ["ABCStorage"]