Source code for scholar_flux.data_storage.sql_storage

# /data_storage/sql_storage.py
"""The scholar_flux.data_storage.sql_storage module implements the SQLAlchemyStorage class that implements the abstract
methods required for compatibility with the DataCacheManager in the scholar_flux package.

This class implements caching by recording each of the fields of a ProcessedResponse and its parsed fields into a
recursively encoded and serialized JSON data structure. When retrieving the data, the data is then decoded and
deserialized to return the original object.

Classes:
    - CacheTable:
        Defines the internal specification of the SQLAlchemy table that is used under the hood. This class inherits
        from Base/DeclarativeBase subclass to define its structure and function as a SQLAlchemy table
    - SQLAlchemyStorage:
        Inherits from the scholar_flux.data_storage.abc_storage ABCStorage class and defines the mechanisms by
        which the storage uses SQLAlchemy to load, retrieve, update, and delete data.

"""
from __future__ import annotations
import logging
from typing import Any, List, Dict, Optional, TYPE_CHECKING

from scholar_flux.utils.encoder import JsonDataEncoder
from scholar_flux.data_storage.abc_storage import ABCStorage
from scholar_flux.package_metadata import get_default_writable_directory
from scholar_flux.exceptions import (
    SQLAlchemyImportError,
    StorageCacheException,
    CacheRetrievalException,
    CacheUpdateException,
    CacheDeletionException,
    CacheVerificationException,
)

import cattrs
import threading

logger = logging.getLogger(__name__)

# SQLAlchemy import logic for type checking and runtime.
# Under TYPE_CHECKING the real imports are always visible to static analyzers; at
# runtime the imports are attempted and, when the optional sqlalchemy dependency is
# missing, replaced with inert stubs so this module still parses and imports cleanly.
if TYPE_CHECKING:
    import sqlalchemy
    from sqlalchemy import create_engine, Column, String, Integer, JSON, exc
    from sqlalchemy.orm import DeclarativeBase, sessionmaker
else:
    try:
        import sqlalchemy  # imported for consistent implementation with redis/pymongo, etc.
        from sqlalchemy import create_engine, Column, String, Integer, JSON, exc
        from sqlalchemy.orm import DeclarativeBase, sessionmaker

    except ImportError:
        # Dummies for names so code still parses, but using stubs or Nones for runtime
        create_engine = None

        def Column(*args, **kwargs):
            """Placeholder function that is returned when the sqlalchemy package is not available."""
            pass

        String = Integer = JSON = exc = None
        DeclarativeBase = object  # type: ignore
        sessionmaker = None
        sqlalchemy = None  # sentinel: checked below and in SQLAlchemyStorage to detect availability
# Define ORM classes if SQLAlchemy is available or for type checking
if TYPE_CHECKING or sqlalchemy is not None:

    class Base(DeclarativeBase):
        """Declarative base class from which future SQL tables can be defined."""

        pass

    class CacheTable(Base):
        """Table that implements caching in a manner similar to a dictionary with key-cache data pairs."""

        __tablename__ = "cache"
        # Surrogate primary key, generated automatically by the database.
        id = Column(Integer, primary_key=True, autoincrement=True)
        # Namespaced cache key; uniqueness is enforced at the table level.
        key = Column(String, unique=True, nullable=False)
        # JSON-serialized payload associated with the key.
        cache = Column(JSON, nullable=False)

else:
    # Runtime stubs so code can be parsed, but will error if actually used
    Base = None  # type: ignore
    CacheTable = None  # type: ignore


class SQLAlchemyStorage(ABCStorage):
    """Implements the storage methods necessary to interact with SQLite3 in addition to other SQL flavors via
    sqlalchemy.

    This implementation is designed to use a relational database as a cache by which data can be stored and
    retrieved in a relatively straightforward manner that associates records in key-value pairs similar to the
    In-Memory Storage.

    **Note**: This class uses the structure previously defined in the CacheTable to store records in a
    structured manner:

        ID: Automatically generated - identifies the unique record in the table
        Key: Is used to associate a specific cached record with a short human-readable (or hashed) string
        Cache: The JSON data associated with the record.

    To store the data, any nested, non-serializable data is first encoded before being unstructured and stored.
    On retrieving the data, the JSON string is decoded and restructured in order to return the original object.

    The SQLAlchemyStorage can be initialized as follows:

        ### Import the package and initialize the storage in a dedicated package directory:
        >>> from scholar_flux.data_storage import SQLAlchemyStorage
        # Defaults to creating a local, file-based sqlite cache within the default writable directory.
        # Verifies that the dependency for a basic sqlite service is actually available for use locally
        >>> assert SQLAlchemyStorage.is_available()
        >>> sql_storage = SQLAlchemyStorage(namespace='testing_functionality')
        >>> print(sql_storage)
        # OUTPUT: SQLAlchemyStorage(...)
        # Adding records to the storage
        >>> sql_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
        >>> sql_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
        # Revising and overwriting a record
        >>> sql_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
        >>> sql_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
        # OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
        >>> sql_storage.retrieve_all()
        # OUTPUT: {'testing_functionality:record_page_1': {'id': 52, 'article': 'A name to remember'},
        #          'testing_functionality:record_page_2': {'id': 53, 'article': 'A name has many meanings'}}
        >>> sql_storage.retrieve('record_page_1')  # retrieves the record for page 1
        # OUTPUT: {'id': 52, 'article': 'A name to remember'}
        >>> sql_storage.delete_all()  # deletes all records from the namespace
        >>> sql_storage.retrieve_keys()  # Will now be empty
    """

    # Default key prefix; None disables namespace filtering.
    DEFAULT_NAMESPACE: Optional[str] = None
    # "url" is a zero-argument callable so the default sqlite path is resolved lazily at init time.
    DEFAULT_CONFIG: Dict[str, Any] = {
        "url": lambda: "sqlite:///" + str(get_default_writable_directory("package_cache") / "data_store.sqlite"),
        "echo": False,
    }
    # Default for whether storage errors raise or are only logged.
    DEFAULT_RAISE_ON_ERROR: bool = False
[docs] def __init__( self, url: Optional[str] = None, namespace: Optional[str] = None, ttl: None = None, raise_on_error: Optional[bool] = False, **sqlalchemy_config, ) -> None: """Initialize the SQLAlchemy storage backend and connect to the server indicated via the `url` parameter. This class uses the innate flexibility of SQLAlchemy to support backends such as SQLite, Postgres, DuckDB, etc. Args: url (Optional[str]): Database connection string. This can be provided positionally or as a keyword argument. namespace (Optional[str]): The prefix associated with each cache key. By default, this is None. ttl (None): Ignored. Included for interface compatibility; not implemented. raise_on_error (Optional[bool]): Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If `None`, the `raise_on_error` attribute defaults to `SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR`. **sqlalchemy_config: Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine Typical parameters include the following: - url (str): Indicates what server to connect to. Defaults to sqlite in the package directory. - echo (bool): Indicates whether to show the executed SQL queries in the console. 
""" # optional dependencies set to None if not available if sqlalchemy is None: raise SQLAlchemyImportError sqlalchemy_config["url"] = url or self.DEFAULT_CONFIG["url"]() sqlalchemy_config["echo"] = ( sqlalchemy_config.get("echo") if isinstance(sqlalchemy_config.get("echo"), bool) else self.DEFAULT_CONFIG["echo"] ) self.config: dict = sqlalchemy_config self.engine = create_engine(**self.config) Base.metadata.create_all(self.engine) self.Session = sessionmaker(bind=self.engine) self.converter = cattrs.Converter() self.namespace = namespace or self.DEFAULT_NAMESPACE self.raise_on_error = raise_on_error if raise_on_error is not None else self.DEFAULT_RAISE_ON_ERROR self.lock = threading.Lock() if ttl: logger.warning("TTL is not enabled for SQLAlchemyStorage. Skipping") self.ttl = None self._validate_prefix(self.namespace, required=False)
[docs] def clone(self) -> SQLAlchemyStorage: """Helper method for creating a new SQLAlchemyStorage with the same parameters. Note that the implementation of the SQLAlchemyStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration. """ cls = self.__class__ return cls(namespace=self.namespace, ttl=self.ttl, **self.config)
[docs] def retrieve(self, key: str) -> Optional[Any]: """Retrieve the value associated with the provided key from cache. Args: key (str): The key used to fetch the stored data from cache. Returns: Any: The value returned is deserialized JSON object if successful. Returns None if the key does not exist. """ with self.Session() as session, self.lock: try: namespace_key = self._prefix(key) record = session.query(CacheTable).filter(CacheTable.key == namespace_key).first() structured_data = self._deserialize_data(record.cache) if record else None if record: return structured_data except exc.SQLAlchemyError as e: msg = f"Error during attempted retrieval of key {key} (namespace = '{self.namespace}'): {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg, ) return None
[docs] def retrieve_all(self) -> Dict[str, Any]: """Retrieve all records from cache. Returns: dict: Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects. """ with self.Session() as session, self.lock: cache = {} try: records = session.query(CacheTable).all() cache = { str(record.key): self._deserialize_data(record.cache) if record else None for record in records if not self.namespace or str(record.key).startswith(self.namespace) } except exc.SQLAlchemyError as e: msg = f"Error during attempted retrieval of records from namespace '{self.namespace}': {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg, ) return cache
[docs] def retrieve_keys(self) -> List[str]: """Retrieve all keys for records from cache . Returns: list: A list of all keys saved via SQL. """ with self.Session() as session, self.lock: try: keys = [ str(record.key) for record in session.query(CacheTable).all() if not self.namespace or str(record.key).startswith(self.namespace) ] except exc.SQLAlchemyError as e: msg = f"Error during attempted retrieval of all keys from namespace '{self.namespace}': {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg, ) keys = [] return keys
[docs] def update(self, key: str, data: Any) -> None: """Update the cache by storing associated value with provided key. Args: key (str): The key used to store the serialized JSON string in cache. data (Any): A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc. """ with self.Session() as session, self.lock: try: namespace_key = self._prefix(key) unstructured_data = self._serialize_data(data) record = session.query(CacheTable).filter(CacheTable.key == namespace_key).first() if record: record.cache = unstructured_data else: record = CacheTable(key=namespace_key, cache=unstructured_data) session.add(record) logger.debug(f"Cache updated for key: {namespace_key}") session.commit() except exc.SQLAlchemyError as e: session.rollback() msg = f"Error during attempted update of key {key} (namespace = '{self.namespace}': {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheUpdateException if self.raise_on_error else None, msg=msg )
[docs] def delete(self, key: str) -> None: """Delete the value associated with the provided key from cache. Args: key (str): The key used associated with the stored data from cache. """ with self.Session() as session, self.lock: try: namespace_key = self._prefix(key) record = session.query(CacheTable).filter(CacheTable.key == namespace_key).first() if record: session.delete(record) session.commit() else: logger.info(f"Record for key {key} (namespace = '{self.namespace}') does not exist") except exc.SQLAlchemyError as e: session.rollback() msg = f"Error during attempted deletion of key {key} (namespace = '{self.namespace}'): {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg, )
[docs] def delete_all(self) -> None: """Delete all records from cache that match the current namespace prefix.""" with self.Session() as session, self.lock: try: if self.namespace: num_deleted = session.query(CacheTable).filter(CacheTable.key.startswith(self.namespace)).delete() session.commit() else: num_deleted = session.query(CacheTable).delete() session.commit() logger.debug(f"Deleted {num_deleted} records.") except exc.SQLAlchemyError as e: msg = f"Error during attempted deletion of all records from namespace '{self.namespace}': {e}" session.rollback() self._handle_storage_exception( exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg, )
def _serialize_data(self, record_data: Any) -> Any: """Helper method for serializing and encoding cached data. The data is first encoded, identifying nested structures that need to be encoded recursively. If a value is already in a serializable format, then the record is left as is. The data is finally unstructured and returned. Returns: The serialized version of the input data """ encoded_record_data = JsonDataEncoder.encode(record_data) serialized_data = self.converter.unstructure(encoded_record_data) return serialized_data def _deserialize_data(self, record_data: Any) -> Any: """Handles the serialization and deserialization of the SQLCacheStorage. This implementation only attempts to structure the data in the case where it is a dictionary or list, as the CacheTable's cache column implements the JSON column schema. All other types are decoded and returned as is. """ if not record_data: return record_data if isinstance(record_data, list): record_type: Optional[type] = list elif isinstance(record_data, dict): record_type = dict else: record_type = None structured_record_data = self.converter.structure(record_data, record_type) if record_type else record_data deserialized_data = JsonDataEncoder.decode(structured_record_data) return deserialized_data
[docs] def verify_cache(self, key: str) -> bool: """Check if specific cache key exists. Args: key (str): The key to check its presence in the SQL storage backend. Returns: bool: True if the key is found otherwise False. Raises: ValueError: If provided key is empty or None. """ if not key: raise ValueError(f"Key invalid. Received {key} (namespace = '{self.namespace}')") try: with self.with_raise_on_error(): return self.retrieve(key) is not None except StorageCacheException as e: msg = f"Error during the verification of the existence of key {key} (namespace = '{self.namespace}'): {e}" self._handle_storage_exception( exception=e, operation_exception_type=CacheVerificationException if self.raise_on_error else None, msg=msg, ) return False
[docs] @classmethod def is_available(cls, url: Optional[str] = None, verbose: bool = True) -> bool: """Helper class method for testing whether the SQL service can be accessed. If so, this function returns True, otherwise False. Args: host (str): Indicates the location to attempt a connection port (int): Indicates the port where the service can be accessed verbose (bool): Indicates whether to log at the levels, DEBUG and lower, or to log warnings only """ if sqlalchemy is None: logger.warning("The sqlalchemy module is not available") return False db_url: str = url or cls.DEFAULT_CONFIG["url"]() try: engine = create_engine(url=db_url) with engine.connect(): pass if verbose: logger.info(f"The SQL Service is available at {db_url}") return True except (exc.SQLAlchemyError, TimeoutError, ConnectionError) as e: logger.warning(f"An active SQL service could not be found at {db_url}: {e}") return False
# Explicit public API for `from scholar_flux.data_storage.sql_storage import *`.
__all__ = ["SQLAlchemyStorage"]