Source code for scholar_flux.data_storage.in_memory_storage

# /data_storage/in_memory_storage.py
"""The scholar_flux.data_storage.in_memory_storage module provides the InMemoryStorage class, a basic cache storage
backed by an in-memory dictionary.

The InMemoryStorage class implements the basic CRUD operations (update, retrieve, delete) along with convenience
methods such as cloning the storage and verifying whether a key is cached.

"""

from __future__ import annotations
from typing import Any, List, Dict, Optional
import logging
import threading

logger = logging.getLogger(__name__)
from scholar_flux.data_storage.abc_storage import ABCStorage
from scholar_flux.utils.repr_utils import generate_repr_from_string


class InMemoryStorage(ABCStorage):
    """Default storage class that implements an in-memory storage cache using a dictionary.

    This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with
    the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data,
    and retrieve data from the cache.

    Args:
        namespace (Optional[str]): Prefix for cache keys. Defaults to None.
        ttl (Optional[int]): Ignored. Included for interface compatibility; not implemented.
        raise_on_error (Optional[bool]): Ignored. Included for interface compatibility; not implemented.
        **kwargs (Dict): Ignored. Included for interface compatibility; not implemented.

    Examples:
        >>> from scholar_flux.data_storage import InMemoryStorage
        ### defaults to a basic dictionary:
        >>> memory_storage = InMemoryStorage(namespace='testing_functionality')
        >>> print(memory_storage)
        # OUTPUT: InMemoryStorage(...)
        ### Adding records to the storage
        >>> memory_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
        >>> memory_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
        ### Revising and overwriting a record
        >>> memory_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
        >>> memory_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
        # OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
        >>> memory_storage.retrieve_all() # retrieves all records stored in the cache under the namespace
        # OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
        #           'article': 'A name to remember'},
        #          'testing_functionality:record_page_2': {'id': 53,
        #           'article': 'A name has many meanings'}}
        >>> memory_storage.retrieve('record_page_1') # retrieves the record for page 1
        # OUTPUT: {'id': 52, 'article': 'A name to remember'}
        >>> memory_storage.delete_all() # deletes all records from the namespace
        >>> memory_storage.retrieve_keys() # Will now be empty
        >>> memory_storage.retrieve_all() # Will also be empty

    """

    # for compatibility with other storage backends
    DEFAULT_NAMESPACE: Optional[str] = None
    DEFAULT_RAISE_ON_ERROR: bool = False

    def __init__(
        self,
        namespace: Optional[str] = None,
        ttl: Optional[int] = None,
        raise_on_error: Optional[bool] = None,
        **kwargs,
    ) -> None:
        """Initialize a basic, dictionary-like memory_cache using a namespace.

        Note that `ttl`, `raise_on_error`, and `**kwargs` are provided for interface compatibility, and specifying
        any of these as arguments will not affect processing or cache initialization.

        Args:
            namespace (Optional[str]): Prefix for cache keys. Falls back to `DEFAULT_NAMESPACE` when None.
            ttl (Optional[int]): Not enforced; a warning is logged when a value is supplied.
            raise_on_error (Optional[bool]): Not enforced; a warning is logged when a value is supplied.
            **kwargs: Accepted and ignored.
        """
        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE

        if ttl is not None:
            logger.warning("The parameter, `ttl` is not enforced in InMemoryStorage. Skipping.")

        if raise_on_error is not None:
            logger.warning("The parameter, `raise_on_error` is not enforced in InMemoryStorage. Skipping.")

        # Both settings are pinned regardless of what the caller passed in.
        self.ttl = None
        self.raise_on_error = False
        # The lock must exist before `_initialize` runs, since `_initialize` acquires it.
        self.lock = threading.Lock()
        self._validate_prefix(namespace, required=False)
        self._initialize()

    def clone(self) -> InMemoryStorage:
        """Helper method for creating a new InMemoryStorage with the same configuration.

        Returns:
            InMemoryStorage: A new storage with the same namespace and a shallow copy of the current cache.
        """
        cls = self.__class__
        storage = cls(namespace=self.namespace)
        with self.lock:
            # Shallow copy: the dictionary is new, but the cached values are shared with the original.
            storage.memory_cache = self.memory_cache.copy()
        return storage

    def _initialize(self, **kwargs) -> None:
        """Initializes an empty memory cache if kwargs is empty.

        Otherwise initializes the dictionary starting from the mappings specified as key-value pairs.
        """
        logger.debug("Initializing in-memory cache...")
        with self.lock:
            self.memory_cache: dict = dict(kwargs)

    def retrieve(self, key: str) -> Optional[Any]:
        """Attempts to retrieve the data stored under the specified cache key within the current namespace.

        Args:
            key (str): The key used to fetch the stored data from cache.

        Returns:
            Any: The data stored under the key if it exists. Returns None if the key does not exist.
        """
        namespace_key = self._prefix(key)
        with self.lock:
            return self.memory_cache.get(namespace_key)

    def retrieve_all(self) -> Optional[Dict[str, Any]]:
        """Retrieves all cache key-response mappings found within the current namespace.

        Returns:
            A dictionary containing each key-value mapping for all cached data within the same namespace
        """
        # NOTE(review): keys are matched on the bare namespace string, so a namespace such as 'a' also matches
        # keys that begin with 'ab' — confirm against the separator applied by `_prefix`.
        with self.lock:
            return {k: v for k, v in self.memory_cache.items() if not self.namespace or k.startswith(self.namespace)}

    def retrieve_keys(self) -> Optional[List[str]]:
        """Retrieves the full list of all cache keys found within the current namespace.

        Returns:
            List[str]: The full list of all keys that are currently mapped within the storage
        """
        with self.lock:
            return [key for key in self.memory_cache if not self.namespace or key.startswith(self.namespace)]

    def update(self, key: str, data: Any) -> None:
        """Attempts to update the data associated with a specific cache key in the namespace.

        Args:
            key (str): The key of the key-value pair
            data (Any): The data to be associated with the key
        """
        namespace_key = self._prefix(key)
        with self.lock:
            self.memory_cache[namespace_key] = data

    def delete(self, key: str) -> None:
        """Attempts to delete the selected cache key if found within the current namespace.

        Args:
            key (str): The key associated with the stored data in the dictionary cache.
        """
        namespace_key = self._prefix(key)
        with self.lock:
            # Membership is checked explicitly (rather than inspecting the popped value) so that a cached
            # value of None still counts as an existing entry, and so `key` keeps referring to the key.
            found = namespace_key in self.memory_cache
            if found:
                del self.memory_cache[namespace_key]

        if found:
            logger.debug(f"Key: {key} deleted successfully")
        else:
            logger.info(f"Key: {key} (namespace = '{self.namespace}') does not exist in cache.")

    def delete_all(self) -> None:
        """Attempts to delete all cache keys found within the current namespace.

        Without a namespace, the whole cache is cleared. Errors are logged as warnings rather than raised.
        """
        logger.debug("deleting all record within cache...")
        try:
            with self.lock:
                if not self.namespace:
                    n = len(self.memory_cache)
                    self.memory_cache.clear()
                else:
                    # Collect the keys first: a dictionary cannot be resized while it is being iterated.
                    namespace_keys = [k for k in self.memory_cache if k.startswith(self.namespace)]
                    for namespace_key in namespace_keys:
                        del self.memory_cache[namespace_key]
                    n = len(namespace_keys)
            logger.debug(f"Deleted {n} records.")
        except Exception as e:
            logger.warning(f"An error occurred deleting e: {e}")

    def verify_cache(self, key: str) -> bool:
        """Verifies whether a cache key exists within the current namespace of the in-memory cache.

        Args:
            key (str): The key to lookup in the cache

        Returns:
            bool: True if the key is found otherwise False.
        """
        namespace_key = self._prefix(key)
        with self.lock:
            return namespace_key in self.memory_cache

    @classmethod
    def is_available(cls, *args, **kwargs) -> bool:
        """Helper method that returns True, indicating that dictionary-based storage will always be available.

        Returns:
            (bool): True to indicate that the dictionary-based cache storage will always be available
        """
        return True

    def structure(self, flatten: bool = False, show_value_attributes: bool = True) -> str:
        """Helper method for creating a string representation of the in-memory cache.

        The cache is summarized by its number of entries so that the representation is not overloaded with the
        specifics of what is being cached.

        Args:
            flatten (bool): Forwarded to `generate_repr_from_string`.
            show_value_attributes (bool): Forwarded to `generate_repr_from_string`.

        Returns:
            str: The representation built from the class name, the namespace, and the cache size summary.
        """
        class_name = self.__class__.__name__
        str_memory_cache = f"dict(n={len(self.memory_cache)})"
        class_attribute_dict = dict(namespace=self.namespace, memory_cache=str_memory_cache)
        return generate_repr_from_string(
            class_name,
            attribute_dict=class_attribute_dict,
            flatten=flatten,
            show_value_attributes=show_value_attributes,
        )
# Names exported by `from scholar_flux.data_storage.in_memory_storage import *`.
__all__ = ["InMemoryStorage"]