# /data_storage/redis_storage.py
"""The scholar_flux.data_storage.redis_storage module implements the RedisStorage backend for the DataCacheManager.

This class implements the abstract methods required for compatibility with the scholar_flux.DataCacheManager.

This class implements caching by using the serialization-deserialization and caching features available in Redis
to store ProcessedResponse fields within the database for later CRUD operations.

WARNING: Ensure that the 'namespace' parameter is set to a non-empty, unique value for each logical cache.
Using an empty or shared namespace may result in accidental deletion or overwriting of unrelated data. For that reason,
the `delete_all` method does not perform any deletions unless a namespace has been set.

"""

from __future__ import annotations
from scholar_flux.exceptions import (
    RedisImportError,
    StorageCacheException,
    CacheRetrievalException,
    CacheUpdateException,
    CacheDeletionException,
    CacheVerificationException,
)
from scholar_flux.data_storage.abc_storage import ABCStorage
from scholar_flux.utils.encoder import JsonDataEncoder
from scholar_flux.utils import config_settings  # provides the loaded global environment configuration
from typing import Any, Dict, List, Optional, cast, TYPE_CHECKING

import logging
import threading

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    import redis
    from redis.exceptions import RedisError, ConnectionError, TimeoutError
else:
    try:
        import redis
        from redis.exceptions import RedisError, ConnectionError, TimeoutError
    except ImportError:
        redis = None
        RedisError = Exception
        TimeoutError = Exception
        ConnectionError = Exception


class RedisStorage(ABCStorage):
    """Implements the storage methods necessary to interact with Redis using a unified backend interface.

    The RedisStorage implements the abstract methods from the ABCStorage class for use with the DataCacheManager.
    This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved
    in a relatively straightforward manner, similar to the In-Memory Storage.

    Examples:
        >>> from scholar_flux.data_storage import RedisStorage
        # Defaults to connecting locally (localhost) on the default port for Redis services (6379)
        # Verifies that a Redis service is locally available.
        >>> assert RedisStorage.is_available()
        >>> redis_storage = RedisStorage(namespace='testing_functionality')
        >>> print(redis_storage)
        # OUTPUT: RedisStorage(...)
        # Adding records to the storage
        >>> redis_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
        >>> redis_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
        # Revising and overwriting a record
        >>> redis_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
        >>> redis_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
        # OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
        >>> redis_storage.retrieve_all()  # retrieves all records stored under the namespace
        # OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
        #                                                  'article': 'A name to remember'},
        #          'testing_functionality:record_page_2': {'id': 53,
        #                                                  'article': 'A name has many meanings'}}
        >>> redis_storage.retrieve('record_page_1')  # retrieves the record for page 1
        # OUTPUT: {'id': 52, 'article': 'A name to remember'}
        >>> redis_storage.delete_all()  # deletes all records from the namespace
        >>> redis_storage.retrieve_keys()  # Will now be empty
        >>> redis_storage.retrieve_all()  # Will also be empty

    """

    DEFAULT_NAMESPACE: str = "SFAPI"
    DEFAULT_CONFIG: dict = {
        "host": config_settings.config.get("SCHOLAR_FLUX_REDIS_HOST") or "localhost",
        "port": config_settings.config.get("SCHOLAR_FLUX_REDIS_PORT") or 6379,
    }
    DEFAULT_RAISE_ON_ERROR: bool = False

    def __init__(
        self,
        host: Optional[str] = None,
        namespace: Optional[str] = None,
        ttl: Optional[int] = None,
        raise_on_error: Optional[bool] = None,
        **redis_config,
    ):
        """Initialize the Redis storage backend and connect to the Redis server.

        If no parameters are specified, the Redis storage will attempt to resolve the host and port using variables
        from the environment (loaded into scholar_flux.utils.config_settings at runtime). The host and port are
        resolved from environment variables/defaults in the following order of priority:

            - SCHOLAR_FLUX_REDIS_HOST > REDIS_HOST > 'localhost'
            - SCHOLAR_FLUX_REDIS_PORT > REDIS_PORT > 6379

        Args:
            host (Optional[str]): Redis server host. Can be provided positionally or as a keyword argument.
                Defaults to 'localhost' if not specified.
            namespace (Optional[str]): The prefix associated with each cache key. Defaults to DEFAULT_NAMESPACE
                if left `None`.
            ttl (Optional[int]): The total number of seconds that must elapse for a cache record to expire.
                If not provided, ttl defaults to None.
            raise_on_error (Optional[bool]): Determines whether an error should be raised when encountering
                unexpected issues while interacting with Redis. If `None`, the `raise_on_error` attribute
                defaults to `RedisStorage.DEFAULT_RAISE_ON_ERROR`.
            **redis_config (Optional[Dict[Any, Any]]): Configuration parameters required to connect to the Redis
                server. Typically includes parameters such as host, port, db, etc.

        Raises:
            RedisImportError: If the redis module is not available or fails to load.

        """
        super().__init__()
        # optional dependencies are set to None if not available
        if redis is None:
            raise RedisImportError
        self.config: dict = self.DEFAULT_CONFIG | redis_config
        if host:
            self.config["host"] = host
        self.client = redis.Redis(**self.config)
        # Only override the defaults if available and the namespace/raise_on_error parameters are not directly provided
        self.namespace = self.DEFAULT_NAMESPACE if self.DEFAULT_NAMESPACE and not namespace else namespace
        self.raise_on_error = raise_on_error if raise_on_error is not None else self.DEFAULT_RAISE_ON_ERROR
        # catches None and empty strings
        self._validate_prefix(self.namespace, required=True)
        self.ttl = ttl
        self.lock = threading.Lock()
        logger.info("RedisClient initialized and connected.")

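    # Illustrative sketch of the resolution order described above (connection values are hypothetical):
    # an explicit `host` argument takes precedence over entries passed via `**redis_config`, which in turn
    # take precedence over the environment-derived defaults stored in DEFAULT_CONFIG.
    #
    #     >>> RedisStorage()                                    # host/port resolved from the environment or defaults
    #     >>> RedisStorage(host='redis.internal', port=6380)    # explicit connection parameters
    #     >>> RedisStorage(namespace='search_cache', ttl=3600)  # records under 'search_cache:*' expire after one hour
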
    def clone(self) -> RedisStorage:
        """Helper method for creating a new RedisStorage with the same parameters.

        Note that the RedisStorage implementation cannot be deep-copied; this method is provided for convenience
        when re-instantiating a storage with the same configuration.
        """
        cls = self.__class__
        return cls(namespace=self.namespace, ttl=self.ttl, **self.config)

    def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve the value associated with the provided key from cache.

        Args:
            key (str): The key used to fetch the stored data from cache.

        Returns:
            Any: The deserialized JSON object if retrieval is successful. Returns None if the key does not exist.
        """
        try:
            namespace_key = self._prefix(key)
            with self.lock:
                cache_data = cast("Optional[str]", self.client.get(namespace_key))
            if cache_data is None:
                logger.info(f"Record for key {key} (namespace = '{self.namespace}') not found...")
                return None
            if isinstance(cache_data, bytes):
                cache_data = cache_data.decode()
            return JsonDataEncoder.deserialize(cache_data)
        except RedisError as e:
            msg = f"Error during attempted retrieval of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
            return None

    def retrieve_all(self) -> Dict[str, Any]:
        """Retrieve all records from cache that match the current namespace prefix.

        Returns:
            dict: Dictionary of key-value pairs. Keys are the original keys; values are JSON-deserialized objects.

        Raises:
            RedisError: If there is an error during the retrieval of records under the namespace.
        """
        try:
            matched_keys = self.retrieve_keys()
            results = {key: self.retrieve(key) for key in matched_keys}
            return results
        except RedisError as e:
            msg = f"Error during attempted retrieval of records from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
            return {}

    def retrieve_keys(self) -> List[str]:
        """Retrieve all keys for records from cache that match the current namespace prefix.

        Returns:
            list: A list of all keys saved under the current namespace.

        Raises:
            RedisError: If there is an error retrieving the record keys.
        """
        keys = []
        try:
            with self.lock:
                keys = [
                    key.decode() if isinstance(key, bytes) else key
                    for key in self.client.scan_iter(f"{self.namespace}:*")
                ]
        except RedisError as e:
            msg = f"Error during attempted retrieval of all keys from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
        return keys

    def update(self, key: str, data: Any) -> None:
        """Update the cache by storing the associated value with the provided key.

        Args:
            key (str): The key used to store the serialized JSON string in cache.
            data (Any): A Python object that will be serialized into JSON format and stored. This includes
                standard data types like strings, numbers, lists, dictionaries, etc.

        Raises:
            RedisError: If an error occurs when attempting to insert or update a record.
        """
        try:
            with self.lock:
                namespace_key = self._prefix(key)
                self.client.set(namespace_key, JsonDataEncoder.serialize(data))
                if self.ttl is not None:
                    self.client.expire(namespace_key, self.ttl)
                logger.debug(f"Cache updated for key: '{namespace_key}'")
        except RedisError as e:
            msg = f"Error during attempted update of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheUpdateException if self.raise_on_error else None, msg=msg
            )

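    # Illustrative sketch (hypothetical values): when a `ttl` is configured, each `update` call re-applies the
    # expiry, so a record disappears from the cache once `ttl` seconds elapse without the key being rewritten.
    #
    #     >>> storage = RedisStorage(namespace='short_lived', ttl=2)
    #     >>> storage.update('record_page_1', {'id': 52})
    #     >>> storage.retrieve('record_page_1')
    #     # OUTPUT: {'id': 52}
    #     >>> import time; time.sleep(3)
    #     >>> storage.retrieve('record_page_1')  # the record has expired and is no longer present
    #     # OUTPUT: None
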
    def delete(self, key: str) -> None:
        """Delete the value associated with the provided key from cache.

        Args:
            key (str): The key associated with the stored data in cache.

        Raises:
            RedisError: If there is an error deleting the record.
        """
        try:
            namespace_key = self._prefix(key)
            with self.with_raise_on_error():
                cached = self.verify_cache(key)
                if cached:
                    with self.lock:
                        self.client.delete(namespace_key)
                else:
                    logger.info(f"Record for key {key} (namespace = '{self.namespace}') does not exist")
        except (RedisError, StorageCacheException) as e:
            msg = f"Error during attempted deletion of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg
            )

    def delete_all(self) -> None:
        """Delete all records from cache that match the current namespace prefix.

        Raises:
            RedisError: If an error occurs when deleting records from the namespace.
        """
        # this function requires a namespace to avoid deleting unrelated data
        try:
            if not self.namespace:
                logger.warning(
                    "For safety purposes, the RedisStorage will not delete any records in the absence "
                    "of a namespace. Skipping..."
                )
                return None
            with self.lock:
                matched_keys = list(self.client.scan_iter(f"{self.namespace}:*"))
                for key in matched_keys:
                    self.client.delete(key)
        except RedisError as e:
            msg = f"Error during attempted deletion of all records from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg
            )

    def verify_cache(self, key: str) -> bool:
        """Check if a specific cache key exists.

        Args:
            key (str): The key whose presence in the Redis storage backend should be checked.

        Returns:
            bool: True if the key is found, otherwise False.

        Raises:
            ValueError: If the provided key is empty or None.
            RedisError: If an error occurs when looking up a key.
        """
        try:
            if not key or not isinstance(key, str):
                raise ValueError(f"Key invalid. Received {key} (namespace = '{self.namespace}')")
            namespace_key = self._prefix(key)
            with self.lock:
                if self.client.exists(namespace_key):
                    return True
        except (RedisError, StorageCacheException) as e:
            msg = f"Error during the verification of the existence of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e,
                operation_exception_type=CacheVerificationException if self.raise_on_error else None,
                msg=msg,
            )
        return False

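    # Illustrative sketch: `verify_cache` checks for the presence of a key under the configured namespace
    # without retrieving or deserializing its value (the namespace and keys below are hypothetical).
    #
    #     >>> storage = RedisStorage(namespace='verification_demo')
    #     >>> storage.update('record_page_1', {'id': 52})
    #     >>> storage.verify_cache('record_page_1')
    #     # OUTPUT: True
    #     >>> storage.verify_cache('missing_key')
    #     # OUTPUT: False
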
    @classmethod
    def is_available(cls, host: Optional[str] = None, port: Optional[int] = None, verbose: bool = True) -> bool:
        """Helper class method for testing whether the Redis service is available and can be accessed.

        If Redis can be successfully reached, this method returns True; otherwise it returns False.

        Args:
            host (Optional[str]): Indicates the location to attempt a connection. If None or an empty string,
                defaults to the "host" entry from the DEFAULT_CONFIG class variable ('localhost', the local
                computer, unless otherwise configured).
            port (Optional[int]): Indicates the port where the service can be accessed. If None or 0, defaults
                to the "port" entry from the DEFAULT_CONFIG class variable (6379 unless otherwise configured).
            verbose (bool): Indicates whether to log a successful availability check at the INFO level or to
                log warnings only.

        Returns:
            bool: True if the Redis service responds to a ping. Timeout and connection errors are caught and
                logged, and False is returned instead.
        """
        if redis is None:
            logger.warning("The redis module is not available")
            return False
        redis_host = host or cls.DEFAULT_CONFIG["host"]
        redis_port = port or cls.DEFAULT_CONFIG["port"]
        try:
            with redis.Redis(host=redis_host, port=redis_port, socket_connect_timeout=1) as client:
                client.ping()
                if verbose:
                    logger.info(f"The Redis service is available at {redis_host}:{redis_port}")
                return True
        except (TimeoutError, ConnectionError) as e:
            logger.warning(f"An active Redis service could not be found at {redis_host}:{redis_port}: {e}")
            return False

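    # Illustrative sketch: `is_available` can guard storage creation when a Redis service may not be running
    # (the fallback behavior shown here is hypothetical and not part of this module).
    #
    #     >>> if RedisStorage.is_available():
    #     ...     storage = RedisStorage(namespace='search_cache')
    #     ... else:
    #     ...     storage = None  # e.g., fall back to a different storage backend
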
__all__ = ["RedisStorage"]