# Source code for scholar_flux.data_storage.in_memory_storage

# /data_storage/in_memory_storage.py
"""The scholar_flux.data_storage.in_memory_storage module implements an InMemoryStorage class that implements a basic
cache storage with an in-memory dictionary.

The InMemoryStorage class implements the basic CRUD operations (retrieve, update, delete) along with convenience
methods for listing cached keys, verifying that a key exists, and summarizing the cache.

"""

from __future__ import annotations
from typing import Any, Optional
import logging
import threading

logger = logging.getLogger(__name__)
from scholar_flux.data_storage.abc_storage import ABCStorage
from scholar_flux.utils.repr_utils import generate_repr_from_string
from scholar_flux import masker


class InMemoryStorage(ABCStorage):
    """Default storage class that implements an in-memory storage cache using a dictionary.

    This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with
    the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data,
    and retrieve data from the cache.

    Every read or write of the underlying dictionary performed by the CRUD methods is guarded by `self.lock`.

    Args:
        namespace (Optional[str]): Prefix for cache keys. Defaults to None, in which case `DEFAULT_NAMESPACE` is used.
        ttl (Optional[int]): Ignored. Included for interface compatibility; not implemented.
        raise_on_error (Optional[bool]): Ignored. Included for interface compatibility; not implemented.
        **kwargs: Ignored. Included for interface compatibility; not implemented.

    Examples:
        >>> from scholar_flux.data_storage import InMemoryStorage
        ### defaults to a basic dictionary:
        >>> memory_storage = InMemoryStorage(namespace='testing_functionality')
        >>> print(memory_storage)
        # OUTPUT: InMemoryStorage(...)
        ### Adding records to the storage
        >>> memory_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
        >>> memory_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
        ### Revising and overwriting a record
        >>> memory_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
        >>> memory_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
        # OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
        >>> memory_storage.retrieve_all() # retrieves every key-value mapping stored under the namespace
        # OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
        #           'article': 'A name to remember'},
        #          'testing_functionality:record_page_2': {'id': 53,
        #           'article': 'A name has many meanings'}}
        >>> memory_storage.retrieve('record_page_1') # retrieves the record for page 1
        # OUTPUT: {'id': 52, 'article': 'A name to remember'}
        >>> memory_storage.delete_all() # deletes all records from the namespace
        >>> memory_storage.retrieve_keys() # Will now be empty
        >>> memory_storage.retrieve_all() # Will also be empty

    """

    # for compatibility with other storage backends
    DEFAULT_NAMESPACE: Optional[str] = None
    DEFAULT_RAISE_ON_ERROR: bool = False
    STORAGE_TYPE: str = "InMemory"

    def __init__(
        self,
        namespace: Optional[str] = None,
        ttl: Optional[int] = None,
        raise_on_error: Optional[bool] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize a basic, dictionary-like memory_cache using a namespace.

        Note that `ttl`, `raise_on_error`, and `**kwargs` are provided for interface compatibility, and specifying
        any of these as arguments will not affect processing or cache initialization. A warning is logged when
        `ttl` or `raise_on_error` is supplied.

        Args:
            namespace (Optional[str]): Prefix for cache keys. Falls back to `DEFAULT_NAMESPACE` when None.
            ttl (Optional[int]): Ignored; `self.ttl` is always None.
            raise_on_error (Optional[bool]): Ignored; `self.raise_on_error` is always False.
            **kwargs: Ignored.

        """
        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE

        if ttl is not None:
            logger.warning("The parameter, `ttl` is not enforced in InMemoryStorage. Skipping.")

        if raise_on_error is not None:
            logger.warning("The parameter, `raise_on_error` is not enforced in InMemoryStorage. Skipping.")

        self.ttl = None
        self.raise_on_error = False
        self.config: dict[str, Any] = {}
        self.lock = threading.Lock()

        # Validate the namespace that is actually in effect (including a class-level default),
        # rather than only the raw argument.
        self._validate_prefix(self.namespace, required=False)
        self._initialize()

    def clone(self) -> InMemoryStorage:
        """Helper method for creating a new InMemoryStorage with the same configuration.

        Returns:
            InMemoryStorage: A new instance of the same class and namespace whose cache is a shallow copy of the
                current cache (the stored values themselves are shared, not duplicated).

        """
        storage = self.__class__(namespace=self.namespace)
        with self.lock:
            storage.memory_cache = self.memory_cache.copy()
        return storage

    def _initialize(self, **kwargs: Any) -> None:
        """Initializes an empty memory cache if kwargs is empty.

        Otherwise initializes the dictionary starting from the key-value mappings specified as key-value pairs.
        Any previously cached data is discarded, because the cache is rebound to a new dictionary.

        """
        logger.debug("Initializing in-memory cache...")
        with self.lock:
            self.memory_cache: dict[str, Any] = dict(kwargs)

    def _matches_namespace(self, key: str) -> bool:
        """Return True when `key` belongs to the current namespace (or when no namespace is set).

        Callers are expected to already hold `self.lock`; this helper only reads `self.namespace`.

        """
        # NOTE(review): this is a plain prefix match, so a namespace of "a" also matches keys stored under a
        # namespace such as "ab". The class docstring example suggests keys look like "<namespace>:<key>" --
        # confirm against `_prefix` before tightening the match to include the delimiter.
        return not self.namespace or key.startswith(self.namespace)

    def retrieve(self, key: str) -> Optional[Any]:
        """Attempts to retrieve a response containing the specified cache key within the current namespace.

        Args:
            key (str): The key used to fetch the stored data from cache.

        Returns:
            Any: The value stored under the key, exactly as it was passed to `update` (no copy is made).
                Returns None if the key does not exist, so a stored value of None is indistinguishable
                from a missing key.

        """
        namespace_key = self._prefix(key)
        with self.lock:
            return self.memory_cache.get(namespace_key)

    def retrieve_all(self) -> Optional[dict[str, Any]]:
        """Retrieves all cache key-response mappings found within the current namespace.

        Returns:
            dict: A new dictionary containing each key-value mapping for all cached data within the same
                namespace. The dictionary is empty when nothing is cached.

        """
        with self.lock:
            return {key: value for key, value in self.memory_cache.items() if self._matches_namespace(key)}

    def retrieve_keys(self) -> list[str]:
        """Retrieves the full list of all cache keys found within the current namespace.

        Returns:
            list[str]: The full list of all keys that are currently mapped within the storage. The list is empty
                when nothing is cached.

        """
        with self.lock:
            return [key for key in self.memory_cache if self._matches_namespace(key)]

    def update(self, key: str, data: Any) -> None:
        """Attempts to update the data associated with a specific cache key in the namespace.

        An existing record stored under the same key is overwritten.

        Args:
            key (str): The key of the key-value pair
            data (Any): The data to be associated with the key

        """
        namespace_key = self._prefix(key)
        with self.lock:
            self.memory_cache[namespace_key] = data

    def delete(self, key: str) -> Optional[bool]:
        """Attempts to delete the selected cache key if found within the current namespace.

        Args:
            key (str): The key associated with the stored data from the dictionary cache.

        Returns:
            bool: True if the key existed and was removed, False if no record was stored under the key.

        """
        namespace_key = self._prefix(key)

        with self.lock:
            found = namespace_key in self.memory_cache
            if found:
                del self.memory_cache[namespace_key]

        # Log after releasing the lock so other threads are not blocked on logging I/O.
        if found:
            logger.debug(f"Key: {key} (namespace = '{self.namespace}') successfully deleted")
            return True

        logger.info(f"Record for key {key} (namespace = '{self.namespace}') does not exist")
        return False

    def delete_all(self) -> None:
        """Attempts to delete all cache keys found within the current namespace.

        Keys that do not belong to the current namespace are left in place. This is a best-effort operation:
        any exception raised while deleting is logged as a warning rather than propagated.

        """
        logger.debug("deleting all record within cache...")
        try:
            with self.lock:
                if not self.namespace:
                    n = len(self.memory_cache)
                    self.memory_cache.clear()
                else:
                    # Collect the keys first so the dictionary is never mutated while it is being iterated.
                    namespaced_keys = [key for key in self.memory_cache if self._matches_namespace(key)]
                    for key in namespaced_keys:
                        del self.memory_cache[key]
                    n = len(namespaced_keys)
                logger.debug(f"Deleted {n} records.")
        except Exception as e:
            logger.warning(f"An error occurred deleting e: {e}")

    def verify_cache(self, key: str) -> bool:
        """Verifies whether a cache key exists within the current namespace in the in-memory cache.

        Args:
            key (str): The key to lookup in the cache

        Returns:
            bool: True if the key is found otherwise False.

        """
        namespace_key = self._prefix(key)
        with self.lock:
            return namespace_key in self.memory_cache

    def verify_connection(self) -> None:
        """No-op for the in-memory storage: no connection is opened, so this method never raises.

        Included for interface compatibility with the other storage backends.

        """
        pass

    @classmethod
    def is_available(cls, *args: Any, **kwargs: Any) -> bool:
        """Helper method that returns True, indicating that dictionary-based storage will always be available.

        Args:
            *args: Ignored.
            **kwargs: Ignored.

        Returns:
            (bool): True to indicate that the dictionary-base cache storage will always be available

        """
        return True

    def structure(self, flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = False) -> str:
        """Creates a concise string representation of the current `InMemoryStorage` device.

        The representation displays the total number of records that have been registered to avoid overloading
        the representation with the specifics of what is being cached.

        Args:
            flatten (bool):
                Flag indicating whether to flatten the string representation of the object into a single line when
                True or preserve the multiline representation of the storage cache when False (default).
            show_value_attributes (bool):
                Passed through unchanged to `generate_repr_from_string`, which decides whether the attributes of
                nested objects are shown in full or abbreviated. Defaults to True.
            mask_values (bool):
                Masks any potentially sensitive data shown in the representation when True. This is false by
                default, as the representation of the `InMemoryStorage` displays non-sensitive information,
                including only the namespace of the cache and the total cached record count.

        Returns:
            A basic string representation of the current object.

        """
        class_name = self.__class__.__name__
        # The record count is read without acquiring `self.lock`: `threading.Lock` is not reentrant, so taking it
        # here would block forever if a representation were requested by a thread that already holds the lock.
        str_memory_cache = f"dict(n={len(self.memory_cache)})"
        class_attribute_dict = dict(namespace=self.namespace, memory_cache=str_memory_cache)
        representation = generate_repr_from_string(
            class_name,
            attribute_dict=class_attribute_dict,
            flatten=flatten,
            show_value_attributes=show_value_attributes,
        )
        return masker.mask_text(representation) if mask_values else representation


# Names exported by `from scholar_flux.data_storage.in_memory_storage import *`.
__all__ = ["InMemoryStorage"]