# Source code for scholar_flux.data_storage.mongodb_storage

# /data_storage/mongodb_storage.py
"""The scholar_flux.data_storage.mongodb_storage module implements the MongoDBStorage backend for the DataCacheManager.

This class implements the abstract methods defined by ABCStorage so that a MongoDBStorage instance can be injected
into the scholar_flux.DataCacheManager as its storage backend.

This class implements caching by using the prebuilt features available in MongoDB to store ProcessedResponse fields
within the database for later CRUD operations.

"""
from __future__ import annotations
from typing import Dict, Any, List, Optional, TYPE_CHECKING

from scholar_flux.exceptions import (
    MongoDBImportError,
    StorageCacheException,
    CacheRetrievalException,
    CacheUpdateException,
    CacheDeletionException,
    CacheVerificationException,
)

from scholar_flux.data_storage.abc_storage import ABCStorage
from scholar_flux.utils import config_settings  # provides the loaded global environment configuration

import re
import threading
import logging

logger = logging.getLogger(__name__)

from datetime import datetime, timedelta, timezone

if TYPE_CHECKING:
    import pymongo
    from pymongo import MongoClient
    from pymongo.errors import DuplicateKeyError, PyMongoError, ServerSelectionTimeoutError, ConnectionFailure
else:
    try:
        import pymongo
        from pymongo import MongoClient
        from pymongo.errors import DuplicateKeyError, PyMongoError, ServerSelectionTimeoutError, ConnectionFailure
    except ImportError:
        pymongo = None
        MongoClient = None
        ServerSelectionTimeoutError = Exception
        ConnectionFailure = Exception
        DuplicateKeyError = Exception
        PyMongoError = Exception


class MongoDBStorage(ABCStorage):
    """Implements the storage methods necessary to interact with MongoDB with a unified backend interface.

    The MongoDBStorage uses the same underlying interface as other scholar_flux storage classes for use with the
    DataCacheManager. This implementation uses a MongoDB collection as a key-value cache, so data can be stored and
    retrieved in much the same way as with the In-Memory Storage.

    Examples:
        >>> from scholar_flux.data_storage import MongoDBStorage
        # Connects locally by default (mongodb://127.0.0.1) on MongoDB's default port (27017)
        # Verifies that a mongodb service is actually available locally on the default port
        >>> assert MongoDBStorage.is_available()
        >>> mongo_storage = MongoDBStorage(namespace='testing_functionality')
        >>> print(mongo_storage)
        # OUTPUT: MongoDBStorage(...)
        # Adding records to the storage
        >>> mongo_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
        >>> mongo_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
        # Revising and overwriting a record
        >>> mongo_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
        >>> mongo_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
        # OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
        >>> mongo_storage.retrieve_all()
        # OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
        #           'article': 'A name to remember'},
        #          'testing_functionality:record_page_2': {'id': 53,
        #           'article': 'A name has many meanings'}}
        >>> mongo_storage.retrieve('record_page_1') # retrieves the record for page 1
        # OUTPUT: {'id': 52, 'article': 'A name to remember'}
        >>> mongo_storage.delete_all() # deletes all records from the namespace
        >>> mongo_storage.retrieve_keys() # Will now be empty
        >>> mongo_storage.retrieve_all() # Will also be empty
    """

    # Default connection and collection settings. The host and port can come from the scholar_flux configuration
    # (SCHOLAR_FLUX_MONGODB_HOST / SCHOLAR_FLUX_MONGODB_PORT); they are read once, when this class is defined.
    DEFAULT_CONFIG: Dict[str, Any] = {
        "host": config_settings.config.get("SCHOLAR_FLUX_MONGODB_HOST") or "mongodb://127.0.0.1",
        "port": config_settings.config.get("SCHOLAR_FLUX_MONGODB_PORT") or 27017,
        "db": "storage_manager_db",
        "collection": "result_page",
    }
    # Namespace used when the constructor receives `namespace=None` (no key prefix by default)
    DEFAULT_NAMESPACE: Optional[str] = None
    # Value used for `raise_on_error` when the constructor receives `raise_on_error=None`
    DEFAULT_RAISE_ON_ERROR: bool = False

    def __init__(
        self,
        host: Optional[str] = None,
        namespace: Optional[str] = None,
        ttl: Optional[float | int] = None,
        raise_on_error: Optional[bool] = None,
        **mongo_config,
    ):
        """Initialize the Mongo DB storage backend and connect to the Mongo DB server.

        If no parameters are specified, the MongoDB storage defaults to the parameters derived from the
        scholar_flux.utils.config_settings.config dictionary. That dictionary resolves the host and port from
        environment variables or the default MongoDB host/port, in the following order of priority:

            - SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > 'mongodb://127.0.0.1' (localhost)
            - SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017

        Args:
            host (Optional[str]):
                The host address of the Mongo Database. The default is `'mongodb://127.0.0.1'`, the mongo server
                on the localhost. Each of the following is a valid value for host:
                    - Simple hostname: 'localhost' (uses the port parameter)
                    - Full URI: 'mongodb://localhost:27017' (ignores the port parameter)
                    - Complex URI: 'mongodb://user:pass@host:27017/db?options'
            namespace (Optional[str]): The prefix associated with each cache key. By default, this is None.
            ttl (Optional[float | int]):
                The number of seconds after a write before the record expires. When set, each written document
                receives an `expireAt` timestamp, and MongoDB's TTL index removes it after that time. MongoDB's
                TTL monitor runs periodically, so removal is not instantaneous. When None, records do not expire.
            raise_on_error (Optional[bool]):
                Determines whether an error should be raised when encountering unexpected issues when interacting
                with MongoDB. If `None`, the `raise_on_error` attribute defaults to
                `MongoDBStorage.DEFAULT_RAISE_ON_ERROR`.
            **mongo_config (Dict[Any, Any]):
                Overrides for `DEFAULT_CONFIG`, stored in `self.config`. Only the `host`, `port`, `db` and
                `collection` entries are used to open the connection and select the collection.

        Raises:
            MongoDBImportError: If the optional pymongo dependency is not available or fails to load.
        """
        # pymongo is optional: the module-level import falls back to None when it is not installed
        if pymongo is None:
            raise MongoDBImportError

        self.config = self.DEFAULT_CONFIG | mongo_config
        if host:
            self.config["host"] = host

        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
        # Validate the effective namespace (including any class-level default) before opening a connection
        self._validate_prefix(self.namespace, required=False)
        self.raise_on_error = raise_on_error if raise_on_error is not None else self.DEFAULT_RAISE_ON_ERROR
        self.ttl = ttl
        self.lock = threading.Lock()

        self.client: MongoClient = MongoClient(host=self.config["host"], port=self.config["port"])
        self.db = self.client[self.config["db"]]
        self.collection = self.db[self.config["collection"]]
        # TTL index: with expireAfterSeconds=0, each document expires at the time stored in its own `expireAt`
        # field; documents without `expireAt` never expire.
        self.collection.create_index([("expireAt", 1)], expireAfterSeconds=0)

    def clone(self) -> MongoDBStorage:
        """Create a new MongoDBStorage with the same configuration.

        The MongoClient cannot be deep-copied, so this method re-instantiates the storage with the same
        namespace, ttl, raise_on_error setting and connection configuration. This opens a new client.

        Returns:
            MongoDBStorage: A new, independently connected storage instance.
        """
        cls = self.__class__
        return cls(namespace=self.namespace, ttl=self.ttl, raise_on_error=self.raise_on_error, **self.config)

    @staticmethod
    def _extract_data(document: Dict[str, Any]) -> Any:
        """Return the payload stored under a document's `data` field.

        Dictionary payloads are returned without any `_id`/`key` entries; other payload types (lists, strings,
        numbers, ...) are returned unchanged.
        """
        data = document.get("data")
        if isinstance(data, dict):
            return {k: v for k, v in data.items() if k not in ("_id", "key")}
        return data

    def _namespace_filter(self) -> Dict[str, Any]:
        """Build the MongoDB query filter that restricts an operation to the current namespace.

        Returns:
            Dict[str, Any]: `{}` (every document) when no namespace is set. Otherwise, a filter that matches
            documents whose `key` starts with `"<namespace>:"`.
        """
        if not self.namespace:
            return {}
        # re.escape keeps characters such as '.' or '+' in the namespace from acting as regex metacharacters
        return {"key": {"$regex": f"^{re.escape(self.namespace)}:"}}

    def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve the value associated with the provided key from cache.

        Args:
            key (str): The key used to fetch the stored data from cache. It is prefixed with the namespace.

        Returns:
            Any: The stored payload if the record exists (dict payloads have any `_id`/`key` entries removed).
            Returns None if the key does not exist or if a database error was handled without raising.

        Raises:
            CacheRetrievalException: Expected from `_handle_storage_exception` when `raise_on_error` is True and
                a PyMongoError occurs.
        """
        try:
            namespace_key = self._prefix(key)
            with self.lock:
                cache_data = self.collection.find_one({"key": namespace_key})
            if cache_data:
                return self._extract_data(cache_data)
        except PyMongoError as e:
            msg = f"Error during attempted retrieval of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
        logger.info(f"Record for key {key} (namespace = '{self.namespace}') not found...")
        return None

    def retrieve_all(self) -> Dict[str, Any]:
        """Retrieve all records from cache that belong to the current namespace.

        Returns:
            dict: A mapping from each stored (namespace-prefixed) key to its payload. The payload is returned in
            the same form as `retrieve`. Every record is returned when no namespace is set.

        Raises:
            CacheRetrievalException: Expected from `_handle_storage_exception` when `raise_on_error` is True and
                a PyMongoError occurs.
        """
        cache: Dict[str, Any] = {}
        try:
            with self.lock:
                cursor = self.collection.find(self._namespace_filter(), {"key": 1, "data": 1, "_id": 0})
                cache = {
                    document["key"]: self._extract_data(document)
                    for document in cursor
                    if document.get("key")
                }
            # A pymongo Cursor is always truthy, so emptiness is checked on the materialized result
            if not cache:
                logger.info("Records not found...")
        except PyMongoError as e:
            msg = f"Error during attempted retrieval of records from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
        return cache

    def retrieve_keys(self) -> List[str]:
        """Retrieve all keys stored under the current namespace.

        Returns:
            list[str]: The distinct namespace-prefixed keys saved in the collection. All keys are returned when
            no namespace is set.

        Raises:
            CacheRetrievalException: Expected from `_handle_storage_exception` when `raise_on_error` is True and
                a PyMongoError occurs.
        """
        keys: List[str] = []
        try:
            with self.lock:
                keys = self.collection.distinct("key", self._namespace_filter())
        except PyMongoError as e:
            msg = f"Error during attempted retrieval of all keys from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheRetrievalException if self.raise_on_error else None, msg=msg
            )
        return keys

    def update(self, key: str, data: Any):
        """Store `data` under the provided key, replacing any existing record for that key.

        Args:
            key (str): The key used to store the data in cache. It is prefixed with the namespace.
            data (Any):
                A BSON-serializable Python object to store, such as a string, number, list or dictionary.

        Raises:
            ValueError: If the namespace-prefixed key is empty.
            CacheUpdateException: Expected from `_handle_storage_exception` when `raise_on_error` is True and a
                PyMongoError or StorageCacheException occurs.
        """
        try:
            namespace_key = self._prefix(key)
            if not namespace_key:
                raise ValueError(f"Key invalid. Received {key} (namespace = '{self.namespace}')")

            document: Dict[str, Any] = {"key": namespace_key, "data": data}
            if self.ttl is not None:
                document["expireAt"] = datetime.now(timezone.utc) + timedelta(seconds=self.ttl)

            with self.lock:
                # A single atomic upsert: this inserts new keys, and replacing the whole document drops any
                # stale `expireAt` left by an earlier write made with a ttl.
                self.collection.replace_one({"key": namespace_key}, document, upsert=True)
            logger.debug(f"Cache updated for key: {key} (namespace = '{self.namespace}')")
        except DuplicateKeyError as e:
            logger.warning(f"Duplicate key error updating cache: {e}")
        except (PyMongoError, StorageCacheException) as e:
            msg = f"Error during attempted update of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheUpdateException if self.raise_on_error else None, msg=msg
            )

    def delete(self, key: str):
        """Delete the record associated with the provided key from cache.

        Args:
            key (str): The key associated with the stored data. It is prefixed with the namespace.

        Raises:
            CacheDeletionException: Expected from `_handle_storage_exception` when `raise_on_error` is True and a
                PyMongoError occurs.
        """
        try:
            namespace_key = self._prefix(key)
            with self.lock:
                result = self.collection.delete_one({"key": namespace_key})
            if result.deleted_count > 0:
                logger.debug(f"Key: {key} (namespace = '{self.namespace}') successfully deleted")
            else:
                logger.info(f"Record for key {key} (namespace = '{self.namespace}') does not exist")
        except PyMongoError as e:
            msg = f"Error during attempted deletion of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg
            )

    def delete_all(self):
        """Delete all records from cache that belong to the current namespace.

        When no namespace is set, every record in the collection is deleted.

        Raises:
            CacheDeletionException: Expected from `_handle_storage_exception` when `raise_on_error` is True and a
                PyMongoError occurs.
        """
        try:
            with self.lock:
                result = self.collection.delete_many(self._namespace_filter())
            if result.deleted_count > 0:
                logger.debug("Deleted all records.")
            else:
                logger.warning("No records present to delete")
        except PyMongoError as e:
            msg = f"Error during attempted deletion of all records from namespace '{self.namespace}': {e}"
            self._handle_storage_exception(
                exception=e, operation_exception_type=CacheDeletionException if self.raise_on_error else None, msg=msg
            )

    def verify_cache(self, key: str) -> bool:
        """Check whether a specific cache key exists.

        Args:
            key (str): The key to look up. It is prefixed with the namespace by `retrieve`.

        Returns:
            bool: True if a non-None value is stored for the key, otherwise False.

        Raises:
            ValueError: If the provided key is empty or None.
            CacheVerificationException: Expected from `_handle_storage_exception` when `raise_on_error` is True and
                the lookup fails.
        """
        if not key:
            raise ValueError(f"Key invalid. Received {key} (namespace = '{self.namespace}')")
        try:
            # Force retrieve() to surface errors so that a failed lookup is not mistaken for a missing key
            with self.with_raise_on_error():
                found_data = self.retrieve(key)
            return found_data is not None
        except (PyMongoError, StorageCacheException) as e:
            msg = f"Error during the verification of the existence of key {key} (namespace = '{self.namespace}'): {e}"
            self._handle_storage_exception(
                exception=e,
                operation_exception_type=CacheVerificationException if self.raise_on_error else None,
                msg=msg,
            )
        return False

    @classmethod
    def is_available(cls, host: Optional[str] = None, port: Optional[int] = None, verbose: bool = True) -> bool:
        """Indicate whether a MongoDB service can be reached.

        This method pings the server at the given host and port with a 1-second server-selection timeout. If
        `host` is a URI (e.g. mongodb://localhost:27017), `MongoClient` uses the URI and ignores `port`.

        Args:
            host (Optional[str]):
                The host of the MongoDB service. If None or empty, defaults to the "host" entry of DEFAULT_CONFIG.
            port (Optional[int]):
                The port of the MongoDB service. If None or 0, defaults to the "port" entry of DEFAULT_CONFIG.
            verbose (bool): Whether to log an info message when the service is reachable. Defaults to True.

        Returns:
            bool: True if the service was successfully reached. Returns False if pymongo is not installed or if
            the connection failed; ServerSelectionTimeoutError and ConnectionFailure are caught and logged, not
            raised.
        """
        if pymongo is None:
            logger.warning("The pymongo module is not available")
            return False

        mongodb_host = host or cls.DEFAULT_CONFIG["host"]
        mongodb_port = port or cls.DEFAULT_CONFIG["port"]

        try:
            client: MongoClient
            with MongoClient(host=mongodb_host, port=mongodb_port, serverSelectionTimeoutMS=1000) as client:
                client.server_info()
            if verbose:
                logger.info(f"The MongoDB service is available at {mongodb_host}:{mongodb_port}")
            return True
        except (ServerSelectionTimeoutError, ConnectionFailure) as e:
            logger.warning(f"An active MongoDB service could not be found at {mongodb_host}:{mongodb_port}: {e}")
            return False
# Public API of this module: only the MongoDBStorage backend is exported
__all__ = ["MongoDBStorage"]