Source code for scholar_flux.data_storage.abc_storage

# /data_storage/abc_storage.py
"""The scholar_flux.data_storage.abc_storage module implements the ABCStorage that defines the abstractions that are to
be implemented to create a scholar_flux-compatible storage. The ABCStorage defines basic CRUD operations and convenience
methods used to perform operations on the entire range of cached records or, optionally, only on the cached records
specific to a namespace.

scholar_flux implements the ABCStorage with subclasses for SQLite (through SQLAlchemy), Redis, MongoDB, and an in-memory
cache, and can be further extended to DuckDB and other backends supported by SQLAlchemy.

"""
from collections.abc import Iterator
from typing import Any, Optional
from typing_extensions import Self
from abc import ABC, abstractmethod
from contextlib import contextmanager
from scholar_flux.utils.repr_utils import generate_repr
from scholar_flux.utils.helpers import coerce_numeric
from scholar_flux.exceptions.storage_exceptions import CacheParameterValidationException
from scholar_flux import masker

import logging

# Module-level logger named after this module; the storage helpers below use it to report validation and
# storage errors via `logger.error`.
logger = logging.getLogger(__name__)


[docs] class ABCStorage(ABC): """The ABCStorage class provides the basic structure required to implement the data storage cache with customized backend. This subclass provides methods to check the cache, delete from the cache, update the cache with new data, and retrieve data from the cache storage. """
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None: """Initializes the current storage implementation.""" self.namespace: Optional[str] = None self.raise_on_error: bool = False self.config: dict[str, Any] = {} self.ttl: Any = None
def _initialize(self, *args: Any, **kwargs: Any) -> None: """Optional base method to implement for initializing/reinitializing connections.""" pass
[docs] @classmethod def get_default_config(cls) -> dict: """Get default configuration with current config_settings values.""" return {}
def __deepcopy__(self, memo: Optional[dict[int, Any]]) -> Self: """Future implementations of ABCStorage devices are unlikely to be deep-copied. This method defines the error message that will be used by default upon failures. """ class_name = self.__class__.__name__ raise NotImplementedError( f"{class_name} cannot be deep-copied. Use the .clone() method to create a new instance with " "the same configuration." )
[docs] @abstractmethod def retrieve(self, *args: Any, **kwargs: Any) -> Optional[Any]: """Core method for retrieving a page of records from the cache.""" raise NotImplementedError
[docs] @abstractmethod def retrieve_all(self, *args: Any, **kwargs: Any) -> Optional[dict[str, Any]]: """Core method for retrieving all pages of records from the cache.""" raise NotImplementedError
[docs] @abstractmethod def retrieve_keys(self, *args: Any, **kwargs: Any) -> Optional[list[str]]: """Core method for retrieving all keys from the cache.""" raise NotImplementedError
[docs] @abstractmethod def update(self, *args: Any, **kwargs: Any) -> None: """Core method for updating the cache with new records.""" raise NotImplementedError
[docs] @abstractmethod def delete(self, *args: Any, **kwargs: Any) -> Optional[bool]: """Core method for record deletion. Should return True when successful, False otherwise, and `None` on error. """ raise NotImplementedError
[docs] @abstractmethod def delete_all(self, *args: Any, **kwargs: Any) -> None: """Core method for deleting all pages of records from the cache.""" raise NotImplementedError
[docs] @abstractmethod def verify_cache(self, *args: Any, **kwargs: Any) -> bool: """Core method for verifying the cache based on the key.""" raise NotImplementedError
[docs] @classmethod @abstractmethod def is_available(cls, *args: Any, **kwargs: Any) -> bool: """Core method for verifying whether a storage/service is available.""" raise NotImplementedError
[docs] @abstractmethod def clone(self) -> Self: """Helper method for cloning the structure and configuration of future implementations.""" raise NotImplementedError
[docs] @abstractmethod def verify_connection(self) -> None: """Verifies that the storage is available for connection with initialized storage configuration settings.""" raise NotImplementedError()
[docs] @classmethod def ping(cls, *args: Any, **kwargs: Any) -> None: """Verifies that a connection to the storage implementation can be established successfully. This is a no-op by default for storage backends that don't require external connections (e.g., InMemoryStorage, NullStorage). Storage backends connecting to external services (Redis, MongoDB, SQL) should override this method to perform actual connection checks. Note: The signature and arguments vary by storage implementation: - Redis: ping(client: redis.Redis) - MongoDB: ping(client: MongoClient) - SQL: ping(engine: Engine) - InMemory/Null: ping() (no-op, uses default) """ pass
def _prefix(self, key: str) -> str: """Prefixes a namespace to the given `key`: This method is useful for when you are using a single redis/mongodb server and also need to retrieve a subset of articles for a particular task. Args: key (str): The key to prefix with a namespace (Ex. CORE_PUBLICATIONS) Returns: str: The cache key prefixed with a namespace (Ex. f'CORE_PUBLICATIONS:{key}') Raises: KeyError: If the received key is not a valid, non-empty string """ if not key: raise KeyError(f"No valid value provided for key {key}") if not self.namespace: return key return f"{self.namespace}:{key}" if not key.startswith(f"{self.namespace}:") else key @classmethod def _validate_prefix(cls, key: Optional[str], required: bool = False) -> bool: """Helper method for validating the current namespace key. Raises a KeyError if the key is not a string """ if (key is None or key == "") and not required: return True if key and isinstance(key, str): return True msg = f"A non-empty namespace string must be provided for the {cls.__name__}. " f"Received {type(key)}" logger.error(msg) raise KeyError(msg) @classmethod def _validate_ttl(cls, ttl: Optional[int | float | str] = None) -> Optional[int | float]: """Validates TTL values, converting them into floats when possible and not already a non-negative numeric value. Args: ttl (Optional[int | float | str]): The value to return as a non-negative integer or float when possible. None values are returned as is. Returns: Optional[int | float]: int: The original non-negative integer value if received float: A non-negative float, either received as such or converted from a string value None: TTLs are returned as None if None is received or the ttl is equal to -1 or -1.0 Raises: CacheParameterValidationException: for values that are not None and cannot be converted into a non-negative float. While negative numbers are invalid, `-1` is the exception, signifying that TTL-based cache expiration should not be used. 
""" if ttl is None or isinstance(ttl, (float, int)) and ttl >= 0: return ttl ttl_numeric = coerce_numeric(ttl) no_ttl = ttl == -1 or ttl_numeric == -1 if ttl_numeric is None or ttl_numeric < 0 and not no_ttl: class_name = cls.__name__ raise CacheParameterValidationException( f"The {class_name} expected the TTL to be a non-negative number, `None`, or -1 (no expiration), but " f"received an invalid value ({ttl})" ) return None if no_ttl else ttl_numeric @classmethod def _handle_storage_exception( cls, exception: BaseException, operation_exception_type: Optional[type[BaseException]] = None, msg: str = "" ) -> None: """Helper method for logging errors and raising a new exception if needed. Another exception is only raised if `operation_exception_type` is assigned an exception. If `None`, an exception is not raised. An error will be logged regardless, however. Args: exception (BaseException): The exception instance raised from the last storage cache operation operation_exception_type (Optional[type[BaseException]]): The exception to raise msg (str): The error message to log and/or raise. """ error_message = msg or str(exception) logger.error(error_message) if operation_exception_type is not None: raise operation_exception_type(error_message) from exception
[docs] @contextmanager def with_raise_on_error(self, value: bool = True) -> Iterator[None]: """Uses a context manager to temporarily modify the `raise_on_error` attribute for the context duration. All storage backends that inherit from the `ABCStorage` will also inherit the `with_raise_on_error` context manager. When used, this context manager temporarily sets the `raise_on_error` attribute to True or False for the duration of a code block without permanently changing the storage subclass's configuration. This context manager is most useful for briefly suppressing errors and in cache verification when errors need to be logged and reported instead of silently indicating that a cache entry couldn't be found. Args: value (bool): A value to temporarily assign to `raise_on_error` for the context duration Example: >>> with storage.with_raise_on_error(True): >>> # Any storage operation here will raise on error, regardless of the instance default >>> storage.retrieve(key) """ original_value = self.raise_on_error self.raise_on_error = value try: yield finally: self.raise_on_error = original_value
[docs] @contextmanager def with_namespace(self, value: str) -> Iterator[None]: """Uses a context manager to temporarily modify the `namespace` attribute for the context duration.""" original_value = self.namespace self.namespace = value try: yield finally: self.namespace = original_value
[docs] def structure(self, flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = True) -> str: """Helper method for quickly showing a representation of the overall structure of the current storage subclass. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults. Args: flatten (bool): Flag indicating to flatten the string representation of the object into a single line when True and to preserve a multiline representation of the storage when False (default). show_value_attributes (bool): Flag for hiding the internal attributes of nested attributes when True (arguments replaced with `...`) and showing their default representation when False. mask_values (bool): Masks any potentially sensitive data shown in the representation when True (default) and shows the representation without sensitive data masking when False. Returns: str: The structure of the current storage subclass as a string. """ representation = generate_repr(self, flatten=flatten, show_value_attributes=show_value_attributes) return masker.mask_text(representation) if mask_values else representation
def __repr__(self) -> str: """Method for identifying the current implementation and subclasses of the BaseStorage. Useful for showing the options being used to store and retrieve data stored as cache. """ return self.structure()
# Public API of this module: only the abstract base class is exported by `from ... import *`.
__all__ = ["ABCStorage"]