scholar_flux.data_storage package

Submodules

scholar_flux.data_storage.abc_storage module

The scholar_flux.data_storage.abc_storage module implements the ABCStorage class, which defines the abstractions that must be implemented to create a scholar_flux-compatible storage backend. The ABCStorage defines basic CRUD operations and convenience methods used to perform operations on the entire range of cached records or, optionally, on the cached records specific to a namespace.

scholar_flux implements the ABCStorage with subclasses for SQLite (through SQLAlchemy), Redis, MongoDB, and an In-Memory cache, and can be further extended to DuckDB and other backends supported by SQLAlchemy.
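
The CRUD surface described above can be sketched with a plain dictionary backend. This is an illustrative stand-in (it does not import or subclass the real ABCStorage) showing the key methods and the namespace-prefixing convention used by the built-in storage classes:

```python
from typing import Any, Optional

class DictStorage:
    """Illustrative dict-backed storage mirroring the ABCStorage CRUD surface."""

    def __init__(self, namespace: Optional[str] = None) -> None:
        self.namespace = namespace
        self._cache: dict[str, Any] = {}

    def _key(self, key: str) -> str:
        # Mirrors the namespacing convention: keys are prefixed with the namespace
        return f"{self.namespace}:{key}" if self.namespace else key

    def update(self, key: str, data: Any) -> None:
        self._cache[self._key(key)] = data

    def retrieve(self, key: str) -> Optional[Any]:
        return self._cache.get(self._key(key))

    def retrieve_keys(self) -> list[str]:
        return list(self._cache)

    def delete(self, key: str) -> Optional[bool]:
        # True when a record was removed, False when the key was absent
        return self._cache.pop(self._key(key), None) is not None

    def delete_all(self) -> None:
        self._cache.clear()

    def verify_cache(self, key: str) -> bool:
        return self._key(key) in self._cache
```

The real abstract base additionally requires clone, retrieve_all, is_available, and verify_connection, as documented below.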

class scholar_flux.data_storage.abc_storage.ABCStorage(*args: Any, **kwargs: Any)[source]

Bases: ABC

The ABCStorage class provides the basic structure required to implement the data storage cache with customized backend.

This subclass provides methods to check the cache, delete from the cache, update the cache with new data, and retrieve data from the cache storage.

__init__(*args: Any, **kwargs: Any) None[source]

Initializes the current storage implementation.

abstract clone() Self[source]

Helper method for cloning the structure and configuration of future implementations.

abstract delete(*args: Any, **kwargs: Any) bool | None[source]

Core method for record deletion.

Should return True when successful, False otherwise, and None on error.

abstract delete_all(*args: Any, **kwargs: Any) None[source]

Core method for deleting all pages of records from the cache.

classmethod get_default_config() dict[source]

Get default configuration with current config_settings values.

abstract classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Core method for verifying whether a storage/service is available.

classmethod ping(*args: Any, **kwargs: Any) None[source]

Verifies that a connection to the storage implementation can be established successfully.

This is a no-op by default for storage backends that don’t require external connections (e.g., InMemoryStorage, NullStorage). Storage backends connecting to external services (Redis, MongoDB, SQL) should override this method to perform actual connection checks.

Note

The signature and arguments vary by storage implementation:

  • Redis: ping(client: redis.Redis)

  • MongoDB: ping(client: MongoClient)

  • SQL: ping(engine: Engine)

  • InMemory/Null: ping() (no-op, uses default)

abstract retrieve(*args: Any, **kwargs: Any) Any | None[source]

Core method for retrieving a page of records from the cache.

abstract retrieve_all(*args: Any, **kwargs: Any) dict[str, Any] | None[source]

Core method for retrieving all pages of records from the cache.

abstract retrieve_keys(*args: Any, **kwargs: Any) list[str] | None[source]

Core method for retrieving all keys from the cache.

structure(flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the current storage subclass. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.

Parameters:
  • flatten (bool) – Flag indicating to flatten the string representation of the object into a single line when True and to preserve a multiline representation of the storage when False (default).

  • show_value_attributes (bool) – Flag for hiding the internal attributes of nested attributes when True (arguments replaced with ...) and showing their default representation when False.

  • mask_values (bool) – Masks any potentially sensitive data shown in the representation when True (default) and shows the representation without sensitive data masking when False.

Returns:

The structure of the current storage subclass as a string.

Return type:

str

abstract update(*args: Any, **kwargs: Any) None[source]

Core method for updating the cache with new records.

abstract verify_cache(*args: Any, **kwargs: Any) bool[source]

Core method for verifying the cache based on the key.

abstract verify_connection() None[source]

Verifies that the storage is available for connection with initialized storage configuration settings.

with_namespace(value: str) Iterator[None][source]

Uses a context manager to temporarily modify the namespace attribute for the context duration.
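
The mechanism can be sketched with the standard contextlib pattern; the class and attribute here are illustrative stand-ins, not the library's implementation:

```python
from contextlib import contextmanager
from typing import Iterator, Optional

# Minimal sketch of the with_namespace pattern: swap the attribute for the
# duration of the block and restore the previous value on exit.
class NamespacedStorage:
    def __init__(self, namespace: Optional[str] = None) -> None:
        self.namespace = namespace

    @contextmanager
    def with_namespace(self, value: str) -> Iterator[None]:
        previous = self.namespace
        self.namespace = value
        try:
            yield
        finally:
            # Restored even if the block raises
            self.namespace = previous
```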

with_raise_on_error(value: bool = True) Iterator[None][source]

Uses a context manager to temporarily modify the raise_on_error attribute for the context duration.

All storage backends that inherit from the ABCStorage will also inherit the with_raise_on_error context manager. When used, this context manager temporarily sets the raise_on_error attribute to True or False for the duration of a code block without permanently changing the storage subclass’s configuration.

This context manager is most useful for briefly suppressing errors and in cache verification when errors need to be logged and reported instead of silently indicating that a cache entry couldn’t be found.

Parameters:

value (bool) – A value to temporarily assign to raise_on_error for the context duration

Example

>>> with storage.with_raise_on_error(True):
...     # Any storage operation here will raise on error, regardless of the instance default
...     storage.retrieve(key)

scholar_flux.data_storage.data_cache_manager module

The scholar_flux.data_storage.data_cache_manager module implements a DataCacheManager for response caching and retrieval.

This class provides the user-facing, unified interface for the different cache storage devices that inherit from the ABCStorage class.

class scholar_flux.data_storage.data_cache_manager.DataCacheManager(cache_storage: ABCStorage | None = None, **storage_kwargs: Any)[source]

Bases: object

DataCacheManager class manages caching of API responses.

This class provides methods to generate cache keys, verify cache entries, check cache validity, update cache with new data, and retrieve data from the cache storage.

Parameters:

cache_storage (Optional[ABCStorage]) – Optional; the storage backend used to store cached data. Defaults to In-Memory Storage.

- generate_fallback_cache_key(response)

Generates a unique fallback cache key based on the response URL and status code.

- verify_cache(cache_key)

Checks if the provided cache_key exists in the cache storage.

- cache_is_valid(cache_key, response=None, cached_response=None)

Determines whether the cached data for a given key is still valid.

- update_cache(cache_key, response, store_raw=False, metadata=None, parsed_response=None, processed_records=None)

Updates the cache storage with new data.

- retrieve(cache_key)

Retrieves data from the cache storage based on the cache key.

- retrieve_from_response(response)

Retrieves data from the cache storage based on the response if within cache.

- verify_connection()

Verifies that a connection can be established using the current cache configuration.

Examples

>>> from scholar_flux.data_storage import DataCacheManager
>>> from scholar_flux.api import SearchCoordinator
# Factory method that creates a default redis connection to the service on localhost if available.
>>> redis_cache_manager = DataCacheManager.with_storage('redis')
# Creates a search coordinator for retrieving API responses from the PLOS API provider
>>> search_coordinator = SearchCoordinator(query = 'Computational Caching Strategies',
                                           provider_name='plos',
                                           cache_requests = True, # caches raw requests prior to processing
                                           cache_manager=redis_cache_manager) # caches response processing
# Uses the cache manager to temporarily store cached responses for the default duration
>>> processed_response = search_coordinator.search(page = 1)
# On the next search, the processed response data can be retrieved directly for later response reconstruction
>>> retrieved_response_json = search_coordinator.responses.cache.retrieve(processed_response.cache_key)
# Serialized responses store the core response fields (content, URL, status code) associated with API responses
>>> assert isinstance(retrieved_response_json, dict) and 'serialized_response' in retrieved_response_json
__init__(cache_storage: ABCStorage | None = None, **storage_kwargs: Any) None[source]

Initializes the DataCacheManager with the selected cache storage.

Parameters:
  • cache_storage (Optional[ABCStorage]) – An already-instantiated storage backend. If None, creates a default storage.

  • **storage_kwargs – Keyword arguments passed to the default storage backend constructor when cache_storage is None. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

    • raise_on_error (bool): Whether to raise exceptions on cache errors

classmethod cache_fingerprint(obj: str | Any | None = None, package_version: str | None = '0.5.0') str[source]

Generates a unique string to identify an object’s structure and configuration for later cache retrieval.

By default, a fingerprint is generated from the current package version and the object representation, if provided. If not provided, a new human-readable object representation is generated using the scholar_flux.utils.generate_repr helper function, which represents the object name and its current state. The package version is also prepended to the current fingerprint if enabled (not None), and can be customized if needed for object-specific versioning.

Parameters:
  • obj (Optional[str]) – A finger-printed object, or an object to generate a representation of

  • package_version (Optional[str]) – The current package version string or a manually provided version for a component.

Returns:

A human-readable string including the version and object identity.

Return type:

str
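
The documented composition can be sketched as follows. This is an illustrative approximation: repr() stands in for scholar_flux.utils.generate_repr, and the ":" separator is an assumption rather than the library's exact format:

```python
from typing import Any, Optional

# Sketch: prepend the package version (when not None) to a string form of the
# object. Strings are used as-is; other objects fall back to repr().
def cache_fingerprint(obj: Optional[Any] = None, package_version: Optional[str] = "0.5.0") -> str:
    representation = obj if isinstance(obj, str) else repr(obj)
    return f"{package_version}:{representation}" if package_version else representation
```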

cache_is_valid(cache_key: str, response: Response | ResponseProtocol | None = None, cached_response: Dict[str, Any] | None = None) bool[source]

Determines whether the cached data for a cache key is valid or needs reprocessing due to missing fields.

To verify the freshness of a cached response, the content hash is compared against a fresh response if available. Checks for validity are also performed to determine whether the cache key recorded within the metadata matches the currently provided key and whether other core fields haven’t changed.

If a cached_response dictionary was not directly passed, the cached response will be retrieved from storage using the cache key before comparison.

Parameters:
  • cache_key (str) – The unique identifier for cached data.

  • response (Optional[Response | ResponseProtocol]) – The API response or response-like object used to validate the cache, if available.

  • cached_response (Optional[Dict[str, Any]]) – The cached data associated with the key.

Returns:

True if the cache is valid, False otherwise.

Return type:

bool

property cache_storage: ABCStorage

The response cache storage used to store raw response data, processed records, and metadata.

clone() Self[source]

Creates a newly cloned instance of the current DataCacheManager.

property config: dict

The underlying configuration dictionary being used with the current storage device.

classmethod default_cache_storage(raise_on_error: bool = False, **storage_kwargs: Any) ABCStorage[source]

Creates a storage device from SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE or an In-memory cache otherwise.

This storage device, once created, defines the storage mechanism used by the DataCacheManager to cache processed response data.

Parameters:
  • raise_on_error (bool) – If True, an exception is raised when the environment variable exists but specifies an unknown storage device. If False, this method instead logs a warning on errors and falls back to a default InMemoryStorage device.

  • **storage_kwargs – Keyword arguments passed to the storage backend constructor. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

Returns:

A new, subclassed default storage backend.

Return type:

ABCStorage

delete(cache_key: str) None[source]

Deletes data from the cache storage based on the cache key.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

None; the entry associated with the cache key is deleted if present.

Return type:

None

classmethod from_defaults(raise_on_error: bool = False, **storage_kwargs: Any) Self[source]

Creates a cache from SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE or an In-memory cache otherwise.

Parameters:
  • raise_on_error (bool) – If True, an exception is raised when unknown storage types are received. If False, a warning is logged and this method defaults to creating a DataCacheManager using an InMemoryStorage.

  • **storage_kwargs – Keyword arguments passed to the storage backend constructor. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

Returns:

A new DataCacheManager instance with the default storage backend.

Return type:

Self
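
The resolution behavior described above can be sketched as a name lookup against the documented storage aliases; this is an illustrative sketch, not the library's code:

```python
import logging
import os

# Sketch of resolving the default storage name from
# SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE, falling back to in-memory
# storage, following the raise_on_error behavior documented above.
KNOWN_STORAGE_NAMES = {
    "redis", "sql", "sqlalchemy", "duckdb", "mongodb", "pymongo",
    "inmemory", "memory", "null",
}

def resolve_default_storage(raise_on_error: bool = False) -> str:
    name = os.getenv("SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE", "inmemory").lower()
    if name not in KNOWN_STORAGE_NAMES:
        if raise_on_error:
            raise ValueError(f"Unknown storage type: {name!r}")
        logging.warning("Unknown storage type %r; defaulting to in-memory storage", name)
        return "inmemory"
    return name
```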

classmethod generate_fallback_cache_key(response: Response | ResponseProtocol, use_parameters: bool = True) str[source]

Generates a unique fallback cache key based on the response URL and status code.

Parameters:

response – The API response object.

Returns:

A unique fallback cache key.

Return type:

str
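
A fallback key in this spirit can be sketched from the two documented inputs. The exact composition, and how use_parameters factors in, is an assumption for illustration only:

```python
# Illustrative sketch: derive a fallback cache key from the response URL and
# status code, the two inputs named in the documentation above.
def fallback_cache_key(url: str, status_code: int) -> str:
    return f"{url}:{status_code}"
```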

classmethod generate_response_hash(response: Response | ResponseProtocol) str[source]

Generates a hash of the response content.

The hashlib library is used to generate a sha256 sum that returns a consistent hash for the same input.

Parameters:

response – The API response object.

Returns:

A SHA-256 hash of the response content.

Return type:

str
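
The documented hashing can be reproduced directly with the standard library: SHA-256 over the raw response bytes yields the same digest for identical content, which is what makes the hash usable for cache-freshness comparison:

```python
import hashlib

# Stable content fingerprint: identical bytes always produce the same digest.
def content_hash(content: bytes) -> str:
    return hashlib.sha256(content).hexdigest()
```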

isnull() bool[source]

Helper method for determining whether the current cache manager uses a null storage.

property namespace: str | None

The namespace of the current cache storage device.

classmethod null() Self[source]

Creates a DataCacheManager using a NullStorage (no storage).

This storage device has the effect of returning False when validating whether the current DataCacheManager is in operation.

Returns:

The current class initialized without storage

Return type:

DataCacheManager

property raise_on_error: bool

Indicates whether errors will be caught or re-raised on failed connections.

retrieve(cache_key: str) Dict[str, Any] | None[source]

Retrieves data from the cache storage based on the cache key.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

The cached data corresponding to the cache key if found, otherwise None.

Return type:

Optional[Dict[str, Any]]

retrieve_from_response(response: Response | ResponseProtocol) Dict[str, Any] | None[source]

Retrieves data from the cache storage based on the response if within cache.

Parameters:

response – The API response object.

Returns:

The cached data corresponding to the response if found, otherwise None.

Return type:

Optional[Dict[str, Any]]

structure(flatten: bool = False, show_value_attributes: bool = False) str[source]

Helper method for quickly showing a representation of the overall structure of the current DataCacheManager.

The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.

Returns:

The structure of the current DataCacheManager as a string.

Return type:

str

property ttl: int | float | None

The time to live associated with the current storage device.

The implementation details depend on the device used to store the response processing cache:

  • RedisStorage: measured in seconds (default=None)

  • MongoDBStorage: measured in seconds (default=None)

  • InMemoryStorage: No-Op (Always returns None)

  • NullStorage: No-Op (Always returns None)

  • SQLAlchemyStorage: No-Op (Always returns None)

  • DuckDBStorage: No-Op (Always returns None)

update_cache(cache_key: str, response: Response | ResponseProtocol, store_raw: bool = False, parsed_response: Any | None = None, metadata: Dict[str, Any] | None = None, extracted_records: Any | None = None, processed_records: Any | None = None, **kwargs: Any) None[source]

Updates the cache storage with data from intermediate and final steps in response retrieval and processing.

Parameters:
  • cache_key – A unique identifier for the cached data.

  • response – (requests.Response | ResponseProtocol) The API response or response-like object.

  • store_raw – (Optional) A boolean indicating whether to store the raw response. Defaults to False.

  • metadata – (Optional) Additional metadata associated with the cached data. Defaults to None.

  • parsed_response – (Optional) The response data parsed into a structured format. Defaults to None.

  • extracted_records – (Optional) The records extracted from a parsed response prior to record processing.

  • processed_records – (Optional) The response data processed for specific use. Defaults to None.

  • kwargs – Optional additional hashable dictionary fields that can be stored using sql cattrs encodings or in-memory cache.

verify_cache(cache_key: str | None) bool[source]

Checks if the provided cache_key exists in the cache storage.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

True if the cache key exists, False otherwise.

Return type:

bool

verify_connection() None[source]

Verifies that a connection can be established to a cache based on the current cache_storage configuration.

  • InMemoryStorage (No-Op: Always successful)

  • NullStorage (No-Op: Always successful)

  • MongoDBStorage (Tries to verify connectivity via a ping request)

  • RedisStorage (Tries to verify connectivity via a ping request)

  • SQLAlchemyStorage (Verifies that a file-based or remote connection can be established [to SQLite by default])

  • DuckDBStorage (Verifies that a file-based or remote DuckDB/MotherDuck connection can be established)

Raises:

StorageCacheException – When an error occurs during connection verification with the underlying cache

Note: When successful, nothing is returned. An error is only raised when a connection cannot be established.

classmethod with_storage(cache_storage: Literal['redis', 'sql', 'sqlalchemy', 'duckdb', 'mongodb', 'pymongo', 'inmemory', 'memory', 'null'] | None = None, *args: Any, **kwargs: Any) Self[source]

Creates a DataCacheManager using a known storage device.

This is a convenience function allowing the user to create a DataCacheManager with redis, sql, mongodb, or inmemory storage with default settings or through the use of optional positional and keyword parameters to initialize the storage as needed.

Note that sql is shorthand for the SQLAlchemy cache storage and uses SQLite. Compatible implementations of other storage devices can be used instead via SQLAlchemy as well (e.g. DuckDB).

Parameters:
  • cache_storage (Literal["redis", "sql", "sqlalchemy", "duckdb", "mongodb", "pymongo", "inmemory", "memory", "null"]) – A default ABCStorage subclass implementation to use as a response processing cache.

  • *args – Positional arguments to pass to the chosen ABCStorage subclass constructor.

  • **kwargs – Keyword arguments to pass to the chosen ABCStorage subclass constructor.

Returns:

The current class initialized with the chosen storage

Return type:

DataCacheManager
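
The accepted shorthand names can be pictured as an alias map to backend classes. The "sql" → SQLAlchemy (SQLite by default) pairing is documented above; the remaining pairings are reasonable assumptions inferred from the storage names in this module:

```python
# Assumed alias map (illustrative): several shorthand names resolve to the
# same backend class, mirroring the Literal accepted by with_storage.
STORAGE_ALIASES = {
    "redis": "RedisStorage",
    "sql": "SQLAlchemyStorage",
    "sqlalchemy": "SQLAlchemyStorage",
    "duckdb": "DuckDBStorage",
    "mongodb": "MongoDBStorage",
    "pymongo": "MongoDBStorage",
    "inmemory": "InMemoryStorage",
    "memory": "InMemoryStorage",
    "null": "NullStorage",
}
```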

scholar_flux.data_storage.in_memory_storage module

The scholar_flux.data_storage.in_memory_storage module implements an InMemoryStorage class that provides a basic cache storage backed by an in-memory dictionary.

The InMemoryStorage class implements the basic CRUD operations and convenience methods used to perform operations.

class scholar_flux.data_storage.in_memory_storage.InMemoryStorage(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs: Any)[source]

Bases: ABCStorage

Default storage class that implements an in-memory storage cache using a dictionary.

This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data, and retrieve data from the cache.

Parameters:
  • namespace (Optional[str]) – Prefix for cache keys. Defaults to None.

  • ttl (Optional[int]) – Ignored. Included for interface compatibility; not implemented.

  • **kwargs – Ignored. Included for interface compatibility; not implemented.

Examples

>>> from scholar_flux.data_storage import InMemoryStorage
### defaults to a basic dictionary:
>>> memory_storage = InMemoryStorage(namespace='testing_functionality')
>>> print(memory_storage)
# OUTPUT: InMemoryStorage(...)
### Adding records to the storage
>>> memory_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> memory_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
### Revising and overwriting a record
>>> memory_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> memory_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> memory_storage.retrieve_all() # retrieves all key-value mappings within the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> memory_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> memory_storage.delete_all() # deletes all records from the namespace
>>> memory_storage.retrieve_keys() # Will now be empty
>>> memory_storage.retrieve_all() # Will also be empty
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'InMemory'
__init__(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs: Any) None[source]

Initialize a basic, dictionary-like memory_cache using a namespace.

Note that ttl and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect processing or cache initialization.

clone() InMemoryStorage[source]

Helper method for creating a new InMemoryStorage with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Attempts to delete the selected cache key if found within the current namespace.

Parameters:

key (str) – The key associated with the stored data in the dictionary cache.

delete_all() None[source]

Attempts to delete all cache keys found within the current namespace.

classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Helper method that returns True, indicating that dictionary-based storage will always be available.

Returns:

True to indicate that the dictionary-based cache storage will always be available

Return type:

(bool)

namespace: str | None
raise_on_error: bool
retrieve(key: str) Any | None[source]

Attempts to retrieve a response containing the specified cache key within the current namespace.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The value returned is a deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

retrieve_all() dict[str, Any] | None[source]

Retrieves all cache key-response mappings found within the current namespace.

Returns:

A dictionary containing each key-value mapping for all cached data within the same namespace

Return type:

dict

retrieve_keys() list[str][source]

Retrieves the full list of all cache keys found within the current namespace.

Returns:

The full list of all keys that are currently mapped within the storage

Return type:

list[str]

structure(flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = False) str[source]

Creates a concise string representation of the current InMemoryStorage device.

The representation displays the total number of records that have been registered to avoid overloading the representation with the specifics of what is being cached.

Parameters:
  • flatten (bool) – Flag indicating whether to flatten the string representation of the object into a single line when True or preserve the multiline representation of the storage cache when False (default).

  • show_value_attributes (bool) – Flag for hiding the internal attributes of nested objects when True (arguments replaced with ...) and showing their default representation when False (default).

  • mask_values (bool) – Masks any potentially sensitive data shown in the representation when True. This is false by default, as the representation of the InMemoryStorage displays non-sensitive information, including only the namespace of the cache and the total cached record count.

Returns:

A basic string representation of the current object.

ttl: Any
update(key: str, data: Any) None[source]

Attempts to update the data associated with a specific cache key in the namespace.

Parameters:
  • key (str) – The key of the key-value pair

  • data (Any) – The data to be associated with the key

verify_cache(key: str) bool[source]

Verifies whether a cache key exists within the current namespace in the in-memory cache.

Parameters:

key (str) – The key to lookup in the cache

Returns:

True if the key is found, otherwise False.

Return type:

bool

verify_connection() None[source]

No-Op for the in-memory cache, which requires no external connection; verification always succeeds.

scholar_flux.data_storage.mongodb_storage module

The scholar_flux.data_storage.mongodb_storage module implements the MongoDBStorage backend for the DataCacheManager.

This class implements the abstract methods required for compatibility with the scholar_flux.DataCacheManager to ensure that each method can be injected as a dependency.

This class implements caching by using the prebuilt features available in MongoDB to store ProcessedResponse fields within the database for later CRUD operations.

class scholar_flux.data_storage.mongodb_storage.MongoDBStorage(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **mongo_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with MongoDB with a unified backend interface.

The MongoDBStorage uses the same underlying interface as other scholar_flux storage classes for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.

Examples

>>> from scholar_flux.data_storage import MongoDBStorage
# Defaults to connecting locally (mongodb://127.0.0.1) on the default port for MongoDB (27017)
# Verifies that a mongodb service is actually available locally on the default port
>>> assert MongoDBStorage.is_available()
>>> mongo_storage = MongoDBStorage(namespace='testing_functionality')
>>> print(mongo_storage)
# OUTPUT: MongoDBStorage(...)
# Adding records to the storage
>>> mongo_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> mongo_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> mongo_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> mongo_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> mongo_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> mongo_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> mongo_storage.delete_all() # deletes all records from the namespace
>>> mongo_storage.retrieve_keys() # Will now be empty
>>> mongo_storage.retrieve_all() # Will also be empty
DEFAULT_CONFIG: dict[str, Any] = {'collection': 'result_page', 'db': 'storage_manager_db', 'host': 'mongodb://127.0.0.1', 'port': 27017, 'serverSelectionTimeoutMS': 5000, 'ttl': None}
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'MongoDB'
__init__(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **mongo_config: Any) None[source]

Initialize the MongoDB storage backend and connect to the MongoDB server.

If no parameters are specified, the MongoDB storage will default to the parameters derived from the scholar_flux.utils.config_settings.config dictionary, which, in turn, resolves the host and port from environment variables or the default MongoDB host/port in the following order of priority:

  • SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > ‘mongodb://127.0.0.1’ (localhost)

  • SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017

Parameters:
  • host (Optional[str]) –

    The host address where the Mongo Database can be found. The default is ‘mongodb://127.0.0.1’, which is the mongo server on the localhost.

    Each of the following are valid values for host:

    • Simple hostname: ‘localhost’ (uses port parameter)

    • Full URI: ‘mongodb://localhost:27017’ (ignores port parameter)

    • Complex URI: ‘mongodb://user:pass@host:27017/db?options’

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (Optional[float | int]) –

    The total number of seconds that must elapse for a cached record to expire. The value -1 turns off TTL expiration when directly passed or resolved from config defaults. TTL is determined in the following order of priority:

    • SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL (resolved from config_settings.get())

    • MongoDBStorage.DEFAULT_CONFIG.get(‘ttl’) (if available)

    • And None if neither of the above is set or defined.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with MongoDB. If None, the raise_on_error attribute defaults to MongoDBStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the MongoDB service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **mongo_config – Configuration parameters required to connect to the MongoDB server. Typically includes parameters such as host, port, db, etc.

Raises:

MongoDBImportError – If the MongoDB driver module is not available or fails to load.
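
The TTL resolution order documented above can be sketched as follows; this is an illustrative sketch of the priority rules, not the library's implementation:

```python
import os
from typing import Optional, Union

Number = Union[int, float]

# Sketch of the documented resolution order: an explicitly passed ttl wins,
# then SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL, then the class default, then
# None. A resolved value of -1 disables TTL expiration.
def resolve_ttl(ttl: Optional[Number] = None,
                default_config_ttl: Optional[Number] = None) -> Optional[Number]:
    if ttl is None:
        raw = os.getenv("SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL")
        ttl = float(raw) if raw is not None else default_config_ttl
    return None if ttl == -1 else ttl
```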

client: None
clone() MongoDBStorage[source]

Helper method for creating a new MongoDBStorage with the same parameters.

Note that the underlying MongoClient cannot be deep-copied. This method is provided as a convenience for re-instantiation with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

Parameters:

key (str) – The key associated with the stored data from the cache.

Raises:

PyMongoError – If there is an error deleting the record

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

Raises:

PyMongoError – If an error occurs when deleting records from the collection

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Reads from environment variables in order of priority:

  • SCHOLAR_FLUX_MONGODB_HOST > cls.DEFAULT_CONFIG[‘host’] > MONGODB_HOST > “mongodb://127.0.0.1” (localhost)

  • SCHOLAR_FLUX_MONGODB_PORT > DEFAULT_CONFIG[‘port’] > MONGODB_PORT > 27017

Returns:

Configuration dictionary with host and port.

Return type:

dict

classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Helper method that indicates whether the MongoDB service is available or not.

It attempts to establish a connection on the provided host and port and returns a boolean indicating if the connection was successful.

Note that if the input to the host is a URI (e.g. mongodb://localhost:27017), MongoClient uses the URI exclusively when initializing the connection, and any value provided to the port parameter is ignored.

Parameters:
  • host (Optional[str]) – The IP or hostname of the MongoDB service. If None or an empty string, defaults to localhost (the local computer) or the “host” entry from the class variable, DEFAULT_CONFIG.

  • port (Optional[int]) – The port where the service is hosted. If None or 0, defaults to port 27017 or the “port” entry from the DEFAULT_CONFIG class variable.

  • verbose (bool) – Indicates whether to log status messages. Defaults to True

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

Returns:

True if the service can be successfully accessed and False otherwise.

Return type:

bool

Raises:
  • ServerSelectionTimeoutError – If a timeout error occurs when attempting to ping Mongo DB

  • ConnectionFailure – If a connection cannot be established

namespace: str | None
classmethod ping(client: None) None[source]

Attempts to ping the remote service.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The value returned is a deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

Raises:

PyMongoError – If there is an error retrieving the record

retrieve_all() dict[str, Any][source]

Retrieve all records from cache that match the current namespace prefix.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict[str, Any]

Raises:

PyMongoError – If there is an error during the retrieval of records under the namespace.

retrieve_keys() list[str][source]

Retrieve all keys for records from cache.

Returns:

A list of all keys saved via MongoDB.

Return type:

list[str]

Raises:

PyMongoError – If there is an error retrieving the record key.

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the data in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types such as strings, numbers, lists, dictionaries, etc.

Raises:

PyMongoError – If an error occurs when attempting to insert or update a record

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the Mongo DB storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

verify_connection() None[source]

Verifies that the MongoDBStorage is available for connection with initialized storage configuration settings.

scholar_flux.data_storage.null_storage module

The scholar_flux.data_storage.null_storage module implements a Null (No-Op) Storage that ensures responses are always reprocessed when it is used.

class scholar_flux.data_storage.null_storage.NullStorage(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs: Any)[source]

Bases: ABCStorage

NullStorage is a no-op implementation of ABCStorage. This class is useful when you want to disable storage without changing code logic.

The scholar_flux package mainly implements this storage when the user turns off processing cache.

Example

>>> from scholar_flux.data_storage import DataCacheManager, NullStorage
>>> from scholar_flux.api import SearchCoordinator
>>> null_storage = DataCacheManager.null()
# This implements a data cache with the null storage under the hood:
>>> assert isinstance(null_storage.cache_storage, NullStorage)
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_manager=null_storage)
# Otherwise the same can be performed with the following:
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_results = False)
# The processing of responses will then be recomputed on the next search:
>>> response = search_coordinator.search(page = 1)
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'Null'
__init__(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs: Any) None[source]

Initialize a No-Op cache for compatibility with the ABCStorage base class.

Note that namespace, ttl, raise_on_error, and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect initialization.

clone() NullStorage[source]

Helper method for creating a new implementation of the current NullStorage.

config: dict[str, Any]
delete(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

delete_all(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Method added for abstract class consistency - returns True, indicating that the no-op storage is always available although no cache is ever stored.

namespace: str | None
raise_on_error: bool
retrieve(*args: Any, **kwargs: Any) Any | None[source]

Method added for abstract class consistency - no-op

retrieve_all(*args: Any, **kwargs: Any) dict[str, Any] | None[source]

Method added for abstract class consistency - returns a dictionary for type consistency

retrieve_keys(*args: Any, **kwargs: Any) list[str] | None[source]

Method added for abstract class consistency - returns a list for type consistency

ttl: Any
update(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

verify_cache(*args: Any, **kwargs: Any) bool[source]

Method added for abstract class consistency - returns False, indicating that no cache is ever stored

verify_connection() None[source]

No-op for NullStorage; connection-backed storages raise an error here when a connection can’t be established successfully.

scholar_flux.data_storage.redis_storage module

The scholar_flux.data_storage.redis_storage module implements the RedisStorage backend for the DataCacheManager.

This class implements the abstract methods required for compatibility with the scholar_flux.DataCacheManager.

This class implements caching by using the serialization-deserialization and caching features available in Redis to store ProcessedResponse fields within the database for later CRUD operations.

WARNING: Ensure that the ‘namespace’ parameter is set to a non-empty, unique value for each logical cache. Using an empty or shared namespace may result in accidental deletion or overwriting of unrelated data. For that reason, the delete_all method does not perform any deletions unless a namespace exists.
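The rationale behind this guard can be illustrated with a small dictionary-backed model. This is purely illustrative: the class and method names below mirror the documented interface but are not scholar_flux code, and the "namespace:key" prefix scheme is an assumption based on the key formats shown in the examples.

```python
# A minimal dictionary-based model of namespace-prefixed caching, showing
# why delete_all() is guarded: without a namespace, a bulk delete would
# wipe every key in the shared store. Names here are illustrative only.
class NamespacedCache:
    def __init__(self, store, namespace=None):
        self.store = store          # shared backing store (e.g. one Redis db)
        self.namespace = namespace

    def _key(self, key):
        return f"{self.namespace}:{key}" if self.namespace else key

    def update(self, key, value):
        self.store[self._key(key)] = value

    def delete_all(self):
        # Guard: refuse a bulk delete when no namespace scopes the keys.
        if not self.namespace:
            return
        prefix = f"{self.namespace}:"
        for k in [k for k in self.store if k.startswith(prefix)]:
            del self.store[k]
```

Two caches sharing one backing store stay isolated by their prefixes, and a cache without a namespace refuses the bulk delete rather than wiping unrelated keys.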

class scholar_flux.data_storage.redis_storage.RedisStorage(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **redis_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with Redis using a unified backend interface.

The RedisStorage implements the abstract methods from the ABCStorage class for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.

Examples

>>> from scholar_flux.data_storage import RedisStorage
# Defaults to connecting locally (localhost) on the default port for Redis services (6379)
# Verifies that a Redis service is locally available.
>>> assert RedisStorage.is_available()
>>> redis_storage = RedisStorage(namespace='testing_functionality')
>>> print(redis_storage)
# OUTPUT: RedisStorage(...)
# Adding records to the storage
>>> redis_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> redis_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> redis_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> redis_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> redis_storage.retrieve_all() # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> redis_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> redis_storage.delete_all() # deletes all records from the namespace
>>> redis_storage.retrieve_keys() # Will now be empty
>>> redis_storage.retrieve_all() # Will also be empty
DEFAULT_CONFIG: dict = {'host': 'localhost', 'port': 6379, 'ttl': None}
DEFAULT_NAMESPACE: str = 'SFAPI'
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'Redis'
__init__(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **redis_config: Any)[source]

Initialize the Redis storage backend and connect to the Redis server.

If no parameters are specified, the Redis storage will attempt to resolve the host and port using variables from the environment (loaded into scholar_flux.utils.config_settings at runtime).

The resolved host and port are resolved from environment variables/defaults in the following order of priority:

  • SCHOLAR_FLUX_REDIS_HOST > REDIS_HOST > ‘localhost’

  • SCHOLAR_FLUX_REDIS_PORT > REDIS_PORT > 6379

Parameters:
  • host (Optional[str]) – Redis server host. Can be provided positionally or as a keyword argument. Defaults to ‘localhost’ if not specified.

  • namespace (Optional[str]) – The prefix associated with each cache key. Defaults to DEFAULT_NAMESPACE if left None.

  • ttl (Optional[int]) –

    The total number of seconds that must elapse for a cached record to expire. Integers are the recommended input type, but floats and strings that can reasonably be converted into integers will be coerced. Note that the value -1 turns off TTL expiration when directly passed or resolved from config defaults. TTL is determined in the following order of priority:

    • SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL (resolved from config_settings.get())

    • RedisStorage.DEFAULT_CONFIG.get(‘ttl’) (if available)

    • And None if neither of the above is set or defined.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with Redis. If None, the raise_on_error attribute defaults to RedisStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the Redis service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **redis_config – Configuration parameters required to connect to the Redis server. Typically includes parameters such as host, port, db, etc.

Raises:

RedisImportError – If redis module is not available or fails to load.

clone() RedisStorage[source]

Helper method for creating a new RedisStorage with the same parameters.

Note that the implementation of the RedisStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

This method indicates whether deletion was successful by returning True if the record was deleted and False if the record did not exist to be deleted.

Parameters:

key (str) – The key associated with the stored data in the cache.

Raises:

RedisError – If there is an error deleting the record

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

Raises:

RedisError – If an error occurs when deleting records from the collection

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Reads from environment variables in order of priority:

  • SCHOLAR_FLUX_REDIS_HOST > cls.DEFAULT_CONFIG[‘host’] > REDIS_HOST > ‘localhost’

  • SCHOLAR_FLUX_REDIS_PORT > DEFAULT_CONFIG[‘port’] > REDIS_PORT > 6379

Returns:

Configuration dictionary with host and port.

Return type:

dict[str, Any]

classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Helper class method for testing whether the Redis service is available and can be accessed.

If Redis can be successfully reached, this function returns True, otherwise False.

Parameters:
  • host (Optional[str]) – Indicates the location to attempt a connection. If None or an empty string, defaults to localhost (the local computer) or the “host” entry from the class variable, DEFAULT_CONFIG.

  • port (Optional[int]) – Indicates the port where the service can be accessed. If None or 0, defaults to port 6379 or the “port” entry from the DEFAULT_CONFIG class variable.

  • verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

Raises:
  • TimeoutError – If a timeout error occurs when attempting to ping Redis

  • ConnectionError – If a connection cannot be established

namespace: str | None
classmethod ping(client: redis.Redis) None[source]

Attempts to ping the remote service.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The value returned is a deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

retrieve_all() dict[str, Any][source]

Retrieve all records from cache that match the current namespace prefix.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict[str, Any]

Raises:

RedisError – If there is an error during the retrieval of records under the namespace

retrieve_keys() list[str][source]

Retrieve all keys for records from cache that match the current namespace prefix.

Returns:

A list of all keys saved under the current namespace.

Return type:

list[str]

Raises:

RedisError – If there is an error retrieving the record key

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the serialized JSON string in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.

Raises:

RedisError – If an error occurs when attempting to insert or update a record

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the Redis storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

Raises:
  • ValueError – If provided key is empty or None.

  • RedisError – If an error occurs when looking up a key

verify_connection() None[source]

Verifies that the RedisStorage is available for connection with the initialized configuration settings.

scholar_flux.data_storage.sql_storage module

The scholar_flux.data_storage.sql_storage module implements SQLAlchemy-based storage devices for response caching.

This module implements the SQLAlchemyStorage class and DuckDBStorage subclass, both of which implement the abstract methods required for compatibility with the scholar_flux.DataCacheManager. This module provides SQL database storage using the SQLAlchemy Object-Relational Mapper (ORM), using SQLite as the default backend.

When ProcessedResponse fields are cached, this implementation uses the JsonDataEncoder to recursively encode and serialize each field within a storage compatible JSON data structure. When retrieving data, it is decoded and deserialized to return the original object.
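The round trip can be sketched with the standard json module. JsonDataEncoder itself is part of scholar_flux; the default=str hook below is a simplified stand-in, and unlike the real decoder, this sketch returns the plain JSON form (dates come back as ISO strings rather than the original date objects).

```python
import json
from datetime import date

# Sketch of the encode -> store -> decode round trip described above.
# default=str is a stand-in for JsonDataEncoder's handling of values
# that are not natively JSON-serializable, such as dates.
def encode(obj):
    return json.dumps(obj, default=str)  # e.g. date -> "2024-01-15"

def decode(text):
    return json.loads(text)

record = {"id": 52, "published": date(2024, 1, 15), "tags": ["cache", "sql"]}
stored = encode(record)    # what would land in the Cache column
restored = decode(stored)  # what retrieve() would hand back
```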

Classes:
  • CacheTable:

    Defines the internal specification of the SQLAlchemy table used for caching. Inherits from Base/DeclarativeBase to define its structure as a SQLAlchemy ORM model.

  • SQLAlchemyStorage:

    Primary storage class that uses SQLAlchemy to perform CRUD operations. Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible databases.

  • DuckDBStorage:

    Extends SQLAlchemyStorage with DuckDB-specific configuration and validation. Requires the duckdb_engine package for SQLAlchemy dialect support.

class scholar_flux.data_storage.sql_storage.DuckDBStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any)[source]

Bases: SQLAlchemyStorage

This class extends the SQLAlchemyStorage device to support DuckDB as a supported storage device.

Note that this class requires the duckdb_engine and sqlalchemy packages and will raise an error if both are not installed. This class can be initialized in the same manner as SQLAlchemyStorage, only requiring that the passed url has a valid duckdb:/// URI scheme.
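A minimal version of the scheme check can be sketched as follows. This stand-in is an assumption, not the package's code: the real validation (verify_url_string) raises CacheParameterValidationException, while the hypothetical helper below raises ValueError.

```python
# Hypothetical stand-in for DuckDBStorage.verify_url_string: accepts only
# URLs with the duckdb:/// scheme and rejects everything else.
def verify_duckdb_url(url: str) -> None:
    if not isinstance(url, str) or not url.startswith("duckdb:///"):
        raise ValueError(f"Not a valid duckdb:/// URL: {url!r}")


verify_duckdb_url("duckdb:///cache.db")  # passes silently
```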

DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function DuckDBStorage.<lambda>>}
STORAGE_TYPE: str = 'DuckDB'
__init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any) None[source]

Initialize the DuckDBStorage storage backend and connect to the server indicated via the url parameter.

This class extends the original SQLAlchemyStorage to provide basic helpers that aid in the creation of both simple and complex sessions using the DuckDB engine.

Parameters:
  • url (Optional[str]) – Database connection string. All URLs must begin with duckdb:///. A CacheParameterValidationException will be raised if the URL is invalid or does not contain the required scheme.

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (None) – Ignored. Included for interface compatibility; not implemented.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the SQL service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **sqlalchemy_config

    Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine Typical parameters include the following:

    • url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.

    • echo (bool): Indicates whether to show the executed SQL queries in the console.

classmethod create_default_url() str[source]

Creates a valid DuckDB URL within the default writable package cache directory.

classmethod is_available(url: str | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Tests whether the SQL service can be accessed. If so, this function returns True, otherwise False.

Parameters:
  • url (str) – Indicates the location to attempt a connection

  • verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

classmethod verify_url_string(url: str) None[source]

Helper method for verifying that the current URI is a valid DuckDB resource identifier.

class scholar_flux.data_storage.sql_storage.SQLAlchemyStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with SQLite3 along with other SQL flavors via sqlalchemy.

This implementation is designed to use a relational database as a cache by which data can be stored and retrieved in a relatively straightforward manner that associates records in key-value pairs similar to the In-Memory Storage.

Note:

This table uses the structure previously defined in the CacheTable to store records in a structured manner:

ID:

Automatically generated - identifies the unique record in the table

Key:

Is used to associate a specific cached record with a short human-readable (or hashed) string

Cache:

The JSON data associated with the record. To store the data, any nested, non-serializable data is first encoded before being unstructured and stored. On retrieving the data, the JSON string is decoded and restructured in order to return the original object.

The SQLAlchemyStorage can be initialized as follows:

Examples

## Import the package and initialize the storage in a dedicated package directory:
>>> from scholar_flux.data_storage import SQLAlchemyStorage
# Defaults to creating a local, file-based sqlite cache within the default writable directory.
# Verifies that the dependency for a basic sqlite service is actually available for use locally
>>> assert SQLAlchemyStorage.is_available()
>>> sql_storage = SQLAlchemyStorage(namespace='testing_functionality')
>>> print(sql_storage)
# OUTPUT: SQLAlchemyStorage(...)
# Adding records to the storage
>>> sql_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> sql_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> sql_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> sql_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> sql_storage.retrieve_all() # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> sql_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> sql_storage.delete_all() # deletes all records from the namespace
>>> sql_storage.retrieve_keys() # Will now be empty

DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function SQLAlchemyStorage.<lambda>>}
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'SQL'
__init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any) None[source]

Initialize the SQLAlchemy storage backend and connect to the server indicated via the url parameter.

This class uses the innate flexibility of SQLAlchemy to support backends such as SQLite, Postgres, DuckDB, etc.

Parameters:
  • url (Optional[str]) – Database connection string. This can be provided positionally or as a keyword argument.

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (None) – Ignored. Included for interface compatibility, but not implemented.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the SQL service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **sqlalchemy_config

    Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine Typical parameters include the following:

    • url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.

    • echo (bool): Indicates whether to show the executed SQL queries in the console.

clone() SQLAlchemyStorage[source]

Helper method for creating a new SQLAlchemyStorage with the same parameters.

Note that the implementation of the SQLAlchemyStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.

config: dict[str, Any]
classmethod create_default_url() str[source]

Creates a default URL within the writable directory for the current SQLAlchemyStorage class or subclass.

delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

Parameters:

key (str) – The key associated with the stored data in the cache.

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Returns:

A dictionary configuration with the default URL and echo (for debugging SQL statements).

Return type:

dict

classmethod get_default_url() str[source]

Retrieves the SQLAlchemy URL from the environment configuration, falling back to the default when invalid.

Returns:

The validated URL from the environment configuration if valid. Otherwise the default URL generated via cls.create_default_url().

Return type:

str

Note: This method first attempts to validate the URL string from the environment variable, SCHOLAR_FLUX_SQLALCHEMY_URL, using the cls.verify_url_string class method. When validation fails, the default for the current class is returned via cls.create_default_url instead.

classmethod is_available(url: str | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Tests whether the SQL service can be accessed. If so, this function returns True, otherwise False.

Parameters:
  • url (str) – Indicates the location to attempt a connection

  • verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

namespace: str | None
classmethod ping(engine: None) None[source]

Verifies that the client can successfully connect to the database.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The value returned is a deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

retrieve_all() Dict[str, Any][source]

Retrieve all records from cache.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict

retrieve_keys() List[str][source]

Retrieve all keys for records from cache.

Returns:

A list of all keys saved via SQL.

Return type:

list

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the serialized JSON string in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the SQL storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

Raises:

ValueError – If provided key is empty or None.

verify_connection() None[source]

Verifies that the SQLAlchemyStorage is available for connection with initialized configuration settings.

classmethod verify_url_string(url: str) None[source]

Helper method for verifying that the current URI has a valid SQLAlchemy resource identifier.

Module contents

The scholar_flux.data_storage module defines several storage implementations for caching processed responses.

These core storage definitions are used to cache the response content, records and metadata for each unique page/batch of records under a key used for cache identification.

Core components:
  • DataCacheManager: Contains the higher-level methods used to create and interact with the processing cache storage methods in a predictable manner.

  • SQLAlchemyStorage: Contains the core methods needed to interact with a range of SQL databases (and duckdb) using the same underlying interface. By default, this class uses sqlalchemy to set up a db in a consistent location.

  • RedisStorage: Contains the core methods used to interact with the Redis client. This storage defaults to localhost, port 6379.

  • MongoStorage: Contains the core methods used to interact with the MongoDB database. By default, this class attempts to connect to MongoDB on localhost, port 27017.

  • InMemoryStorage: The default storage method - simply saves processed request content and responses to a temporary dictionary that is deleted when the python session is stopped.

  • NullStorage: A No-Op storage method that is used to effectively turn off the use of storage. This storage is included for compatibility with the static typing used throughout the package.

In addition, Exceptions for missing dependencies are set to return storage-specific errors if a storage is initialized without the necessary dependencies:

  • SQLAlchemyStorage -> sqlalchemy

  • MongoStorage -> pymongo

  • RedisStorage -> redis

Example use:
>>> from scholar_flux import DataCacheManager, SearchCoordinator
>>> processing_cache = DataCacheManager.with_storage('redis')
>>> SearchCoordinator(query = 'Programming', cache_manager = processing_cache)
class scholar_flux.data_storage.ABCStorage(*args: Any, **kwargs: Any)[source]

Bases: ABC

The ABCStorage class provides the basic structure required to implement the data storage cache with customized backend.

This subclass provides methods to check the cache, delete from the cache, update the cache with new data, and retrieve data from the cache storage.

__init__(*args: Any, **kwargs: Any) None[source]

Initializes the current storage implementation.

abstract clone() Self[source]

Helper method for cloning the structure and configuration of future implementations.

abstract delete(*args: Any, **kwargs: Any) bool | None[source]

Core method for record deletion.

Should return True when successful, False otherwise, and None on error.

abstract delete_all(*args: Any, **kwargs: Any) None[source]

Core method for deleting all pages of records from the cache.

classmethod get_default_config() dict[source]

Get default configuration with current config_settings values.

abstract classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Core method for verifying whether a storage/service is available.

classmethod ping(*args: Any, **kwargs: Any) None[source]

Verifies that a connection to the storage implementation can be established successfully.

This is a no-op by default for storage backends that don’t require external connections (e.g., InMemoryStorage, NullStorage). Storage backends connecting to external services (Redis, MongoDB, SQL) should override this method to perform actual connection checks.

Note

The signature and arguments vary by storage implementation:

  • Redis: ping(client: redis.Redis)

  • MongoDB: ping(client: MongoClient)

  • SQL: ping(engine: Engine)

  • InMemory/Null: ping() (no-op, uses default)

abstract retrieve(*args: Any, **kwargs: Any) Any | None[source]

Core method for retrieving a page of records from the cache.

abstract retrieve_all(*args: Any, **kwargs: Any) dict[str, Any] | None[source]

Core method for retrieving all pages of records from the cache.

abstract retrieve_keys(*args: Any, **kwargs: Any) list[str] | None[source]

Core method for retrieving all keys from the cache.

structure(flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the current storage subclass. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.

Parameters:
  • flatten (bool) – Flag indicating to flatten the string representation of the object into a single line when True and to preserve a multiline representation of the storage when False (default).

  • show_value_attributes (bool) – Flag for hiding the internal attributes of nested objects when True (arguments replaced with "...") and showing their default representation when False.

  • mask_values (bool) – Masks any potentially sensitive data shown in the representation when True (default) and shows the representation without sensitive data masking when False.

Returns:

The structure of the current storage subclass as a string.

Return type:

str

abstract update(*args: Any, **kwargs: Any) None[source]

Core method for updating the cache with new records.

abstract verify_cache(*args: Any, **kwargs: Any) bool[source]

Core method for verifying the cache based on the key.

abstract verify_connection() None[source]

Verifies that the storage is available for connection with initialized storage configuration settings.

with_namespace(value: str) Iterator[None][source]

Uses a context manager to temporarily modify the namespace attribute for the context duration.

with_raise_on_error(value: bool = True) Iterator[None][source]

Uses a context manager to temporarily modify the raise_on_error attribute for the context duration.

All storage backends that inherit from the ABCStorage will also inherit the with_raise_on_error context manager. When used, this context manager temporarily sets the raise_on_error attribute to True or False for the duration of a code block without permanently changing the storage subclass’s configuration.

This context manager is most useful for briefly suppressing errors, or, during cache verification, for raising and logging errors instead of silently indicating that a cache entry couldn’t be found.

Parameters:

value (bool) – A value to temporarily assign to raise_on_error for the context duration

Example

>>> with storage.with_raise_on_error(True):
>>>     # Any storage operation here will raise on error, regardless of the instance default
>>>     storage.retrieve(key)
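The temporary-attribute pattern behind with_raise_on_error can be sketched with contextlib (a hypothetical minimal class; the actual scholar_flux implementation may differ):

```python
from contextlib import contextmanager
from typing import Iterator


class TinyStorage:
    """Hypothetical storage exposing a raise_on_error flag."""

    def __init__(self, raise_on_error: bool = False) -> None:
        self.raise_on_error = raise_on_error

    @contextmanager
    def with_raise_on_error(self, value: bool = True) -> Iterator[None]:
        # Swap in the temporary value, then restore the original
        # even if the enclosed block raises an exception.
        original = self.raise_on_error
        self.raise_on_error = value
        try:
            yield
        finally:
            self.raise_on_error = original


storage = TinyStorage()
with storage.with_raise_on_error(True):
    assert storage.raise_on_error is True
assert storage.raise_on_error is False
```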
class scholar_flux.data_storage.DataCacheManager(cache_storage: ABCStorage | None = None, **storage_kwargs: Any)[source]

Bases: object

DataCacheManager class manages caching of API responses.

This class provides methods to generate cache keys, verify cache entries, check cache validity, update cache with new data, and retrieve data from the cache storage.

Parameters:

cache_storage (Optional[ABCStorage]) – Optional; the storage backend used for cached data. Defaults to In-Memory Storage.

- generate_fallback_cache_key(response)

Generates a unique fallback cache key based on the response URL and status code.

- verify_cache(cache_key)

Checks if the provided cache_key exists in the cache storage.

- cache_is_valid(cache_key, response=None, cached_response=None)

Determines whether the cached data for a given key is still valid.

- update_cache(cache_key, response, store_raw=False, metadata=None, parsed_response=None, processed_records=None)

Updates the cache storage with new data.

- retrieve(cache_key)

Retrieves data from the cache storage based on the cache key.

- retrieve_from_response(response)

Retrieves data from the cache storage based on the response if within cache.

- verify_connection()

Verifies that a connection can be established using the current cache configuration.

Examples

>>> from scholar_flux.data_storage import DataCacheManager
>>> from scholar_flux.api import SearchCoordinator
# Factory method that creates a default redis connection to the service on localhost if available.
>>> redis_cache_manager = DataCacheManager.with_storage('redis')
# Creates a search coordinator for retrieving API responses from the PLOS API provider
>>> search_coordinator = SearchCoordinator(query = 'Computational Caching Strategies',
                                           provider_name='plos',
                                           cache_requests = True, # caches raw requests prior to processing
                                           cache_manager=redis_cache_manager) # caches response processing
# Uses the cache manager to temporarily store cached responses for the default duration
>>> processed_response = search_coordinator.search(page = 1)
# On the next search, the processed response data can be retrieved directly for later response reconstruction
>>> retrieved_response_json = search_coordinator.responses.cache.retrieve(processed_response.cache_key)
# Serialized responses store the core response fields (content, URL, status code) associated with API responses
>>> assert isinstance(retrieved_response_json, dict) and 'serialized_response' in retrieved_response_json
__init__(cache_storage: ABCStorage | None = None, **storage_kwargs: Any) None[source]

Initializes the DataCacheManager with the selected cache storage.

Parameters:
  • cache_storage (Optional[ABCStorage]) – An already-instantiated storage backend. If None, creates a default storage.

  • **storage_kwargs –

    Keyword arguments passed to the default storage backend constructor when cache_storage is None. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

    • raise_on_error (bool): Whether to raise exceptions on cache errors

classmethod cache_fingerprint(obj: str | Any | None = None, package_version: str | None = '0.5.0') str[source]

Generates a unique string to identify an object’s structure and configuration for later cache retrieval.

By default, a fingerprint is generated from the current package version and the object representation, if provided. If not provided, a new human-readable object representation is generated using the scholar_flux.utils.generate_repr helper function, which represents the object name and its current state. The package version is also prepended to the fingerprint if enabled (not None) and can be customized if needed for object-specific versioning.

Parameters:
  • obj (Optional[str]) – A finger-printed object, or an object to generate a representation of

  • package_version (Optional[str]) – The current package version string, or a manually provided version for a component.

Returns:

A human-readable string including the version and object identity.

Return type:

str
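The versioned-fingerprint idea can be sketched as follows (a simplified stand-in; the real method delegates to scholar_flux.utils.generate_repr, and the separator shown here is an assumption):

```python
from typing import Any, Optional


def cache_fingerprint(obj: Optional[Any] = None, package_version: Optional[str] = "0.5.0") -> str:
    # Use the object representation as-is when it is already a string;
    # otherwise fall back to repr() (the real method uses generate_repr).
    representation = obj if isinstance(obj, str) else repr(obj)
    # Prepend the package version so cached entries are invalidated
    # whenever the package version changes.
    if package_version is not None:
        return f"{package_version}:{representation}"
    return representation


assert cache_fingerprint("InMemoryStorage(namespace=None)") == "0.5.0:InMemoryStorage(namespace=None)"
assert cache_fingerprint("x", package_version=None) == "x"
```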

cache_is_valid(cache_key: str, response: Response | ResponseProtocol | None = None, cached_response: Dict[str, Any] | None = None) bool[source]

Determines whether the cached data for a cache key is valid or needs reprocessing due to missing fields.

To verify the freshness of a cached response, the content hash is compared against a fresh response if available. Checks for validity are also performed to determine whether the cache key recorded within the metadata matches the currently provided key and whether other core fields haven’t changed.

If a cached_response dictionary was not directly passed, the cache key will be retrieved from storage before comparison.

Parameters:
  • cache_key (str) – The unique identifier for cached data.

  • response (Optional[Response | ResponseProtocol]) – The API response or response-like object used to validate the cache, if available.

  • cached_response (Optional[Dict[str, Any]]) – The cached data associated with the key.

Returns:

True if the cache is valid, False otherwise.

Return type:

bool

property cache_storage: ABCStorage

The response cache storage used to store raw response data, processed records, and metadata.

clone() Self[source]

Creates a newly cloned instance of the current DataCacheManager.

property config: dict

The underlying configuration dictionary being used with the current storage device.

classmethod default_cache_storage(raise_on_error: bool = False, **storage_kwargs: Any) ABCStorage[source]

Creates a storage device from SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE or an In-memory cache otherwise.

This storage device, once created, defines the storage mechanism used by the DataCacheManager to cache processed response data.

Parameters:
  • raise_on_error (bool) – If True, an exception is raised when the environment variable exists but attempts to use an unknown storage device. If False, this method instead logs a warning on errors and creates a default DataCacheManager with an InMemoryStorage device instead.

  • **storage_kwargs –

    Keyword arguments passed to the storage backend constructor. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

Returns:

A new, subclassed default storage backend.

Return type:

ABCStorage

delete(cache_key: str) None[source]

Deletes data from the cache storage based on the cache key.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

Nothing; the record corresponding to the cache key is deleted from the cache if present.

Return type:

None

classmethod from_defaults(raise_on_error: bool = False, **storage_kwargs: Any) Self[source]

Creates a cache from SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_STORAGE or an In-memory cache otherwise.

Parameters:
  • raise_on_error (bool) – If True, an exception is raised when unknown storage types are received. If False, a warning is logged and this method defaults to creating a DataCacheManager using an InMemoryStorage.

  • **storage_kwargs –

    Keyword arguments passed to the storage backend constructor. Common parameters include:

    • verify_connection (bool): Verify storage availability on initialization

    • namespace (str): Prefix for cache keys

    • ttl (int): Time-to-live for cache entries

Returns:

A new DataCacheManager instance with the default storage backend.

Return type:

Self

classmethod generate_fallback_cache_key(response: Response | ResponseProtocol, use_parameters: bool = True) str[source]

Generates a unique fallback cache key based on the response URL and status code.

Parameters:

response – The API response object.

Returns:

A unique fallback cache key.

Return type:

str
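A fallback key built from the response URL and status code can be sketched like this (the separator and exact composition are assumptions, not the library's actual scheme):

```python
def fallback_cache_key(url: str, status_code: int) -> str:
    # Combine URL and status code so distinct responses map to
    # distinct keys even when no explicit cache key is available.
    return f"{url}#{status_code}"


key = fallback_cache_key("https://api.plos.org/search?q=caching", 200)
assert key == "https://api.plos.org/search?q=caching#200"
```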

classmethod generate_response_hash(response: Response | ResponseProtocol) str[source]

Generates a hash of the response content.

The hashlib library is used to generate a sha256 sum that returns a consistent hash for the same input.

Parameters:

response – The API response object.

Returns:

A SHA-256 hash of the response content.

Return type:

str
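Such a content hash can be reproduced with the standard library (exactly which bytes generate_response_hash feeds into the digest is an implementation detail):

```python
import hashlib


def response_hash(content: bytes) -> str:
    # SHA-256 is deterministic: identical content always produces
    # the same 64-character hexadecimal digest.
    return hashlib.sha256(content).hexdigest()


h1 = response_hash(b'{"records": []}')
h2 = response_hash(b'{"records": []}')
assert h1 == h2
assert len(h1) == 64
```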

isnull() bool[source]

Helper method for determining whether the current cache manager uses a null storage.

property namespace: str | None

The namespace of the current cache storage device.
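Namespaced keys follow the 'namespace:key' pattern shown in the examples throughout this page; a minimal sketch of the prefixing:

```python
from typing import Optional


def namespaced_key(namespace: Optional[str], key: str) -> str:
    # A namespace acts as a simple prefix that scopes cache keys,
    # letting multiple consumers share one storage backend safely.
    return f"{namespace}:{key}" if namespace else key


assert namespaced_key("testing_functionality", "record_page_1") == "testing_functionality:record_page_1"
assert namespaced_key(None, "record_page_1") == "record_page_1"
```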

classmethod null() Self[source]

Creates a DataCacheManager using a NullStorage (no storage).

This storage device has the effect of evaluating to False when validating whether the current DataCacheManager is in operation.

Returns:

The current class initialized without storage

Return type:

DataCacheManager

property raise_on_error: bool

Indicates whether errors will be caught or re-raised on failed connections.

retrieve(cache_key: str) Dict[str, Any] | None[source]

Retrieves data from the cache storage based on the cache key.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

The cached data corresponding to the cache key if found, otherwise None.

Return type:

Optional[Dict[str, Any]]

retrieve_from_response(response: Response | ResponseProtocol) Dict[str, Any] | None[source]

Retrieves data from the cache storage based on the response if within cache.

Parameters:

response – The API response object.

Returns:

The cached data corresponding to the response if found, otherwise None.

Return type:

Optional[Dict[str, Any]]

structure(flatten: bool = False, show_value_attributes: bool = False) str[source]

Helper method for quickly showing a representation of the overall structure of the current DataCacheManager.

The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.

Returns:

The structure of the current DataCacheManager as a string.

Return type:

str

property ttl: int | float | None

The time to live associated with the current storage device.

The implementation details depend on the device used to store the response processing cache:

  • RedisStorage: measured in seconds (default=None)

  • MongoDBStorage: measured in seconds (default=None)

  • InMemoryStorage: No-Op (Always returns None)

  • NullStorage: No-Op (Always returns None)

  • SQLAlchemyStorage: No-Op (Always returns None)

  • DuckDBStorage: No-Op (Always returns None)
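For backends that honor a TTL, expiry amounts to comparing an entry's age against the configured lifetime. A stdlib sketch of the idea (Redis and MongoDB enforce TTL server-side; this is illustration only, not the scholar_flux implementation):

```python
import time
from typing import Any, Optional


class TTLCache:
    """Hypothetical TTL cache for illustration."""

    def __init__(self, ttl: Optional[float] = None) -> None:
        self.ttl = ttl
        self._store: dict[str, tuple[float, Any]] = {}

    def update(self, key: str, value: Any) -> None:
        self._store[key] = (time.monotonic(), value)

    def retrieve(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        # A ttl of None disables expiration entirely.
        if self.ttl is not None and time.monotonic() - stored_at > self.ttl:
            del self._store[key]
            return None
        return value


cache = TTLCache(ttl=None)
cache.update("record_page_1", {"id": 52})
assert cache.retrieve("record_page_1") == {"id": 52}

expiring = TTLCache(ttl=0.0)
expiring.update("record_page_1", {"id": 52})
time.sleep(0.01)
assert expiring.retrieve("record_page_1") is None
```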

update_cache(cache_key: str, response: Response | ResponseProtocol, store_raw: bool = False, parsed_response: Any | None = None, metadata: Dict[str, Any] | None = None, extracted_records: Any | None = None, processed_records: Any | None = None, **kwargs: Any) None[source]

Updates the cache storage with data from intermediate and final steps in response retrieval and processing.

Parameters:
  • cache_key – A unique identifier for the cached data.

  • response – (requests.Response | ResponseProtocol) The API response or response-like object.

  • store_raw – (Optional) A boolean indicating whether to store the raw response. Defaults to False.

  • metadata – (Optional) Additional metadata associated with the cached data. Defaults to None.

  • parsed_response – (Optional) The response data parsed into a structured format. Defaults to None.

  • extracted_records – (Optional) The records extracted from a parsed response prior to record processing.

  • processed_records – (Optional) The response data processed for specific use. Defaults to None.

  • kwargs – Optional additional hashable dictionary fields that can be stored using sql cattrs encodings or in-memory cache.

verify_cache(cache_key: str | None) bool[source]

Checks if the provided cache_key exists in the cache storage.

Parameters:

cache_key – A unique identifier for the cached data.

Returns:

True if the cache key exists, False otherwise.

Return type:

bool

verify_connection() None[source]

Verifies that a connection can be established to a cache based on the current cache_storage configuration.

  • InMemoryStorage (No-Op: Always successful)

  • NullStorage (No-Op: Always successful)

  • MongoDBStorage (Tries to verify connectivity via a ping request)

  • RedisStorage (Tries to verify connectivity via a ping request)

  • SQLAlchemyStorage (Verifies that a file-based or remote connection can be established [to SQLite by default])

  • DuckDBStorage (Verifies that a file-based or remote DuckDB/MotherDuck connection can be established)

Raises:

StorageCacheException – When an error occurs during connection verification with the underlying cache

Note: When successful, nothing is returned. An error is only raised when a connection cannot be established.

classmethod with_storage(cache_storage: Literal['redis', 'sql', 'sqlalchemy', 'duckdb', 'mongodb', 'pymongo', 'inmemory', 'memory', 'null'] | None = None, *args: Any, **kwargs: Any) Self[source]

Creates a DataCacheManager using a known storage device.

This is a convenience function allowing the user to create a DataCacheManager with redis, sql, mongodb, or inmemory storage with default settings or through the use of optional positional and keyword parameters to initialize the storage as needed.

Note that sql is shorthand for the SQLAlchemy cache storage and uses SQLite. Compatible implementations of other storage devices can be used instead via SQLAlchemy as well (e.g. DuckDB).

Parameters:
  • cache_storage (Literal["redis", "sql", "sqlalchemy", "duckdb", "mongodb", "pymongo", "inmemory", "memory", "null"]) – A default ABCStorage subclass implementation to use as a response processing cache.

  • *args – Positional arguments to pass to the chosen ABCStorage subclass constructor.

  • **kwargs – Keyword arguments to pass to the chosen ABCStorage subclass constructor.

Returns:

The current class initialized with the chosen storage

Return type:

DataCacheManager

exception scholar_flux.data_storage.DuckDBImportError[source]

Bases: OptionalDependencyImportError

Exception for missing DuckDB engine for SQL Alchemy.

__init__() None[source]

Initializes the duckdb-engine import exception for improved logging before the exception is raised.

class scholar_flux.data_storage.DuckDBStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any)[source]

Bases: SQLAlchemyStorage

This class extends the SQLAlchemyStorage device to support DuckDB as a supported storage device.

Note that this class requires the duckdb_engine and sqlalchemy packages and will raise an error without both being installed. This class can be initialized in the same manner as SQLAlchemy, only requiring that the passed url has a valid duckdb:/// URI scheme.

DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function DuckDBStorage.<lambda>>}
STORAGE_TYPE: str = 'DuckDB'
__init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any) None[source]

Initialize the DuckDBStorage storage backend and connect to the server indicated via the url parameter.

This class extends the original SQLAlchemyStorage to provide basic helpers that aid in the creation of both simple and complex sessions using the DuckDB engine.

Parameters:
  • url (Optional[str]) – Database connection string. All URLs must begin with duckdb:///. A CacheParameterValidationException will be raised if the URL is invalid or does not contain the required scheme.

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (None) – Ignored. Included for interface compatibility; not implemented.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the SQL service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **sqlalchemy_config

    Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine Typical parameters include the following:

    • url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.

    • echo (bool): Indicates whether to show the executed SQL queries in the console.

config: dict[str, Any]
classmethod create_default_url() str[source]

Creates a valid DuckDB URL within the default writable package cache directory.

classmethod is_available(url: str | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Tests whether the SQL service can be accessed. If so, this function returns True, otherwise False.

Parameters:
  • url (str) – Indicates the location to attempt a connection

  • verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

namespace: str | None
raise_on_error: bool
ttl: Any
classmethod verify_url_string(url: str) None[source]

Helper method for verifying that the current URI is a valid DuckDB resource identifier.

class scholar_flux.data_storage.InMemoryStorage(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs: Any)[source]

Bases: ABCStorage

Default storage class that implements an in-memory storage cache using a dictionary.

This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data, and retrieve data from the cache.

Parameters:
  • namespace (Optional[str]) – Prefix for cache keys. Defaults to None.

  • ttl (Optional[int]) – Ignored. Included for interface compatibility; not implemented.

  • **kwargs – Ignored. Included for interface compatibility; not implemented.

Examples

>>> from scholar_flux.data_storage import InMemoryStorage
### defaults to a basic dictionary:
>>> memory_storage = InMemoryStorage(namespace='testing_functionality')
>>> print(memory_storage)
# OUTPUT: InMemoryStorage(...)
### Adding records to the storage
>>> memory_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> memory_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
### Revising and overwriting a record
>>> memory_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> memory_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> memory_storage.retrieve_all() # retrieves all cached key-value mappings in the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> memory_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> memory_storage.delete_all() # deletes all records from the namespace
>>> memory_storage.retrieve_keys() # Will now be empty
>>> memory_storage.retrieve_all() # Will also be empty
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'InMemory'
__init__(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs: Any) None[source]

Initialize a basic, dictionary-like memory_cache using a namespace.

Note that ttl and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect processing or cache initialization.

clone() InMemoryStorage[source]

Helper method for creating a new InMemoryStorage with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Attempts to delete the selected cache key if found within the current namespace.

Parameters:

key (str) – The key associated with the stored data in the dictionary cache.

delete_all() None[source]

Attempts to delete all cache keys found within the current namespace.

classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Helper method that returns True, indicating that dictionary-based storage will always be available.

Returns:

True to indicate that the dictionary-based cache storage will always be available

Return type:

(bool)

namespace: str | None
raise_on_error: bool
retrieve(key: str) Any | None[source]

Attempts to retrieve a response containing the specified cache key within the current namespace.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The deserialized JSON object if successful; None if the key does not exist.

Return type:

Any

retrieve_all() dict[str, Any] | None[source]

Retrieves all cache key-response mappings found within the current namespace.

Returns:

A dictionary containing each key-value mapping for all cached data within the same namespace

Return type:

dict

retrieve_keys() list[str][source]

Retrieves the full list of all cache keys found within the current namespace.

Returns:

The full list of all keys that are currently mapped within the storage

Return type:

list[str]

structure(flatten: bool = False, show_value_attributes: bool = True, mask_values: bool = False) str[source]

Creates a concise string representation of the current InMemoryStorage device.

The representation displays the total number of records that have been registered to avoid overloading the representation with the specifics of what is being cached.

Parameters:
  • flatten (bool) – Flag indicating whether to flatten the string representation of the object into a single line when True or preserve the multiline representation of the storage cache when False (default).

  • show_value_attributes (bool) – Flag for hiding the internal attributes of nested objects when True (arguments replaced with "...") and showing their default representation when False (default).

  • mask_values (bool) – Masks any potentially sensitive data shown in the representation when True. This is false by default, as the representation of the InMemoryStorage displays non-sensitive information, including only the namespace of the cache and the total cached record count.

Returns:

A basic string representation of the current object.

ttl: Any
update(key: str, data: Any) None[source]

Attempts to update the data associated with a specific cache key in the namespace.

Parameters:
  • key (str) – The key of the key-value pair

  • data (Any) – The data to be associated with the key

verify_cache(key: str) bool[source]

Verifies whether a cache key exists within the current namespace in the in-memory cache.

Parameters:

key (str) – The key to lookup in the cache

Returns:

True if the key is found otherwise False.

Return type:

bool

verify_connection() None[source]

No-Op: the in-memory cache requires no external connection, so verification always succeeds.

exception scholar_flux.data_storage.MongoDBImportError[source]

Bases: OptionalDependencyImportError

Exception for Mongo Dependency Issues.

__init__() None[source]

Initializes the pymongo import exception for improved logging before the exception is raised.

class scholar_flux.data_storage.MongoDBStorage(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **mongo_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with MongoDB with a unified backend interface.

The MongoDBStorage uses the same underlying interface as other scholar_flux storage classes for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.

Examples

>>> from scholar_flux.data_storage import MongoDBStorage
# Defaults to connecting locally (mongodb://127.0.0.1) on the default port for MongoDB (27017)
# Verifies that a mongodb service is actually available locally on the default port
>>> assert MongoDBStorage.is_available()
>>> mongo_storage = MongoDBStorage(namespace='testing_functionality')
>>> print(mongo_storage)
# OUTPUT: MongoDBStorage(...)
# Adding records to the storage
>>> mongo_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> mongo_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> mongo_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> mongo_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> mongo_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> mongo_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> mongo_storage.delete_all() # deletes all records from the namespace
>>> mongo_storage.retrieve_keys() # Will now be empty
>>> mongo_storage.retrieve_all() # Will also be empty
DEFAULT_CONFIG: dict[str, Any] = {'collection': 'result_page', 'db': 'storage_manager_db', 'host': 'mongodb://127.0.0.1', 'port': 27017, 'serverSelectionTimeoutMS': 5000, 'ttl': None}
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'MongoDB'
__init__(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **mongo_config: Any) None[source]

Initialize the Mongo DB storage backend and connect to the Mongo DB server.

If no parameters are specified, the MongoDB storage will default to the parameters derived from the scholar_flux.utils.config_settings.config dictionary, which, in turn, resolves the host and port from environment variables or the default MongoDB host/port in the following order of priority:

  • SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > ‘mongodb://127.0.0.1’ (localhost)

  • SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017

Parameters:
  • host (Optional[str]) –

    The host address where the Mongo Database can be found. The default is ‘mongodb://127.0.0.1’, which is the mongo server on the localhost.

    Each of the following are valid values for host:

    • Simple hostname: ‘localhost’ (uses port parameter)

    • Full URI: ‘mongodb://localhost:27017’ (ignores port parameter)

    • Complex URI: ‘mongodb://user:pass@host:27017/db?options’

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (Optional[float | int]) –

    The total number of seconds that must elapse for a cached record to expire. The value -1 turns off TTL expiration when directly passed or resolved from config defaults. TTL is determined in the following order of priority:

    • SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL (resolved from config_settings.get())

    • MongoDBStorage.DEFAULT_CONFIG.get(‘ttl’) (if available)

    • And None if neither of the above is set or defined.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with MongoDB. If None, the raise_on_error attribute defaults to MongoDBStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the MongoDB service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **mongo_config – Configuration parameters required to connect to the Mongo DB server. Typically includes parameters such as host, port, db, etc.

Raises:

MongoDBImportError – If the pymongo module is not available or fails to load.
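The host/port resolution order above can be sketched in plain Python. `resolve_mongo_defaults` below is a hypothetical helper for illustration only, not part of the scholar_flux API:

```python
# Hypothetical sketch (not the scholar_flux implementation) of the documented
# resolution order for MongoDB connection defaults:
#   SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > 'mongodb://127.0.0.1'
#   SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017
def resolve_mongo_defaults(env: dict) -> tuple:
    host = (
        env.get("SCHOLAR_FLUX_MONGODB_HOST")
        or env.get("MONGODB_HOST")
        or "mongodb://127.0.0.1"
    )
    port = int(
        env.get("SCHOLAR_FLUX_MONGODB_PORT")
        or env.get("MONGODB_PORT")
        or 27017
    )
    return host, port

# No variables set: falls back to the package defaults
assert resolve_mongo_defaults({}) == ("mongodb://127.0.0.1", 27017)
# The SCHOLAR_FLUX_* variable takes priority over the generic one
assert resolve_mongo_defaults(
    {"SCHOLAR_FLUX_MONGODB_HOST": "mongodb://db.internal", "MONGODB_HOST": "other"}
)[0] == "mongodb://db.internal"
```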

client: None
clone() MongoDBStorage[source]

Helper method for creating a new MongoDBStorage with the same parameters.

Note that the implementation of the MongoClient is not able to be deep copied. This method is provided for convenience for re-instantiation with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

Parameters:

key (str) – The key associated with the stored data from the cache.

Raises:

PyMongoError – If there is an error deleting the record

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

Raises:

PyMongoError – If an error occurs when deleting records from the collection

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Reads from environment variables in order of priority: - SCHOLAR_FLUX_MONGODB_HOST > cls.DEFAULT_CONFIG[‘host’] > MONGODB_HOST > “mongodb://127.0.0.1” (localhost) - SCHOLAR_FLUX_MONGODB_PORT > DEFAULT_CONFIG[‘port’] > MONGODB_PORT > 27017

Returns:

Configuration dictionary with host and port.

Return type:

dict

classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Helper method that indicates whether the MongoDB service is available or not.

It attempts to establish a connection on the provided host and port and returns a boolean indicating if the connection was successful.

Note that if the host input is a URI (e.g. mongodb://localhost:27017), any value provided for the port parameter is ignored when MongoClient initializes the connection; the URI is used exclusively.

Parameters:
  • host (Optional[str]) – The host address of the MongoDB service. If None or an empty string, defaults to localhost (the local computer) or the "host" entry from the DEFAULT_CONFIG class variable.

  • port (Optional[int]) – The port where the service is hosted. If None or 0, defaults to port 27017 or the "port" entry from the DEFAULT_CONFIG class variable.

  • verbose (bool) – Indicates whether to log status messages. Defaults to True

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

Returns:

True if the service can be successfully accessed, False otherwise.

Return type:

bool

Raises:
  • ServerSelectionTimeoutError – If a timeout error occurs when attempting to ping Mongo DB

  • ConnectionFailure – If a connection cannot be established
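The URI-versus-host/port behavior noted above can be illustrated with a hypothetical helper (`effective_target` is for illustration only and is not part of the scholar_flux or pymongo APIs):

```python
from urllib.parse import urlsplit

# Hypothetical illustration of why the `port` argument is ignored for full URIs:
# a 'mongodb://' URI already carries its own port, so the URI is used exclusively.
def effective_target(host: str, port: int):
    if "://" in host:                       # full URI: port argument is ignored
        parts = urlsplit(host)
        return parts.hostname, parts.port   # port comes from the URI itself
    return host, port                       # simple hostname: port argument applies

# A simple hostname uses the port parameter
assert effective_target("localhost", 27017) == ("localhost", 27017)
# The explicit port=9999 is ignored in favor of the URI's own port
assert effective_target("mongodb://localhost:27017", 9999) == ("localhost", 27017)
```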

namespace: str | None
classmethod ping(client: None) None[source]

Attempts to ping the remote service.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

Raises:

PyMongoError – If there is an error retrieving the record

retrieve_all() dict[str, Any][source]

Retrieve all records from cache that match the current namespace prefix.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict[str, Any]

Raises:

PyMongoError – If there is an error during the retrieval of records under the namespace.

retrieve_keys() list[str][source]

Retrieve all keys for records from cache.

Returns:

A list of all keys saved via MongoDB.

Return type:

list[str]

Raises:

PyMongoError – If there is an error retrieving the record key.

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the data in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types such as strings, numbers, lists, dictionaries, etc.

Raises:

PyMongoError – If an error occurs when attempting to insert or update a record

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the Mongo DB storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

Raises:
  • ValueError – If the provided key is empty or None.

  • PyMongoError – If an error occurs when looking up a key

verify_connection() None[source]

Verifies that the MongoDBStorage is available for connection with initialized storage configuration settings.

class scholar_flux.data_storage.NullStorage(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs: Any)[source]

Bases: ABCStorage

NullStorage is a no-op implementation of ABCStorage. This class is useful when you want to disable storage without changing code logic.

The scholar_flux package mainly uses this storage when the user turns off the processing cache.

Example

>>> from scholar_flux.data_storage import DataCacheManager, NullStorage
>>> from scholar_flux.api import SearchCoordinator
>>> null_storage = DataCacheManager.null()
# This implements a data cache with the null storage under the hood:
>>> assert isinstance(null_storage.cache_storage, NullStorage)
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_manager=null_storage)
# Otherwise the same can be performed with the following:
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_results = False)
# The processing of responses will then be recomputed on the next search:
>>> response = search_coordinator.search(page = 1)
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'Null'
__init__(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs: Any) None[source]

Initialize a No-Op cache for compatibility with the ABCStorage base class.

Note that namespace, ttl, raise_on_error, and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect initialization.
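The no-op behavior described above follows the null-object pattern, which can be sketched with a minimal class. `NoOpStorage` is a hypothetical illustration, not the scholar_flux implementation:

```python
from typing import Any, Optional

# Minimal sketch of the null-object pattern NullStorage follows:
# every operation is accepted, nothing is ever stored.
class NoOpStorage:
    def update(self, key: str, data: Any) -> None:
        pass                       # silently discard the data

    def retrieve(self, key: str) -> Optional[Any]:
        return None                # nothing is ever cached

    def retrieve_all(self) -> dict:
        return {}                  # empty dict for type consistency

    def verify_cache(self, key: str) -> bool:
        return False               # no key is ever present

storage = NoOpStorage()
storage.update("record_page_1", {"id": 52})
# The update was discarded, so the lookups come back empty
assert storage.retrieve("record_page_1") is None
assert storage.verify_cache("record_page_1") is False
```

The benefit is that calling code can use the same interface whether caching is on or off, which is how NullStorage lets SearchCoordinator disable caching without branching logic.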

clone() NullStorage[source]

Helper method for creating a new implementation of the current NullStorage.

config: dict[str, Any]
delete(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

delete_all(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

classmethod is_available(*args: Any, **kwargs: Any) bool[source]

Method added for abstract class consistency - returns True, indicating that the no-op storage is always available although no cache is ever stored.

namespace: str | None
raise_on_error: bool
retrieve(*args: Any, **kwargs: Any) Any | None[source]

Method added for abstract class consistency - no-op

retrieve_all(*args: Any, **kwargs: Any) dict[str, Any] | None[source]

Method added for abstract class consistency - returns a dictionary for type consistency

retrieve_keys(*args: Any, **kwargs: Any) list[str] | None[source]

Method added for abstract class consistency - returns a list for type consistency

ttl: Any
update(*args: Any, **kwargs: Any) None[source]

Method added for abstract class consistency - no-op

verify_cache(*args: Any, **kwargs: Any) bool[source]

Method added for abstract class consistency - returns False, indicating that no cache is ever stored

verify_connection() None[source]

No-op. Other storage backends raise an error here when a connection can't be established; NullStorage has no connection to verify.

exception scholar_flux.data_storage.OptionalDependencyImportError(message: str = 'Optional Dependency not found')[source]

Bases: Exception

Base exception for Optional Dependency Issues.

__init__(message: str = 'Optional Dependency not found') None[source]

Initializes the foundational OptionalDependencyImportError that forms the basis of more specific error handling when dependencies are missing.

exception scholar_flux.data_storage.RedisImportError[source]

Bases: OptionalDependencyImportError

Exception for missing redis backend.

__init__() None[source]

Initializes the redis import exception for improved logging before the exception is raised.

class scholar_flux.data_storage.RedisStorage(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **redis_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with Redis using a unified backend interface.

The RedisStorage implements the abstract methods from the ABCStorage class for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.

Examples

>>> from scholar_flux.data_storage import RedisStorage
# Defaults to connecting locally (localhost) on the default port for Redis services (6379)
# Verifies that a Redis service is locally available.
>>> assert RedisStorage.is_available()
>>> redis_storage = RedisStorage(namespace='testing_functionality')
>>> print(redis_storage)
# OUTPUT: RedisStorage(...)
# Adding records to the storage
>>> redis_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> redis_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> redis_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> redis_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> redis_storage.retrieve_all() # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> redis_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> redis_storage.delete_all() # deletes all records from the namespace
>>> redis_storage.retrieve_keys() # Will now be empty
>>> redis_storage.retrieve_all() # Will also be empty
DEFAULT_CONFIG: dict = {'host': 'localhost', 'port': 6379, 'ttl': None}
DEFAULT_NAMESPACE: str = 'SFAPI'
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'Redis'
__init__(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, verify_connection: bool = False, **redis_config: Any)[source]

Initialize the Redis storage backend and connect to the Redis server.

If no parameters are specified, the Redis storage will attempt to resolve the host and port using variables from the environment (loaded into scholar_flux.utils.config_settings at runtime).

The host and port are resolved from environment variables/defaults in the following order of priority:

  • SCHOLAR_FLUX_REDIS_HOST > REDIS_HOST > ‘localhost’

  • SCHOLAR_FLUX_REDIS_PORT > REDIS_PORT > 6379

Parameters:
  • host (Optional[str]) – Redis server host. Can be provided positionally or as a keyword argument. Defaults to ‘localhost’ if not specified.

  • namespace (Optional[str]) – The prefix associated with each cache key. Defaults to DEFAULT_NAMESPACE if left None.

  • ttl (Optional[int]) –

    The total number of seconds that must elapse for a cached record to expire. While integers are the recommended input type, floats and strings that can reasonably be converted into integers will be converted. Also note: the value -1 turns off TTL expiration when directly passed or resolved from config defaults. TTL is determined in the following order of priority:

    • SCHOLAR_FLUX_DEFAULT_RESPONSE_CACHE_TTL (resolved from config_settings.get())

    • RedisStorage.DEFAULT_CONFIG.get(‘ttl’) (if available)

    • And None if neither of the above is set or defined.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with Redis. If None, the raise_on_error attribute defaults to RedisStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the Redis service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **redis_config – Configuration parameters required to connect to the Redis server. Typically includes parameters such as host, port, db, etc.

Raises:

RedisImportError – If redis module is not available or fails to load.
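The namespaced key scheme shown in the class example (keys stored as `<namespace>:<key>`) can be sketched with a plain dictionary standing in for Redis. `NamespacedStore` is a hypothetical illustration, not the RedisStorage implementation:

```python
# Hypothetical in-memory sketch of the namespaced key scheme: keys are stored
# as '<namespace>:<key>', and delete_all() only removes keys under the
# current namespace prefix.
class NamespacedStore:
    def __init__(self, namespace: str = "SFAPI"):
        self.namespace = namespace
        self._store: dict = {}

    def _key(self, key: str) -> str:
        return f"{self.namespace}:{key}"       # prefix every cache key

    def update(self, key: str, value) -> None:
        self._store[self._key(key)] = value

    def retrieve_keys(self) -> list:
        prefix = f"{self.namespace}:"
        return [k for k in self._store if k.startswith(prefix)]

    def delete_all(self) -> None:
        for k in self.retrieve_keys():         # only this namespace's keys
            del self._store[k]

store = NamespacedStore("testing_functionality")
store.update("record_page_1", {"id": 52, "article": "A name to remember"})
assert store.retrieve_keys() == ["testing_functionality:record_page_1"]
store.delete_all()
assert store.retrieve_keys() == []
```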

clone() RedisStorage[source]

Helper method for creating a new RedisStorage with the same parameters.

Note that the implementation of the RedisStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.

config: dict[str, Any]
delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

This method indicates whether deletion was successful by returning True if the record was deleted and False if the record did not exist to be deleted.

Parameters:

key (str) – The key associated with the stored data in the cache.

Raises:

RedisError – If there is an error deleting the record

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

Raises:

RedisError – If an error occurs when deleting records from the collection

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Reads from environment variables in order of priority: - SCHOLAR_FLUX_REDIS_HOST > cls.DEFAULT_CONFIG[‘host’] > REDIS_HOST > ‘localhost’ - SCHOLAR_FLUX_REDIS_PORT > DEFAULT_CONFIG[‘port’] > REDIS_PORT > 6379

Returns:

Configuration dictionary with host and port.

Return type:

dict[str, Any]

classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Helper class method for testing whether the Redis service is available and can be accessed.

If Redis can be successfully reached, this function returns True, otherwise False.

Parameters:
  • host (Optional[str]) – Indicates the location to attempt a connection. If None or an empty string, defaults to localhost (the local computer) or the "host" entry from the DEFAULT_CONFIG class variable.

  • port (Optional[int]) – Indicates the port where the service can be accessed. If None or 0, defaults to port 6379 or the "port" entry from the DEFAULT_CONFIG class variable.

  • verbose (bool) – Indicates whether to log at the DEBUG level and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

Raises:
  • TimeoutError – If a timeout error occurs when attempting to ping Redis

  • ConnectionError – If a connection cannot be established

namespace: str | None
classmethod ping(client: redis.Redis) None[source]

Attempts to ping the remote service.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

retrieve_all() dict[str, Any][source]

Retrieve all records from cache that match the current namespace prefix.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict[str, Any]

Raises:

RedisError – If there is an error during the retrieval of records under the namespace

retrieve_keys() list[str][source]

Retrieve all keys for records from cache that match the current namespace prefix.

Returns:

A list of all keys saved under the current namespace.

Return type:

list[str]

Raises:

RedisError – If there is an error retrieving the record key

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the serialized JSON string in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.

Raises:

RedisError – If an error occurs when attempting to insert or update a record

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the Redis storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

Raises:
  • ValueError – If provided key is empty or None.

  • RedisError – If an error occurs when looking up a key

verify_connection() None[source]

Verifies that the RedisStorage is available for connection with the initialized configuration settings.

exception scholar_flux.data_storage.SQLAlchemyImportError[source]

Bases: OptionalDependencyImportError

Exception for missing sql alchemy backend.

__init__() None[source]

Initializes the sqlalchemy import exception for improved logging before the exception is raised.

class scholar_flux.data_storage.SQLAlchemyStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any)[source]

Bases: ABCStorage

Implements the storage methods necessary to interact with SQLite3 and other SQL flavors via SQLAlchemy.

This implementation is designed to use a relational database as a cache by which data can be stored and retrieved in a relatively straightforward manner that associates records in key-value pairs similar to the In-Memory Storage.

Note:

This table uses the structure previously defined in the CacheTable to store records in a structured manner:

ID:

Automatically generated - identifies the unique record in the table

Key:

Is used to associate a specific cached record with a short human-readable (or hashed) string

Cache:

The JSON data associated with the record. To store the data, any nested, non-serializable data is first encoded before being unstructured and stored. On retrieving the data, the JSON string is decoded and restructured in order to return the original object.

The SQLAlchemyStorage can be initialized as follows:

>>> from scholar_flux.data_storage import SQLAlchemyStorage
# Defaults to creating a local, file-based sqlite cache within the default writable directory.
# Verifies that the dependency for a basic sqlite service is actually available for use locally
>>> assert SQLAlchemyStorage.is_available()
>>> sql_storage = SQLAlchemyStorage(namespace='testing_functionality')
>>> print(sql_storage)
# OUTPUT: SQLAlchemyStorage(...)
# Adding records to the storage
>>> sql_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> sql_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> sql_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> sql_storage.retrieve_keys() # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> sql_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#           'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#           'article': 'A name has many meanings'}}
>>> sql_storage.retrieve('record_page_1') # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> sql_storage.delete_all() # deletes all records from the namespace
>>> sql_storage.retrieve_keys() # Will now be empty

DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function SQLAlchemyStorage.<lambda>>}
DEFAULT_NAMESPACE: str | None = None
DEFAULT_RAISE_ON_ERROR: bool = False
STORAGE_TYPE: str = 'SQL'
__init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, verify_connection: bool = False, **sqlalchemy_config: Any) None[source]

Initialize the SQLAlchemy storage backend and connect to the server indicated via the url parameter.

This class uses the innate flexibility of SQLAlchemy to support backends such as SQLite, Postgres, DuckDB, etc.

Parameters:
  • url (Optional[str]) – Database connection string. This can be provided positionally or as a keyword argument.

  • namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.

  • ttl (None) – Ignored. Included for interface compatibility, but not implemented.

  • raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.

  • verify_connection (bool) – If True, verifies the SQL service is available immediately after initialization. Raises StorageCacheException if connection fails. Defaults to False.

  • **sqlalchemy_config

    Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine. Typical parameters include the following:

    • url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.

    • echo (bool): Indicates whether to show the executed SQL queries in the console.
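The Key/Cache upsert behavior described in the class Note can be sketched with only Python's built-in sqlite3 and json modules. This is a rough, hypothetical stand-in for illustration, not the actual SQLAlchemy-based implementation:

```python
import json
import sqlite3

# Hypothetical stdlib sketch of the ID/Key/Cache table structure: an
# auto-generated id, a unique human-readable key, and JSON-serialized data.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cache (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "key TEXT UNIQUE, cache TEXT)"
)

def update(key: str, data) -> None:
    # Upsert: serialize to JSON and overwrite any existing row for the key
    conn.execute(
        "INSERT INTO cache (key, cache) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET cache = excluded.cache",
        (key, json.dumps(data)),
    )

def retrieve(key: str):
    # Deserialize the stored JSON string back into the original object
    row = conn.execute("SELECT cache FROM cache WHERE key = ?", (key,)).fetchone()
    return json.loads(row[0]) if row else None

update("record_page_1", {"id": 52, "article": "A name to remember"})
update("record_page_1", {"id": 53, "article": "A name has many meanings"})
assert retrieve("record_page_1") == {"id": 53, "article": "A name has many meanings"}
assert retrieve("missing") is None
```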

clone() SQLAlchemyStorage[source]

Helper method for creating a new SQLAlchemyStorage with the same parameters.

Note that the implementation of the SQLAlchemyStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.

config: dict[str, Any]
classmethod create_default_url() str[source]

Creates a default URL within the writable directory for the current SQLAlchemyStorage class or subclass.

delete(key: str) bool | None[source]

Delete the value associated with the provided key from cache.

Parameters:

key (str) – The key associated with the stored data in the cache.

delete_all() None[source]

Delete all records from cache that match the current namespace prefix.

classmethod get_default_config() dict[str, Any][source]

Get default configuration with current config_settings values.

Returns:

A dictionary configuration with the default URL and echo (for debugging SQL statements).

Return type:

dict

classmethod get_default_url() str[source]

Retrieves the SQLAlchemy URL from the environment configuration, falling back to the default when invalid.

Returns:

The validated URL from the environment configuration if valid. Otherwise the default URL generated via cls.create_default_url().

Return type:

str

Note: This method first attempts to validate the URL string from the environment variable, SCHOLAR_FLUX_SQLALCHEMY_URL, using the cls.verify_url_string class method. When validation fails, the default for the current class is returned via cls.create_default_url instead.

classmethod is_available(url: str | None = None, verbose: bool = True, **kwargs: Any) bool[source]

Tests whether the SQL service can be accessed. If so, this function returns True, otherwise False.

Parameters:
  • url (str) – Indicates the location to attempt a connection

  • verbose (bool) – Indicates whether to log at the DEBUG level and lower, or to log warnings only

  • **kwargs – No-Op keyword arguments for compatibility with config connection availability checks

namespace: str | None
classmethod ping(engine: None) None[source]

Verifies that the client can successfully connect to the database.

raise_on_error: bool
retrieve(key: str) Any | None[source]

Retrieve the value associated with the provided key from cache.

Parameters:

key (str) – The key used to fetch the stored data from cache.

Returns:

The deserialized JSON object if successful. Returns None if the key does not exist.

Return type:

Any

retrieve_all() Dict[str, Any][source]

Retrieve all records from cache.

Returns:

Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.

Return type:

dict

retrieve_keys() List[str][source]

Retrieve all keys for records from cache.

Returns:

A list of all keys saved via SQL.

Return type:

list

ttl: Any
update(key: str, data: Any) None[source]

Update the cache by storing associated value with provided key.

Parameters:
  • key (str) – The key used to store the serialized JSON string in cache.

  • data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.

verify_cache(key: str) bool[source]

Check if specific cache key exists.

Parameters:

key (str) – The key to check its presence in the SQL storage backend.

Returns:

True if the key is found otherwise False.

Return type:

bool

Raises:

ValueError – If provided key is empty or None.

verify_connection() None[source]

Verifies that the SQLAlchemyStorage is available for connection with initialized configuration settings.

classmethod verify_url_string(url: str) None[source]

Helper method for verifying that the provided URI is a valid SQLAlchemy resource identifier.
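The actual check relies on SQLAlchemy's own URL parsing; a simplified, hypothetical approximation of what such a validation looks for (a dialect name followed by `://`, as in `sqlite:///cache.db`) can be sketched as:

```python
# Hypothetical simplified check, not the scholar_flux implementation:
# a SQLAlchemy URL has the shape '<dialect>[+<driver>]://...'.
def looks_like_sqlalchemy_url(url: str) -> bool:
    if "://" not in url:
        return False
    dialect = url.split("://", 1)[0]
    # dialect and optional driver should be plain identifiers, e.g. postgresql+psycopg2
    return bool(dialect) and dialect.replace("+", "").isidentifier()

assert looks_like_sqlalchemy_url("sqlite:///cache.db")
assert looks_like_sqlalchemy_url("postgresql+psycopg2://user@host/db")
assert not looks_like_sqlalchemy_url("not-a-url")
```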