scholar_flux.data_storage package
Submodules
scholar_flux.data_storage.abc_storage module
The scholar_flux.data_storage.abc_storage module implements the ABCStorage class, which defines the abstractions that must be implemented to create a scholar_flux-compatible storage backend. The ABCStorage defines basic CRUD operations and convenience methods used to perform operations on the entire range of cached records or, optionally, on cached records specific to a namespace.
scholar_flux implements the ABCStorage with subclasses for SQLite (through SQLAlchemy), Redis, MongoDB, and an in-memory cache, and can be further extended to DuckDB and other backends supported by SQLAlchemy.
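The pattern described above can be sketched in miniature without importing scholar_flux. The stand-in ABC below mirrors only a subset of the documented CRUD surface (retrieve, update, delete, retrieve_keys), and the dict-backed subclass and its namespace-prefixing scheme are illustrative assumptions, not the package's actual implementation:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class MiniStorage(ABC):
    """Stand-in mirroring part of the ABCStorage CRUD surface described above."""

    @abstractmethod
    def retrieve(self, key: str) -> Optional[Any]: ...

    @abstractmethod
    def update(self, key: str, data: Any) -> None: ...

    @abstractmethod
    def delete(self, key: str) -> None: ...

    @abstractmethod
    def retrieve_keys(self) -> Optional[List[str]]: ...


class DictStorage(MiniStorage):
    """Toy dict-backed backend with a namespace prefix, loosely modeled on InMemoryStorage."""

    def __init__(self, namespace: Optional[str] = None) -> None:
        self.namespace = namespace
        self._cache: Dict[str, Any] = {}

    def _key(self, key: str) -> str:
        # Prefix keys with the namespace so several logical caches can share one store
        return f"{self.namespace}:{key}" if self.namespace else key

    def retrieve(self, key: str) -> Optional[Any]:
        return self._cache.get(self._key(key))

    def update(self, key: str, data: Any) -> None:
        self._cache[self._key(key)] = data

    def delete(self, key: str) -> None:
        self._cache.pop(self._key(key), None)

    def retrieve_keys(self) -> Optional[List[str]]:
        return list(self._cache)


storage = DictStorage(namespace="demo")
storage.update("record_page_1", {"id": 52})
print(storage.retrieve("record_page_1"))  # {'id': 52}
print(storage.retrieve_keys())            # ['demo:record_page_1']
```

Any backend implementing this surface can be swapped in interchangeably, which is the dependency-injection point of the abstract base class.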
- class scholar_flux.data_storage.abc_storage.ABCStorage(*args, **kwargs)[source]
Bases: ABC
The ABCStorage class provides the basic structure required to implement a data storage cache with a customized backend.
This abstract class defines methods to check the cache, delete from the cache, update the cache with new data, and retrieve data from the cache storage.
- abstract clone() Self[source]
Abstract helper method for cloning the structure and configuration of a storage implementation.
- abstract delete_all(*args, **kwargs) None[source]
Core method for deleting all pages of records from the cache.
- abstract classmethod is_available(*args, **kwargs) bool[source]
Core method for verifying whether a storage/service is available.
- abstract retrieve(*args, **kwargs) Any | None[source]
Core method for retrieving a page of records from the cache.
- abstract retrieve_all(*args, **kwargs) Dict[str, Any] | None[source]
Core method for retrieving all pages of records from the cache.
- abstract retrieve_keys(*args, **kwargs) List[str] | None[source]
Core method for retrieving all keys from the cache.
- structure(flatten: bool = False, show_value_attributes: bool = True) str[source]
Helper method for quickly showing a representation of the overall structure of the current storage subclass. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.
- Returns:
The structure of the current storage subclass as a string.
- Return type:
str
- abstract verify_cache(*args, **kwargs) bool[source]
Core method for verifying the cache based on the key.
- with_raise_on_error(value: bool = True)[source]
Uses a context manager to temporarily modify the raise_on_error attribute for the context duration.
All storage backends that inherit from the ABCStorage will also inherit the with_raise_on_error context manager. When used, this context manager temporarily sets the raise_on_error attribute to True or False for the duration of a code block without permanently changing the storage subclass’s configuration.
This context manager is most useful for briefly suppressing errors and in cache verification when errors need to be logged and reported instead of silently indicating that a cache entry couldn’t be found.
- Parameters:
value (bool) – A value to temporarily assign to raise_on_error for the context duration
Example
>>> with storage.with_raise_on_error(True):
...     # Any storage operation here will raise on error, regardless of the instance default
...     storage.retrieve(key)
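A context manager like the one described above can be built with contextlib. The following is a minimal sketch assuming only a raise_on_error attribute; the actual scholar_flux implementation may differ in detail:

```python
from contextlib import contextmanager


class Storage:
    """Toy storage stand-in with only the attribute the context manager needs."""

    def __init__(self, raise_on_error: bool = False) -> None:
        self.raise_on_error = raise_on_error

    @contextmanager
    def with_raise_on_error(self, value: bool = True):
        # Temporarily override the attribute for the duration of the with-block
        original = self.raise_on_error
        self.raise_on_error = value
        try:
            yield self
        finally:
            # Always restore the prior setting, even if the block raised
            self.raise_on_error = original


storage = Storage()
with storage.with_raise_on_error(True):
    print(storage.raise_on_error)  # True inside the block
print(storage.raise_on_error)      # False restored afterwards
```

The try/finally guarantees the instance default is restored even when a storage operation inside the block raises.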
scholar_flux.data_storage.data_cache_manager module
The scholar_flux.data_storage.data_cache_manager implements a DataCacheManager that allows the storage and cached retrieval of processed responses.
This class is the user-interface that implements a unified interface for different cache storage devices that inherit from the ABCStorage class.
- class scholar_flux.data_storage.data_cache_manager.DataCacheManager(cache_storage: ABCStorage | None = None)[source]
Bases: object
The DataCacheManager class manages caching of API responses.
This class provides methods to generate cache keys, verify cache entries, check cache validity, update cache with new data, and retrieve data from the cache storage.
- Parameters:
cache_storage (Optional[ABCStorage]) – The storage backend used to cache data. Defaults to In-Memory Storage.
- generate_fallback_cache_key(response)
Generates a unique fallback cache key based on the response URL and status code.
- verify_cache(cache_key)
Checks if the provided cache_key exists in the cache storage.
- cache_is_valid(cache_key, response=None, cached_response=None)
Determines whether the cached data for a given key is still valid.
- update_cache(cache_key, response, store_raw=False, metadata=None, parsed_response=None, processed_records=None)
Updates the cache storage with new data.
- retrieve(cache_key)
Retrieves data from the cache storage based on the cache key.
- retrieve_from_response(response)
Retrieves data from the cache storage based on the response if within cache.
Examples
>>> from scholar_flux.data_storage import DataCacheManager
>>> from scholar_flux.api import SearchCoordinator
# Factory method that creates a default redis connection to the service on localhost if available.
>>> redis_cache_manager = DataCacheManager.with_storage('redis')
# Creates a search coordinator for retrieving API responses from the PLOS API provider
>>> search_coordinator = SearchCoordinator(query='Computational Caching Strategies',
...                                        provider_name='plos',
...                                        cache_requests=True,  # caches raw requests prior to processing
...                                        cache_manager=redis_cache_manager)  # caches response processing
# Uses the cache manager to temporarily store cached responses for the default duration
>>> processed_response = search_coordinator.search(page=1)
# On the next search, the processed response data can be retrieved directly for later response reconstruction
>>> retrieved_response_json = search_coordinator.responses.cache.retrieve(processed_response.cache_key)
# Serialized responses store the core response fields (content, URL, status code) associated with API responses
>>> assert isinstance(retrieved_response_json, dict) and 'serialized_response' in retrieved_response_json
- __init__(cache_storage: ABCStorage | None = None) None[source]
Initializes the DataCacheManager with the selected cache storage.
- classmethod cache_fingerprint(obj: str | Any | None = None, package_version: str | None = '0.1.5') str[source]
This method helps identify changes in class/configuration for later cache retrieval. It generates a unique string based on the object and the package version.
By default, a fingerprint is generated from the current package version and object representation, if provided. If not provided, a new human-readable object representation is generated using the scholar_flux.utils.generate_repr helper function, which represents the object name and its current state. A package version is also prepended to the current fingerprint if enabled (not None) and can be customized if needed for object-specific versioning.
- Parameters:
obj (Optional[str]) – A fingerprinted object, or an object to generate a representation of.
package_version (Optional[str]) – The current package version string, or a manually provided version for a component.
- Returns:
A human-readable string including the package version and object identity.
- Return type:
str
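The versioned-fingerprint idea can be illustrated with a short, self-contained sketch. The exact output format below is an assumption; the real cache_fingerprint delegates to generate_repr and may format differently:

```python
from typing import Any, Optional


def fingerprint(obj: Optional[Any] = None, package_version: Optional[str] = "0.1.5") -> str:
    """Illustrative sketch: version-prefixed identity string for cache invalidation."""
    # Use a string directly, fall back to the object's repr, else an empty marker
    if isinstance(obj, str):
        identity = obj
    elif obj is not None:
        identity = repr(obj)
    else:
        identity = "<no-object>"
    # Prepend the package version when enabled (not None), so that a version
    # bump changes the fingerprint and invalidates previously cached entries
    return f"v{package_version}:{identity}" if package_version is not None else identity


print(fingerprint("InMemoryStorage(namespace=None)"))
# v0.1.5:InMemoryStorage(namespace=None)
```

Because the version is folded into the string, upgrading the package (or a component's own version) naturally produces different keys, so stale entries from older code are never matched.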
- cache_is_valid(cache_key: str, response: Response | ResponseProtocol | None = None, cached_response: Dict[str, Any] | None = None) bool[source]
Determines whether the cached data for a given key is still valid or needs reprocessing due to missing fields or modified content when checked against the current response.
If a cached_response dictionary was not directly passed, the cache key will be retrieved from storage before comparison.
- Parameters:
cache_key (str) – The unique identifier for cached data.
response (Optional[Response | ResponseProtocol]) – The API response or response-like object used to validate the cache, if available.
cached_response (Optional[Dict[str, Any]]) – The cached data associated with the key
- Returns:
True if the cache is valid, False otherwise.
- Return type:
bool
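The validation logic described above can be sketched with stdlib code. The "response_hash" field name and the overall comparison scheme are assumptions for illustration; the real cache_is_valid inspects its own set of stored fields:

```python
import hashlib
from typing import Any, Dict, Optional


def response_hash(content: bytes) -> str:
    # SHA-256 of the raw response content, matching generate_response_hash below
    return hashlib.sha256(content).hexdigest()


def cache_is_valid(cached: Optional[Dict[str, Any]], content: Optional[bytes] = None) -> bool:
    """Toy validity check: missing fields or modified content invalidate the entry."""
    # Missing entry or missing required fields -> needs reprocessing
    if not cached or "response_hash" not in cached:
        return False
    # No fresh response to compare against: treat the stored entry as valid
    if content is None:
        return True
    # Content changed since caching -> invalid
    return cached["response_hash"] == response_hash(content)


entry = {"response_hash": response_hash(b"records")}
print(cache_is_valid(entry, b"records"))      # True
print(cache_is_valid(entry, b"new records"))  # False
```

Hashing the content rather than comparing it byte-for-byte keeps cache entries small while still detecting modified responses.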
- clone() DataCacheManager[source]
Helper method for creating a newly cloned instance of the current DataCacheManager.
- delete(cache_key: str) None[source]
Deletes data from the cache storage based on the cache key.
- Parameters:
cache_key – A unique identifier for the cached data.
- Returns:
None. The cached entry associated with the cache key is removed if found.
- Return type:
None
- classmethod generate_fallback_cache_key(response: Response | ResponseProtocol) str[source]
Generates a unique fallback cache key based on the response URL and status code.
- Parameters:
response – The API response object.
- Returns:
A unique fallback cache key.
- Return type:
str
- classmethod generate_response_hash(response: Response | ResponseProtocol) str[source]
Generates a hash of the response content.
- Parameters:
response – The API response object.
- Returns:
A SHA-256 hash of the response content.
- Return type:
str
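The two class methods above can be sketched against a simple response-like object. The FakeResponse stand-in and the exact fallback key format are assumptions; only the use of the URL, status code, and a SHA-256 content hash comes from the documentation:

```python
import hashlib
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Minimal response-like object with the attributes the helpers need."""
    url: str
    status_code: int
    content: bytes


def generate_fallback_cache_key(response: FakeResponse) -> str:
    # Combine URL and status code; the exact join format is an assumption here
    return f"{response.url}:{response.status_code}"


def generate_response_hash(response: FakeResponse) -> str:
    # SHA-256 hex digest of the raw response content, as documented
    return hashlib.sha256(response.content).hexdigest()


resp = FakeResponse("https://api.example.org/search?page=1", 200, b'{"records": []}')
print(generate_fallback_cache_key(resp))  # https://api.example.org/search?page=1:200
print(len(generate_response_hash(resp)))  # 64 hex characters
```

A URL-plus-status fallback key is deterministic for the same request, which is what makes it usable when no explicit cache key was supplied.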
- isnull() bool[source]
Helper method for determining whether the current cache manager uses a null storage.
- classmethod null() DataCacheManager[source]
Creates a DataCacheManager using a NullStorage (no storage).
This storage has the effect of returning False when validating whether the current DataCacheManager is in operation.
- Returns:
The current class initialized without storage
- Return type:
DataCacheManager
- retrieve(cache_key: str) Dict[str, Any] | None[source]
Retrieves data from the cache storage based on the cache key.
- Parameters:
cache_key – A unique identifier for the cached data.
- Returns:
The cached data corresponding to the cache key if found, otherwise None.
- Return type:
Optional[Dict[str, Any]]
- retrieve_from_response(response: Response | ResponseProtocol) Dict[str, Any] | None[source]
Retrieves data from the cache storage based on the response if within cache.
- Parameters:
response – The API response object.
- Returns:
The cached data corresponding to the response if found, otherwise None.
- Return type:
Optional[Dict[str, Any]]
- structure(flatten: bool = False, show_value_attributes: bool = False) str[source]
Helper method for quickly showing a representation of the overall structure of the current DataCacheManager. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.
- Returns:
The structure of the current DataCacheManager as a string.
- Return type:
str
- update_cache(cache_key: str, response: Response | ResponseProtocol, store_raw: bool = False, parsed_response: Any | None = None, metadata: Dict[str, Any] | None = None, extracted_records: Any | None = None, processed_records: Any | None = None, **kwargs) None[source]
Updates the cache storage with new data.
- Parameters:
cache_key – A unique identifier for the cached data.
response – (requests.Response | ResponseProtocol) The API response or response-like object.
store_raw – (Optional) A boolean indicating whether to store the raw response. Defaults to False.
metadata – (Optional) Additional metadata associated with the cached data. Defaults to None.
parsed_response – (Optional) The response data parsed into a structured format. Defaults to None.
extracted_records – (Optional) The records extracted from the parsed response. Defaults to None.
processed_records – (Optional) The response data processed for specific use. Defaults to None.
kwargs – Optional additional hashable dictionary fields that can be stored using sql cattrs encodings or in-memory cache.
- verify_cache(cache_key: str | None) bool[source]
Checks if the provided cache_key exists in the cache storage.
- Parameters:
cache_key – A unique identifier for the cached data.
- Returns:
True if the cache key exists, False otherwise.
- Return type:
bool
- classmethod with_storage(cache_storage: Literal['redis', 'sql', 'sqlalchemy', 'mongodb', 'pymongo', 'inmemory', 'memory', 'null'] | None = None, *args, **kwargs) DataCacheManager[source]
Creates a DataCacheManager using a known storage device.
This is a convenience function allowing the user to create a DataCacheManager with redis, sql, mongodb, or inmemory storage, either with default settings or through optional positional and keyword parameters used to initialize the storage as needed.
- Returns:
The current class initialized with the chosen storage
- Return type:
DataCacheManager
scholar_flux.data_storage.in_memory_storage module
The scholar_flux.data_storage.in_memory_storage module implements an InMemoryStorage class that provides a basic cache storage backed by an in-memory dictionary.
The InMemoryStorage class implements the basic CRUD operations and convenience methods used to operate on cached records.
- class scholar_flux.data_storage.in_memory_storage.InMemoryStorage(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs)[source]
Bases: ABCStorage
Default storage class that implements an in-memory storage cache using a dictionary.
This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data, and retrieve data from the cache.
- Parameters:
namespace (Optional[str]) – Prefix for cache keys. Defaults to None.
ttl (Optional[int]) – Ignored. Included for interface compatibility; not implemented.
**kwargs (Dict) – Ignored. Included for interface compatibility; not implemented.
Examples
>>> from scholar_flux.data_storage import InMemoryStorage
### defaults to a basic dictionary:
>>> memory_storage = InMemoryStorage(namespace='testing_functionality')
>>> print(memory_storage)
# OUTPUT: InMemoryStorage(...)
### Adding records to the storage
>>> memory_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> memory_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
### Revising and overwriting a record
>>> memory_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> memory_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> memory_storage.retrieve_all()  # retrieves all key-record mappings under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> memory_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> memory_storage.delete_all()  # deletes all records from the namespace
>>> memory_storage.retrieve_keys()  # Will now be empty
>>> memory_storage.retrieve_all()  # Will also be empty
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs) None[source]
Initialize a basic, dictionary-like memory_cache using a namespace.
Note that ttl and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect processing or cache initialization.
- clone() InMemoryStorage[source]
Helper method for creating a new InMemoryStorage with the same configuration.
- delete(key: str) None[source]
Attempts to delete the selected cache key if found within the current namespace.
- Parameters:
key (str) – The key associated with the stored data in the dictionary cache.
- classmethod is_available(*args, **kwargs) bool[source]
Helper method that returns True, indicating that dictionary-based storage will always be available.
- Returns:
True to indicate that the dictionary-based cache storage will always be available
- Return type:
bool
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Attempts to retrieve a response containing the specified cache key within the current namespace.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any] | None[source]
Retrieves all cache key-response mappings found within the current namespace.
- Returns:
A dictionary containing each key-value mapping for all cached data within the same namespace
- retrieve_keys() List[str] | None[source]
Retrieves the full list of all cache keys found within the current namespace.
- Returns:
The full list of all keys that are currently mapped within the storage
- Return type:
List[str]
- structure(flatten: bool = False, show_value_attributes: bool = True) str[source]
Helper method for quickly showing the structure of the in-memory cache without overloading the representation with the specifics of what is being cached.
scholar_flux.data_storage.mongodb_storage module
The scholar_flux.data_storage.mongodb_storage module implements the MongoDBStorage backend for the DataCacheManager.
This class implements the abstract methods required for compatibility with the scholar_flux.DataCacheManager to ensure that each method can be injected as a dependency.
This class implements caching by using the prebuilt features available in MongoDB to store ProcessedResponse fields within the database for later CRUD operations.
- class scholar_flux.data_storage.mongodb_storage.MongoDBStorage(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, **mongo_config)[source]
Bases: ABCStorage
Implements the storage methods necessary to interact with MongoDB with a unified backend interface.
The MongoDBStorage uses the same underlying interface as other scholar_flux storage classes for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.
Examples
>>> from scholar_flux.data_storage import MongoDBStorage
# Defaults to connecting locally (mongodb://127.0.0.1) on the default port for MongoDB (27017)
# Verifies that a MongoDB service is actually available locally on the default port
>>> assert MongoDBStorage.is_available()
>>> mongo_storage = MongoDBStorage(namespace='testing_functionality')
>>> print(mongo_storage)
# OUTPUT: MongoDBStorage(...)
# Adding records to the storage
>>> mongo_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> mongo_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> mongo_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> mongo_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> mongo_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> mongo_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> mongo_storage.delete_all()  # deletes all records from the namespace
>>> mongo_storage.retrieve_keys()  # Will now be empty
>>> mongo_storage.retrieve_all()  # Will also be empty
- DEFAULT_CONFIG: Dict[str, Any] = {'collection': 'result_page', 'db': 'storage_manager_db', 'host': 'mongodb://127.0.0.1', 'port': 27017}
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, **mongo_config)[source]
Initialize the Mongo DB storage backend and connect to the Mongo DB server.
If no parameters are specified, the MongoDB storage will default to the parameters derived from the scholar_flux.utils.config_settings.config dictionary, which, in turn, resolves the host and port from environment variables or the default MongoDB host/port in the following order of priority:
SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > ‘mongodb://127.0.0.1’ (localhost)
SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017
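The resolution order above can be sketched with plain environment lookups. The helper names and the dict-based env parameter are illustrative only; the real package resolves these values through scholar_flux.utils.config_settings.config:

```python
import os
from typing import Dict, Optional


def resolve_mongodb_host(env: Optional[Dict[str, str]] = None) -> str:
    """Return the first non-empty host in the documented priority order."""
    env = env if env is not None else dict(os.environ)
    return (
        env.get("SCHOLAR_FLUX_MONGODB_HOST")
        or env.get("MONGODB_HOST")
        or "mongodb://127.0.0.1"  # localhost default
    )


def resolve_mongodb_port(env: Optional[Dict[str, str]] = None) -> int:
    """Return the first non-empty port in the documented priority order."""
    env = env if env is not None else dict(os.environ)
    return int(env.get("SCHOLAR_FLUX_MONGODB_PORT") or env.get("MONGODB_PORT") or 27017)


print(resolve_mongodb_host({}))                                      # mongodb://127.0.0.1
print(resolve_mongodb_host({"MONGODB_HOST": "mongodb://db:27017"}))  # mongodb://db:27017
```

The SCHOLAR_FLUX_-prefixed variables win over the generic ones, which lets the package coexist with other software that also reads MONGODB_HOST.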
- Parameters:
host (Optional[str]) –
The host address where the Mongo Database can be found. The default is ‘mongodb://127.0.0.1’, which is the mongo server on the localhost.
Each of the following are valid values for host:
Simple hostname: ‘localhost’ (uses port parameter)
Full URI: ‘mongodb://localhost:27017’ (ignores port parameter)
Complex URI: ‘mongodb://user:pass@host:27017/db?options’
namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.
ttl (Optional[float | int]) – The total number of seconds that must elapse for a cache record to expire. If not provided, ttl defaults to None.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with MongoDB. If None, the raise_on_error attribute defaults to MongoDBStorage.DEFAULT_RAISE_ON_ERROR.
**mongo_config (Dict[Any, Any]) – Configuration parameters required to connect to the Mongo DB server. Typically includes parameters such as host, port, db, etc.
- Raises:
MongoDBImportError – If the pymongo module is not available or fails to load.
- client: None
- clone() MongoDBStorage[source]
Helper method for creating a new MongoDBStorage with the same parameters.
Note that the implementation of the MongoClient is not able to be deep copied. This method is provided for convenience for re-instantiation with the same configuration.
- delete(key: str)[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key used associated with the stored data from the cache.
- Raises:
PyMongoError – If there is an error deleting the record
- delete_all()[source]
Delete all records from cache that match the current namespace prefix.
- Raises:
PyMongoError – If an error occurred when deleting records from the collection
- classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True) bool[source]
Helper method that indicates whether the MongoDB service is available or not.
It attempts to establish a connection on the provided host and port and returns a boolean indicating if the connection was successful.
Note that if the input to the host is a URI (e.g. mongodb://localhost:27017), any input provided to the port parameter will be ignored when MongoClient initializes the connection; the URI is used exclusively.
- Parameters:
host (Optional[str]) – The IP of the host of the MongoDB service. If None or an empty string, defaults to localhost (the local computer) or the “host” entry from the class variable, DEFAULT_CONFIG.
port (Optional[int]) – The port where the service is hosted. If None or 0, defaults to port 27017 or the “port” entry from the DEFAULT_CONFIG class variable.
verbose (bool) – Indicates whether to log status messages. Defaults to True
- Returns:
True if the service was successfully accessed, False otherwise.
- Return type:
bool
- Raises:
ServerSelectionTimeoutError – If a timeout error occurs when attempting to ping Mongo DB
ConnectionFailure – If a connection cannot be established
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if successful. Returns None if the key does not exist.
- Return type:
Any
- Raises:
PyMongoError – If there is an error retrieving the record
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache that match the current namespace prefix.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- Raises:
PyMongoError – If there is an error during the retrieval of records under the namespace.
- retrieve_keys() List[str][source]
Retrieve all keys for records from cache.
- Returns:
A list of all keys saved via MongoDB.
- Return type:
list[str]
- Raises:
PyMongoError – If there is an error retrieving the record key.
- update(key: str, data: Any)[source]
Update the cache by storing associated value with provided key.
- Parameters:
key (str) – The key used to store the data in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types such as strings, numbers, lists, dictionaries, etc.
- Raises:
PyMongoError – If an error occurs when attempting to insert or update a record
- verify_cache(key: str) bool[source]
Check if specific cache key exists.
- Parameters:
key (str) – The key to check its presence in the Mongo DB storage backend.
- Returns:
True if the key is found, otherwise False.
- Return type:
bool
- Raises:
ValueError – If provided key is empty or None.
CacheVerificationException – If an error occurs on data retrieval
scholar_flux.data_storage.null_storage module
The scholar_flux.data_storage.null_storage module implements a Null (No-Op) Storage that is used to ensure that responses are always reprocessed when implemented.
- class scholar_flux.data_storage.null_storage.NullStorage(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs)[source]
Bases: ABCStorage
NullStorage is a no-op implementation of ABCStorage. This class is useful when you want to disable storage without changing code logic.
The scholar_flux package mainly implements this storage when the user turns off processing cache.
Example
>>> from scholar_flux.data_storage import DataCacheManager, NullStorage
>>> from scholar_flux.api import SearchCoordinator
>>> null_storage = DataCacheManager.null()
## This implements a data cache with the null storage under the hood:
>>> assert isinstance(null_storage.cache_storage, NullStorage)
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_manager=null_storage)
# Otherwise the same can be performed with the following:
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_results=False)
# The processing of responses will then be recomputed on the next search:
>>> response = search_coordinator.search(page=1)
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs) None[source]
Initialize a No-Op cache for compatibility with the ABCStorage base class.
Note that namespace, ttl, raise_on_error, and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect initialization.
- clone() NullStorage[source]
Helper method for creating a new implementation of the current NullStorage.
- classmethod is_available(*args, **kwargs) bool[source]
Method added for abstract class consistency. Returns True, indicating that the no-op storage is always available, although no cache is ever stored.
- namespace: str | None
- raise_on_error: bool
- retrieve_all(*args, **kwargs) Dict[str, Any] | None[source]
Method added for abstract class consistency. Returns a dictionary for type consistency.
scholar_flux.data_storage.redis_storage module
The scholar_flux.data_storage.redis_storage module implements the RedisStorage backend for the DataCacheManager.
This class implements the abstract methods required for compatibility with the scholar_flux.DataCacheManager.
This class implements caching by using the serialization-deserialization and caching features available in Redis to store ProcessedResponse fields within the database for later CRUD operations.
WARNING: Ensure that the ‘namespace’ parameter is set to a non-empty, unique value for each logical cache. Using an empty or shared namespace may result in accidental deletion or overwriting of unrelated data. For that reason, the delete_all method does not perform any deletions unless a namespace exists.
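The namespace guard described in the warning can be illustrated with a dict standing in for a Redis connection. The function below is a toy sketch, not the package's delete_all:

```python
from typing import Any, Dict, List, Optional


def delete_all(cache: Dict[str, Any], namespace: Optional[str]) -> List[str]:
    """Delete only keys under `namespace:`; refuse to act without a namespace."""
    if not namespace:
        # No namespace -> no deletions, mirroring the guard described above;
        # without this check an empty prefix would match every key in the store
        return []
    prefix = f"{namespace}:"
    doomed = [key for key in cache if key.startswith(prefix)]
    for key in doomed:
        del cache[key]
    return doomed


store = {"SFAPI:page_1": 1, "SFAPI:page_2": 2, "other:page_1": 3}
print(delete_all(store, "SFAPI"))  # ['SFAPI:page_1', 'SFAPI:page_2']
print(store)                       # {'other:page_1': 3}
print(delete_all(store, None))     # [] -> nothing deleted without a namespace
```

In a shared Redis instance the prefix is the only thing separating one application's keys from another's, which is why an unset namespace must abort rather than fall back to deleting everything.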
- class scholar_flux.data_storage.redis_storage.RedisStorage(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **redis_config)[source]
Bases: ABCStorage
Implements the storage methods necessary to interact with Redis using a unified backend interface.
The RedisStorage implements the abstract methods from the ABCStorage class for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.
Examples
>>> from scholar_flux.data_storage import RedisStorage
# Defaults to connecting locally (localhost) on the default port for Redis services (6379)
# Verifies that a Redis service is locally available.
>>> assert RedisStorage.is_available()
>>> redis_storage = RedisStorage(namespace='testing_functionality')
>>> print(redis_storage)
# OUTPUT: RedisStorage(...)
# Adding records to the storage
>>> redis_storage.update('record_page_1', {'id':52, 'article': 'A name to remember'})
>>> redis_storage.update('record_page_2', {'id':55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> redis_storage.update('record_page_2', {'id':53, 'article': 'A name has many meanings'})
>>> redis_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> redis_storage.retrieve_all()  # retrieves all key-record mappings under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> redis_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> redis_storage.delete_all()  # deletes all records from the namespace
>>> redis_storage.retrieve_keys()  # Will now be empty
>>> redis_storage.retrieve_all()  # Will also be empty
- DEFAULT_CONFIG: dict = {'host': 'localhost', 'port': 6379}
- DEFAULT_NAMESPACE: str = 'SFAPI'
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **redis_config)[source]
Initialize the Redis storage backend and connect to the Redis server.
If no parameters are specified, the Redis storage will attempt to resolve the host and port using variables from the environment (loaded into scholar_flux.utils.config_settings at runtime).
The resolved host and port are resolved from environment variables/defaults in the following order of priority:
SCHOLAR_FLUX_REDIS_HOST > REDIS_HOST > ‘localhost’
SCHOLAR_FLUX_REDIS_PORT > REDIS_PORT > 6379
- Parameters:
host (Optional[str]) – Redis server host. Can be provided positionally or as a keyword argument. Defaults to ‘localhost’ if not specified.
namespace (Optional[str]) – The prefix associated with each cache key. Defaults to DEFAULT_NAMESPACE if left None.
ttl (Optional[int]) – The total number of seconds that must elapse for a cache record to expire. If not provided, ttl defaults to None.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with Redis. If None, the raise_on_error attribute defaults to RedisStorage.DEFAULT_RAISE_ON_ERROR.
**redis_config (Optional[Dict[Any, Any]]) – Configuration parameters required to connect to the Redis server. Typically includes parameters such as host, port, db, etc.
- Raises:
RedisImportError – If redis module is not available or fails to load.
- clone() RedisStorage[source]
Helper method for creating a new RedisStorage with the same parameters.
Note that the implementation of the RedisStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.
- config: dict
- delete(key: str) None[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key associated with the stored data in the cache.
- Raises:
RedisError – If there is an error deleting the record
- delete_all() None[source]
Delete all records from cache that match the current namespace prefix.
- Raises:
RedisError – If an error occurs when deleting records from the collection
- classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True) bool[source]
Helper class method for testing whether the Redis service is available and can be accessed.
If Redis can be successfully reached, this function returns True, otherwise False.
- Parameters:
host (Optional[str]) – Indicates the location to attempt a connection. If None or an empty string, defaults to localhost (the local computer) or the "host" entry from the DEFAULT_CONFIG class variable.
port (Optional[int]) – Indicates the port where the service can be accessed. If None or 0, defaults to port 6379 or the "port" entry from the DEFAULT_CONFIG class variable.
verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only
- Raises:
TimeoutError – If a timeout error occurs when attempting to ping Redis
ConnectionError – If a connection cannot be established
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache that match the current namespace prefix.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- Raises:
RedisError – If there is an error during the retrieval of records under the namespace
- retrieve_keys() List[str][source]
Retrieve all keys for records from cache that match the current namespace prefix.
- Returns:
A list of all keys saved under the current namespace.
- Return type:
list
- Raises:
RedisError – If there is an error retrieving the record keys
- update(key: str, data: Any) None[source]
Update the cache by storing the associated value with the provided key.
- Parameters:
key (str) – The key used to store the serialized JSON string in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.
- Raises:
RedisError – If an error occurs when attempting to insert or update a record
- verify_cache(key: str) bool[source]
Check if specific cache key exists.
- Parameters:
key (str) – The key to check its presence in the Redis storage backend.
- Returns:
True if the key is found otherwise False.
- Return type:
bool
- Raises:
ValueError – If provided key is empty or None.
RedisError – If an error occurs when looking up a key
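The contract described above can be mirrored with a small illustrative helper (a sketch against a plain dictionary, not the library's Redis-backed code): the key is validated before the membership test is performed.

```python
from typing import Optional

def verify_cache_key(cache: dict, key: Optional[str]) -> bool:
    # Mirror the documented contract: an empty or None key is rejected,
    # otherwise report whether the key is present in the cache.
    if not key:
        raise ValueError("A non-empty cache key is required")
    return key in cache
```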
scholar_flux.data_storage.sql_storage module
The scholar_flux.data_storage.sql_storage module implements the SQLAlchemyStorage class that implements the abstract methods required for compatibility with the DataCacheManager in the scholar_flux package.
This class implements caching by recording the fields of a ProcessedResponse, including its parsed fields, in a recursively encoded and serialized JSON data structure. When retrieving the data, the data is then decoded and deserialized to return the original object.
- Classes:
- CacheTable:
Defines the internal specification of the SQLAlchemy table that is used under the hood. This class inherits from a Base/DeclarativeBase subclass to define its structure and function as a SQLAlchemy table.
- SQLAlchemyStorage:
Inherits from the scholar_flux.data_storage.abc_storage.ABCStorage class and defines the mechanisms by which the storage uses SQLAlchemy to load, retrieve, update, and delete data.
- class scholar_flux.data_storage.sql_storage.SQLAlchemyStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, **sqlalchemy_config)[source]
Bases:
ABCStorage
Implements the storage methods necessary to interact with SQLite3, in addition to other SQL flavors, via SQLAlchemy. This implementation is designed to use a relational database as a cache by which data can be stored and retrieved in a relatively straightforward manner that associates records in key-value pairs, similar to the In-Memory Storage.
Note:
This table uses the structure previously defined in the CacheTable to store records in a structured manner:
- ID:
Automatically generated - identifies the unique record in the table
- Key:
Is used to associate a specific cached record with a short human-readable (or hashed) string
- Cache:
The JSON data associated with the record. To store the data, any nested, non-serializable data is first encoded before being unstructured and stored. On retrieving the data, the JSON string is decoded and restructured in order to return the original object.
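The three-column layout above can be illustrated with the stdlib sqlite3 module. This is a sketch of the table shape only; the actual CacheTable is defined through SQLAlchemy's declarative base, and the column/table names here are illustrative:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE cache_table (
           id INTEGER PRIMARY KEY AUTOINCREMENT,  -- ID: unique record identifier
           key TEXT UNIQUE NOT NULL,              -- Key: human-readable or hashed cache key
           cache TEXT NOT NULL                    -- Cache: JSON-encoded record data
       )"""
)

# Storing a record: the payload is serialized to JSON before insertion.
record = {"id": 52, "article": "A name to remember"}
conn.execute(
    "INSERT INTO cache_table (key, cache) VALUES (?, ?)",
    ("testing_functionality:record_page_1", json.dumps(record)),
)

# Retrieving the record: the JSON string is decoded to return the original object.
row = conn.execute(
    "SELECT cache FROM cache_table WHERE key = ?",
    ("testing_functionality:record_page_1",),
).fetchone()
assert json.loads(row[0]) == record
```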
The SQLAlchemyStorage can be initialized as follows:
### Import the package and initialize the storage in a dedicated package directory:
>>> from scholar_flux.data_storage import SQLAlchemyStorage
# Defaults to creating a local, file-based sqlite cache within the default writable directory.
# Verifies that the dependency for a basic sqlite service is actually available for use locally
>>> assert SQLAlchemyStorage.is_available()
>>> sql_storage = SQLAlchemyStorage(namespace='testing_functionality')
>>> print(sql_storage)
# OUTPUT: SQLAlchemyStorage(...)
# Adding records to the storage
>>> sql_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
>>> sql_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> sql_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
>>> sql_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> sql_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> sql_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> sql_storage.delete_all()  # deletes all records from the namespace
>>> sql_storage.retrieve_keys()  # Will now be empty
- DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function SQLAlchemyStorage.<lambda>>}
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, **sqlalchemy_config) None[source]
Initialize the SQLAlchemy storage backend and connect to the server indicated via the url parameter.
This class uses the innate flexibility of SQLAlchemy to support backends such as SQLite, Postgres, DuckDB, etc.
- Parameters:
url (Optional[str]) – Database connection string. This can be provided positionally or as a keyword argument.
namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.
ttl (None) – Ignored. Included for interface compatibility; not implemented.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.
**sqlalchemy_config –
Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine. Typical parameters include the following:
url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.
echo (bool): Indicates whether to show the executed SQL queries in the console.
- clone() SQLAlchemyStorage[source]
Helper method for creating a new SQLAlchemyStorage with the same parameters.
Note that the implementation of the SQLAlchemyStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.
- config: dict
- delete(key: str) None[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key associated with the stored data in the cache.
- classmethod is_available(url: str | None = None, verbose: bool = True) bool[source]
Helper class method for testing whether the SQL service can be accessed. If so, this function returns True, otherwise False.
- Parameters:
url (Optional[str]) – The database connection string used to attempt a connection. If None, defaults to the "url" entry from the DEFAULT_CONFIG class variable.
verbose (bool) – Indicates whether to log at the levels, DEBUG and lower, or to log warnings only
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- retrieve_keys() List[str][source]
Retrieve all keys for records from the cache.
- Returns:
A list of all keys saved via SQL.
- Return type:
list
- update(key: str, data: Any) None[source]
Update the cache by storing the associated value with the provided key.
- Parameters:
key (str) – The key used to store the serialized JSON string in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.
Module contents
The scholar_flux.data_storage module contains the core storage definitions used to cache the response content, records and metadata for each unique page/batch of records under a key used for cache identification.
- Core components:
- DataCacheManager: Contains the higher level methods used to create and interact with the processing cache storage
methods in a predictable manner.
- SQLAlchemyStorage: Contains the core methods needed to interact with a range of SQL Databases (and duckdb) using
the same underlying interface. By default, this class uses sqlalchemy to set up a db in a consistent location.
- RedisStorage: Contains the core methods used to interact with the Redis client. This storage defaults to localhost, port 6379.
- MongoStorage: Contains the core methods used to interact with the Mongo DB database. By default, this class
attempts to connect to MongoDB on localhost on port 27017.
- InMemoryStorage: The default storage method - simply saves processed request content and responses to a
temporary dictionary that is deleted when the Python session ends.
- NullStorage: A No-Op storage method that is used to effectively turn off the use of storage.
This module is included for compatibility with the static typing used throughout the package.
In addition, Exceptions for missing dependencies are set to return storage-specific errors if a storage is initialized without the necessary dependencies:
- SQLAlchemyStorage -> sqlalchemy
- MongoStorage -> pymongo
- RedisStorage -> redis
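The deferred-import pattern behind these storage-specific errors can be sketched as follows. This is illustrative only; the class and function names below are hypothetical and mirror, rather than reproduce, the package's internals:

```python
class StorageImportError(ImportError):
    """Illustrative base error raised for a missing optional storage dependency."""


def load_redis_module():
    # The optional dependency is imported only when the storage is actually
    # used, so the package itself can be installed without it.
    try:
        import redis
    except ImportError as exc:
        raise StorageImportError(
            "The 'redis' package is required to use RedisStorage"
        ) from exc
    return redis
```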
- Example use:
>>> from scholar_flux import DataCacheManager, SearchCoordinator
>>> processing_cache = DataCacheManager.with_storage('redis')
>>> SearchCoordinator(query='Programming', cache_manager=processing_cache)
- class scholar_flux.data_storage.ABCStorage(*args, **kwargs)[source]
Bases:
ABC
The ABCStorage class provides the basic structure required to implement the data storage cache with a customized backend.
This subclass provides methods to check the cache, delete from the cache, update the cache with new data, and retrieve data from the cache storage.
- abstract clone() Self[source]
Helper method for cloning the structure and configuration of future implementations.
- abstract delete_all(*args, **kwargs) None[source]
Core method for deleting all pages of records from the cache.
- abstract classmethod is_available(*args, **kwargs) bool[source]
Core method for verifying whether a storage/service is available.
- abstract retrieve(*args, **kwargs) Any | None[source]
Core method for retrieving a page of records from the cache.
- abstract retrieve_all(*args, **kwargs) Dict[str, Any] | None[source]
Core method for retrieving all pages of records from the cache.
- abstract retrieve_keys(*args, **kwargs) List[str] | None[source]
Core method for retrieving all keys from the cache.
- structure(flatten: bool = False, show_value_attributes: bool = True) str[source]
Helper method for quickly showing a representation of the overall structure of the current storage subclass. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.
- Returns:
The structure of the current storage subclass as a string.
- Return type:
str
- abstract verify_cache(*args, **kwargs) bool[source]
Core method for verifying the cache based on the key.
- with_raise_on_error(value: bool = True)[source]
Uses a context manager to temporarily modify the raise_on_error attribute for the context duration.
All storage backends that inherit from the ABCStorage will also inherit the with_raise_on_error context manager. When used, this context manager temporarily sets the raise_on_error attribute to True or False for the duration of a code block without permanently changing the storage subclass’s configuration.
This context manager is most useful for briefly suppressing errors and in cache verification when errors need to be logged and reported instead of silently indicating that a cache entry couldn’t be found.
- Parameters:
value (bool) – A value to temporarily assign to raise_on_error for the context duration
Example
>>> with storage.with_raise_on_error(True):
...     # Any storage operation here will raise on error, regardless of the instance default
...     storage.retrieve(key)
- class scholar_flux.data_storage.DataCacheManager(cache_storage: ABCStorage | None = None)[source]
Bases:
object
The DataCacheManager class manages caching of API responses.
This class provides methods to generate cache keys, verify cache entries, check cache validity, update cache with new data, and retrieve data from the cache storage.
- Parameters:
cache_storage (Optional[ABCStorage]) – The storage backend used for cached data. Defaults to In-Memory Storage.
- generate_fallback_cache_key(response)
Generates a unique fallback cache key based on the response URL and status code.
- verify_cache(cache_key)
Checks if the provided cache_key exists in the cache storage.
- cache_is_valid(cache_key, response=None, cached_response=None)
Determines whether the cached data for a given key is still valid.
- update_cache(cache_key, response, store_raw=False, metadata=None, parsed_response=None, processed_records=None)
Updates the cache storage with new data.
- retrieve(cache_key)
Retrieves data from the cache storage based on the cache key.
- retrieve_from_response(response)
Retrieves data from the cache storage based on the response if within the cache.
Examples
>>> from scholar_flux.data_storage import DataCacheManager
>>> from scholar_flux.api import SearchCoordinator
# Factory method that creates a default redis connection to the service on localhost if available.
>>> redis_cache_manager = DataCacheManager.with_storage('redis')
# Creates a search coordinator for retrieving API responses from the PLOS API provider
>>> search_coordinator = SearchCoordinator(query='Computational Caching Strategies',
...                                        provider_name='plos',
...                                        cache_requests=True,  # caches raw requests prior to processing
...                                        cache_manager=redis_cache_manager)  # caches response processing
# Uses the cache manager to temporarily store cached responses for the default duration
>>> processed_response = search_coordinator.search(page=1)
# On the next search, the processed response data can be retrieved directly for later response reconstruction
>>> retrieved_response_json = search_coordinator.responses.cache.retrieve(processed_response.cache_key)
# Serialized responses store the core response fields (content, URL, status code) associated with API responses
>>> assert isinstance(retrieved_response_json, dict) and 'serialized_response' in retrieved_response_json
- __init__(cache_storage: ABCStorage | None = None) None[source]
Initializes the DataCacheManager with the selected cache storage.
- classmethod cache_fingerprint(obj: str | Any | None = None, package_version: str | None = '0.1.5') str[source]
This method helps identify changes in class/configuration for later cache retrieval. It generates a unique string based on the object and the package version.
By default, a fingerprint is generated from the current package version and the object representation, if provided. If not provided, a new human-readable object representation is generated using the scholar_flux.utils.generate_repr helper function that represents the object name and its current state. A package version is also prepended to the current fingerprint if enabled (not None), and can be customized if needed for object-specific versioning.
- Parameters:
obj (Optional[str]) – A finger-printed object, or an object to generate a representation of
package_version (Optional[str]) – The current package version string, or a manually provided version for a component.
- Returns:
A human-readable string including the version and object identity
- Return type:
str
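The versioned-fingerprint idea can be sketched as below. The helper is hypothetical: the real classmethod additionally derives the object representation via generate_repr when only an object is passed:

```python
from typing import Optional

def cache_fingerprint(obj_repr: str, package_version: Optional[str] = "0.1.5") -> str:
    # Prepending the package version means a version bump changes every
    # fingerprint, invalidating cache entries written by an older release.
    if package_version is not None:
        return f"{package_version}:{obj_repr}"
    return obj_repr
```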
- cache_is_valid(cache_key: str, response: Response | ResponseProtocol | None = None, cached_response: Dict[str, Any] | None = None) bool[source]
Determines whether the cached data for a given key is still valid or needs reprocessing due to missing fields or modified content when checked against the current response.
If a cached_response dictionary was not directly passed, the cache key will be retrieved from storage before comparison.
- Parameters:
cache_key (str) – The unique identifier for cached data.
response (Optional[Response | ResponseProtocol]) – The API response or response-like object used to validate the cache, if available.
cached_response (Optional[Dict[str, Any]]) – The cached data associated with the key
- Returns:
True if the cache is valid, False otherwise.
- Return type:
bool
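A minimal sketch of this validity check, assuming the cached entry stores a content hash under a hypothetical 'response_hash' field (the field name is illustrative, not taken from the library):

```python
from typing import Any, Dict, Optional

def cache_is_valid(cached_response: Optional[Dict[str, Any]], current_hash: str) -> bool:
    # The cache is valid only when an entry exists and its stored content
    # hash matches the hash computed from the current response.
    if not cached_response:
        return False
    return cached_response.get("response_hash") == current_hash
```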
- clone() DataCacheManager[source]
Helper method for creating a newly cloned instance of the current DataCacheManager.
- delete(cache_key: str) None[source]
Deletes data from the cache storage based on the cache key.
- Parameters:
cache_key – A unique identifier for the cached data.
- Return type:
None
- classmethod generate_fallback_cache_key(response: Response | ResponseProtocol) str[source]
Generates a unique fallback cache key based on the response URL and status code.
- Parameters:
response – The API response object.
- Returns:
A unique fallback cache key.
- Return type:
str
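The fallback-key idea can be sketched as follows. The format below is illustrative only: the library combines the response URL and status code, but its exact separator and layout are not documented here.

```python
def fallback_cache_key(url: str, status_code: int) -> str:
    # Illustrative format: join the response URL and status code into a
    # single deterministic key for responses lacking a better identifier.
    return f"{url}::{status_code}"
```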
- classmethod generate_response_hash(response: Response | ResponseProtocol) str[source]
Generates a hash of the response content.
- Parameters:
response – The API response object.
- Returns:
A SHA-256 hash of the response content.
- Return type:
str
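The hashing step can be reproduced with the stdlib hashlib module; this sketch shows the SHA-256 digest described above, applied to raw response bytes:

```python
import hashlib

def response_content_hash(content: bytes) -> str:
    # SHA-256 hex digest of the raw response body: identical content
    # always yields the identical hash, so changed content is detectable.
    return hashlib.sha256(content).hexdigest()
```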
- isnull() bool[source]
Helper method for determining whether the current cache manager uses a null storage.
- classmethod null() DataCacheManager[source]
Creates a DataCacheManager using a NullStorage (no storage).
This storage has the effect of returning False when validating whether the current DataCacheManager is in operation.
- Returns:
The current class initialized without storage
- Return type:
DataCacheManager
- retrieve(cache_key: str) Dict[str, Any] | None[source]
Retrieves data from the cache storage based on the cache key.
- Parameters:
cache_key – A unique identifier for the cached data.
- Returns:
The cached data corresponding to the cache key if found, otherwise None.
- Return type:
Optional[Dict[str, Any]]
- retrieve_from_response(response: Response | ResponseProtocol) Dict[str, Any] | None[source]
Retrieves data from the cache storage based on the response if within cache.
- Parameters:
response – The API response object.
- Returns:
The cached data corresponding to the response if found, otherwise None.
- Return type:
Optional[Dict[str, Any]]
- structure(flatten: bool = False, show_value_attributes: bool = False) str[source]
Helper method for quickly showing a representation of the overall structure of the current DataCacheManager. The instance uses the generate_repr helper function to produce human-readable representations of the core structure of the storage subclass with its defaults.
- Returns:
The structure of the current DataCacheManager as a string.
- Return type:
str
- update_cache(cache_key: str, response: Response | ResponseProtocol, store_raw: bool = False, parsed_response: Any | None = None, metadata: Dict[str, Any] | None = None, extracted_records: Any | None = None, processed_records: Any | None = None, **kwargs) None[source]
Updates the cache storage with new data.
- Parameters:
cache_key – A unique identifier for the cached data.
response – (requests.Response | ResponseProtocol) The API response or response-like object.
store_raw – (Optional) A boolean indicating whether to store the raw response. Defaults to False.
metadata – (Optional) Additional metadata associated with the cached data. Defaults to None.
parsed_response – (Optional) The response data parsed into a structured format. Defaults to None.
processed_records – (Optional) The response data processed for specific use. Defaults to None.
extracted_records – (Optional) The records extracted from the parsed response. Defaults to None.
kwargs – (Optional) Additional hashable dictionary fields that can be stored using SQL cattrs encodings or the in-memory cache.
- verify_cache(cache_key: str | None) bool[source]
Checks if the provided cache_key exists in the cache storage.
- Parameters:
cache_key – A unique identifier for the cached data.
- Returns:
True if the cache key exists, False otherwise.
- Return type:
bool
- classmethod with_storage(cache_storage: Literal['redis', 'sql', 'sqlalchemy', 'mongodb', 'pymongo', 'inmemory', 'memory', 'null'] | None = None, *args, **kwargs) DataCacheManager[source]
Creates a DataCacheManager using a known storage device.
This is a convenience function allowing the user to create a DataCacheManager with redis, sql, mongodb, or inmemory storage, either with default settings or through optional positional and keyword parameters to initialize the storage as needed.
- Returns:
The current class initialized with the chosen storage
- Return type:
DataCacheManager
- class scholar_flux.data_storage.InMemoryStorage(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs)[source]
Bases:
ABCStorage
Default storage class that implements an in-memory storage cache using a dictionary.
This class implements the required abstract methods from the ABCStorage base class to ensure compatibility with the scholar_flux.DataCacheManager. Methods are provided to delete from the cache, update the cache with new data, and retrieve data from the cache.
- Parameters:
namespace (Optional[str]) – Prefix for cache keys. Defaults to None.
ttl (Optional[int]) – Ignored. Included for interface compatibility; not implemented.
**kwargs (Dict) – Ignored. Included for interface compatibility; not implemented.
Examples
>>> from scholar_flux.data_storage import InMemoryStorage
### defaults to a basic dictionary:
>>> memory_storage = InMemoryStorage(namespace='testing_functionality')
>>> print(memory_storage)
# OUTPUT: InMemoryStorage(...)
### Adding records to the storage
>>> memory_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
>>> memory_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
### Revising and overwriting a record
>>> memory_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
>>> memory_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> memory_storage.retrieve_all()  # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> memory_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> memory_storage.delete_all()  # deletes all records from the namespace
>>> memory_storage.retrieve_keys()  # Will now be empty
>>> memory_storage.retrieve_all()  # Will also be empty
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **kwargs) None[source]
Initialize a basic, dictionary-like memory_cache using a namespace.
Note that ttl and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect processing or cache initialization.
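The namespace-prefixing behaviour described here can be sketched with a plain dictionary. This is an illustrative class, not the library's actual InMemoryStorage implementation:

```python
from typing import Any, Dict, List, Optional

class NamespacedMemoryCache:
    """Illustrative dictionary cache that prefixes stored keys with a namespace."""

    def __init__(self, namespace: Optional[str] = None) -> None:
        self.namespace = namespace
        self._cache: Dict[str, Any] = {}

    def _qualified(self, key: str) -> str:
        # Keys are stored as '<namespace>:<key>' when a namespace is set.
        return f"{self.namespace}:{key}" if self.namespace else key

    def update(self, key: str, data: Any) -> None:
        self._cache[self._qualified(key)] = data

    def retrieve(self, key: str) -> Optional[Any]:
        return self._cache.get(self._qualified(key))

    def retrieve_keys(self) -> List[str]:
        return list(self._cache)
```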
- clone() InMemoryStorage[source]
Helper method for creating a new InMemoryStorage with the same configuration.
- delete(key: str) None[source]
Attempts to delete the selected cache key if found within the current namespace.
- Parameters:
key (str) – The key associated with the stored data in the dictionary cache.
- classmethod is_available(*args, **kwargs) bool[source]
Helper method that returns True, indicating that dictionary-based storage will always be available.
- Returns:
True, indicating that the dictionary-based cache storage will always be available
- Return type:
bool
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Attempts to retrieve a response containing the specified cache key within the current namespace.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any] | None[source]
Retrieves all cache key-response mappings found within the current namespace.
- Returns:
A dictionary containing each key-value mapping for all cached data within the same namespace
- retrieve_keys() List[str] | None[source]
Retrieves the full list of all cache keys found within the current namespace.
- Returns:
The full list of all keys that are currently mapped within the storage
- Return type:
List[str]
- structure(flatten: bool = False, show_value_attributes: bool = True) str[source]
Helper method for showing the structure of the in-memory cache without overloading the representation with the specifics of what is being cached.
- exception scholar_flux.data_storage.MongoDBImportError[source]
Bases:
OptionalDependencyImportError
Exception for MongoDB dependency issues.
- class scholar_flux.data_storage.MongoDBStorage(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, **mongo_config)[source]
Bases:
ABCStorage
Implements the storage methods necessary to interact with MongoDB through a unified backend interface.
The MongoDBStorage uses the same underlying interface as other scholar_flux storage classes for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.
Examples
>>> from scholar_flux.data_storage import MongoDBStorage
# Defaults to connecting locally (mongodb://127.0.0.1) on the default port for MongoDB (27017)
# Verifies that a mongodb service is actually available locally on the default port
>>> assert MongoDBStorage.is_available()
>>> mongo_storage = MongoDBStorage(namespace='testing_functionality')
>>> print(mongo_storage)
# OUTPUT: MongoDBStorage(...)
# Adding records to the storage
>>> mongo_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
>>> mongo_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> mongo_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
>>> mongo_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> mongo_storage.retrieve_all()
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52,
#          'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53,
#          'article': 'A name has many meanings'}}
>>> mongo_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> mongo_storage.delete_all()  # deletes all records from the namespace
>>> mongo_storage.retrieve_keys()  # Will now be empty
>>> mongo_storage.retrieve_all()  # Will also be empty
- DEFAULT_CONFIG: Dict[str, Any] = {'collection': 'result_page', 'db': 'storage_manager_db', 'host': 'mongodb://127.0.0.1', 'port': 27017}
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(host: str | None = None, namespace: str | None = None, ttl: int | float | None = None, raise_on_error: bool | None = None, **mongo_config)[source]
Initialize the Mongo DB storage backend and connect to the Mongo DB server.
If no parameters are specified, the MongoDB storage will default to the parameters derived from the scholar_flux.utils.config_settings.config dictionary, which, in turn, resolves the host and port from environment variables or the default MongoDB host/port in the following order of priority:
SCHOLAR_FLUX_MONGODB_HOST > MONGODB_HOST > 'mongodb://127.0.0.1' (localhost)
SCHOLAR_FLUX_MONGODB_PORT > MONGODB_PORT > 27017
- Parameters:
host (Optional[str]) –
The host address where the Mongo database can be found. The default is 'mongodb://127.0.0.1', which is the Mongo server on the localhost.
Each of the following are valid values for host:
Simple hostname: 'localhost' (uses the port parameter)
Full URI: 'mongodb://localhost:27017' (ignores the port parameter)
Complex URI: 'mongodb://user:pass@host:27017/db?options'
namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.
ttl (Optional[float | int]) – The total number of seconds that must elapse for a cache record to expire.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with MongoDB. If None, the raise_on_error attribute defaults to MongoDBStorage.DEFAULT_RAISE_ON_ERROR.
**mongo_config (Dict[Any, Any]) – Configuration parameters required to connect to the Mongo DB server. Typically includes parameters such as host, port, db, etc.
- Raises:
MongoDBImportError – If db module is not available or fails to load.
- client: None
- clone() MongoDBStorage[source]
Helper method for creating a new MongoDBStorage with the same parameters.
Note that the implementation of the MongoClient is not able to be deep copied. This method is provided for convenience for re-instantiation with the same configuration.
- delete(key: str)[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key used associated with the stored data from the cache.
- Raises:
PyMongoError – If there is an error deleting the record
- delete_all()[source]
Delete all records from cache that match the current namespace prefix.
- Raises:
PyMongoError – If an error occurs when deleting records from the collection
- classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True) bool[source]
Helper method that indicates whether the MongoDB service is available or not.
It attempts to establish a connection on the provided host and port and returns a boolean indicating if the connection was successful.
Note that if the input to host is a URI (e.g. mongodb://localhost:27017), any value provided for the port parameter will be ignored when MongoClient initializes the connection, and the URI will be used exclusively.
- Parameters:
host (Optional[str]) – The IP or hostname of the MongoDB service. If None or an empty string, defaults to localhost (the local computer) or the “host” entry from the DEFAULT_CONFIG class variable.
port (Optional[int]) – The port where the service is hosted. If None or 0, defaults to port 27017 or the “port” entry from the DEFAULT_CONFIG class variable.
verbose (bool) – Indicates whether to log status messages. Defaults to True
- Returns:
True if the service was successfully accessed and False otherwise.
- Return type:
bool
- Raises:
ServerSelectionTimeoutError – If a timeout error occurs when attempting to ping MongoDB
ConnectionFailure – If a connection cannot be established
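A minimal stand-in for this kind of availability probe can be written with a plain TCP connection. Unlike the actual `is_available`, which pings through MongoClient, the sketch below only checks network reachability, not that MongoDB itself is responding:

```python
import socket


def service_is_available(host: str = "127.0.0.1", port: int = 27017,
                         timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds, False otherwise.

    Illustrative only: a real MongoDB check should issue a 'ping' command via
    MongoClient so that a non-MongoDB listener is not mistaken for the service.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers ConnectionRefusedError and socket timeouts
        return False
```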
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if retrieval is successful. Returns None if the key does not exist.
- Return type:
Any
- Raises:
PyMongoError – If there is an error retrieving the record
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache that match the current namespace prefix.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- Raises:
PyMongoError – If there is an error during the retrieval of records under the namespace.
- retrieve_keys() List[str][source]
Retrieve all keys for records from cache.
- Returns:
A list of all keys saved via MongoDB.
- Return type:
list[str]
- Raises:
PyMongoError – If there is an error retrieving the record key.
- update(key: str, data: Any)[source]
Update the cache by storing the provided value under the provided key.
- Parameters:
key (str) – The key used to store the data in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types such as strings, numbers, lists, dictionaries, etc.
- Raises:
PyMongoError – If an error occurs when attempting to insert or update a record
- verify_cache(key: str) bool[source]
Check if specific cache key exists.
- Parameters:
key (str) – The key whose presence in the MongoDB storage backend should be checked.
- Returns:
True if the key is found, otherwise False.
- Return type:
bool
- Raises:
ValueError – If provided key is empty or None.
CacheVerificationException – If an error occurs on data retrieval
- class scholar_flux.data_storage.NullStorage(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs)[source]
Bases:
ABCStorage
NullStorage is a no-op implementation of ABCStorage. This class is useful when you want to disable storage without changing code logic.
The scholar_flux package mainly implements this storage when the user turns off processing cache.
Example
>>> from scholar_flux.data_storage import DataCacheManager, NullStorage
>>> from scholar_flux.api import SearchCoordinator
>>> null_storage = DataCacheManager.null()  # This implements a data cache with the null storage under the hood:
>>> assert isinstance(null_storage.cache_storage, NullStorage)
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_manager=null_storage)
# Otherwise the same can be performed with the following:
>>> search_coordinator = SearchCoordinator(query='History of Data Caching', cache_results=False)
# The processing of responses will then be recomputed on the next search:
>>> response = search_coordinator.search(page=1)
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = None, **kwargs) None[source]
Initialize a No-Op cache for compatibility with the ABCStorage base class.
Note that namespace, ttl, raise_on_error, and **kwargs are provided for interface compatibility, and specifying any of these as arguments will not affect initialization.
- clone() NullStorage[source]
Helper method for creating a new implementation of the current NullStorage.
- classmethod is_available(*args, **kwargs) bool[source]
Method added for abstract class consistency - returns True, indicating that the no-op storage is always available, although no cache is ever stored.
- namespace: str | None
- raise_on_error: bool
- retrieve_all(*args, **kwargs) Dict[str, Any] | None[source]
Method added for abstract class consistency - returns a dictionary for type consistency.
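The no-op contract described above can be sketched in a few lines. `MiniNullStorage` is an illustrative stand-in, not the actual NullStorage implementation: every write is discarded and every read reports an empty cache, so calling code keeps the same code path whether caching is on or off.

```python
from typing import Any, Dict, List, Optional


class MiniNullStorage:
    """Illustrative no-op storage mirroring the NullStorage contract."""

    def __init__(self, namespace=None, ttl=None, raise_on_error=None, **kwargs):
        self.namespace = namespace  # accepted only for interface compatibility

    def update(self, key: str, data: Any) -> None:
        pass  # silently discard every write

    def retrieve(self, key: str) -> Optional[Any]:
        return None  # nothing is ever cached

    def retrieve_all(self) -> Dict[str, Any]:
        return {}  # empty dict keeps the return type consistent

    def retrieve_keys(self) -> List[str]:
        return []

    def verify_cache(self, key: str) -> bool:
        return False

    @classmethod
    def is_available(cls, *args, **kwargs) -> bool:
        return True  # a no-op backend is always "available"
```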
- exception scholar_flux.data_storage.OptionalDependencyImportError(message='Optional Dependency not found')[source]
Bases:
Exception
Base exception for optional dependency issues.
- exception scholar_flux.data_storage.RedisImportError[source]
Bases:
OptionalDependencyImportError
Exception for a missing redis backend.
- class scholar_flux.data_storage.RedisStorage(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **redis_config)[source]
Bases:
ABCStorage
Implements the storage methods necessary to interact with Redis using a unified backend interface.
The RedisStorage implements the abstract methods from the ABCStorage class for use with the DataCacheManager. This implementation is designed to use a key-value store as a cache by which data can be stored and retrieved in a relatively straightforward manner similar to the In-Memory Storage.
Examples
>>> from scholar_flux.data_storage import RedisStorage
# Defaults to connecting locally (localhost) on the default port for Redis services (6379)
# Verifies that a Redis service is locally available:
>>> assert RedisStorage.is_available()
>>> redis_storage = RedisStorage(namespace='testing_functionality')
>>> print(redis_storage)
# OUTPUT: RedisStorage(...)
# Adding records to the storage
>>> redis_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
>>> redis_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> redis_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
>>> redis_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> redis_storage.retrieve_all()  # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52, 'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53, 'article': 'A name has many meanings'}}
>>> redis_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> redis_storage.delete_all()  # deletes all records from the namespace
>>> redis_storage.retrieve_keys()  # Will now be empty
>>> redis_storage.retrieve_all()  # Will also be empty
- DEFAULT_CONFIG: dict = {'host': 'localhost', 'port': 6379}
- DEFAULT_NAMESPACE: str = 'SFAPI'
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(host: str | None = None, namespace: str | None = None, ttl: int | None = None, raise_on_error: bool | None = None, **redis_config)[source]
Initialize the Redis storage backend and connect to the Redis server.
If no parameters are specified, the Redis storage will attempt to resolve the host and port using variables from the environment (loaded into scholar_flux.utils.config_settings at runtime).
The host and port are resolved from environment variables/defaults in the following order of priority:
SCHOLAR_FLUX_REDIS_HOST > REDIS_HOST > ‘localhost’
SCHOLAR_FLUX_REDIS_PORT > REDIS_PORT > 6379
- Parameters:
host (Optional[str]) – Redis server host. Can be provided positionally or as a keyword argument. Defaults to ‘localhost’ if not specified.
namespace (Optional[str]) – The prefix associated with each cache key. Defaults to DEFAULT_NAMESPACE if left None.
ttl (Optional[int]) – The total number of seconds that must elapse for a cache record to expire. If not provided, ttl defaults to None.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with Redis. If None, the raise_on_error attribute defaults to RedisStorage.DEFAULT_RAISE_ON_ERROR.
**redis_config (Optional[Dict[Any, Any]]) – Configuration parameters required to connect to the Redis server. Typically includes parameters such as host, port, db, etc.
- Raises:
RedisImportError – If redis module is not available or fails to load.
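The ttl semantics described above (a record expires a fixed number of seconds after being stored; a ttl of None disables expiry) can be sketched with a dict-backed cache. This illustrates the concept only; the real RedisStorage delegates expiry to the Redis server:

```python
import time
from typing import Any, Dict, Optional, Tuple


class TTLCache:
    """Illustrative dict-backed cache with per-record expiry."""

    def __init__(self, ttl: Optional[float] = None):
        self.ttl = ttl
        self._store: Dict[str, Tuple[Any, Optional[float]]] = {}

    def update(self, key: str, value: Any, now: Optional[float] = None) -> None:
        # 'now' is injectable to make the expiry logic deterministic in tests
        now = time.monotonic() if now is None else now
        expires = None if self.ttl is None else now + self.ttl
        self._store[key] = (value, expires)

    def retrieve(self, key: str, now: Optional[float] = None) -> Optional[Any]:
        now = time.monotonic() if now is None else now
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires = entry
        if expires is not None and now >= expires:
            del self._store[key]  # lazily evict expired records on access
            return None
        return value
```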
- clone() RedisStorage[source]
Helper method for creating a new RedisStorage with the same parameters.
Note that the implementation of the RedisStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.
- config: dict
- delete(key: str) None[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key associated with the stored data in the cache.
- Raises:
RedisError – If there is an error deleting the record
- delete_all() None[source]
Delete all records from cache that match the current namespace prefix.
- Raises:
RedisError – If an error occurs when deleting records from the collection
- classmethod is_available(host: str | None = None, port: int | None = None, verbose: bool = True) bool[source]
Helper class method for testing whether the Redis service is available and can be accessed.
If Redis can be successfully reached, this function returns True, otherwise False.
- Parameters:
host (Optional[str]) – Indicates the location to attempt a connection. If None or an empty string, defaults to localhost (the local computer) or the “host” entry from the DEFAULT_CONFIG class variable.
port (Optional[int]) – Indicates the port where the service can be accessed. If None or 0, defaults to port 6379 or the “port” entry from the DEFAULT_CONFIG class variable.
verbose (bool) – Indicates whether to log at levels DEBUG and lower, or to log warnings only.
- Raises:
TimeoutError – If a timeout error occurs when attempting to ping Redis
ConnectionError – If a connection cannot be established
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if retrieval is successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache that match the current namespace prefix.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- Raises:
RedisError – If there is an error during the retrieval of records under the namespace
- retrieve_keys() List[str][source]
Retrieve all keys for records from cache that match the current namespace prefix.
- Returns:
A list of all keys saved under the current namespace.
- Return type:
list
- Raises:
RedisError – If there is an error retrieving the record key
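The namespace-prefixing behaviour shared by these bulk methods can be illustrated with a dict-backed sketch. Redis itself would do this with key patterns over the wire; the `NamespacedStore` class below is a hypothetical stand-in:

```python
from typing import Any, Dict, List


class NamespacedStore:
    """Dict-backed sketch of namespace handling: keys are stored as
    '<namespace>:<key>' and bulk operations only touch that prefix."""

    def __init__(self, namespace: str = "SFAPI"):
        self.namespace = namespace
        self._data: Dict[str, Any] = {}

    def _full_key(self, key: str) -> str:
        return f"{self.namespace}:{key}"

    def update(self, key: str, value: Any) -> None:
        self._data[self._full_key(key)] = value

    def retrieve(self, key: str) -> Any:
        return self._data.get(self._full_key(key))

    def retrieve_keys(self) -> List[str]:
        prefix = f"{self.namespace}:"
        return [k for k in self._data if k.startswith(prefix)]

    def retrieve_all(self) -> Dict[str, Any]:
        return {k: self._data[k] for k in self.retrieve_keys()}

    def delete_all(self) -> None:
        # only records under the current namespace are removed
        for k in self.retrieve_keys():
            del self._data[k]
```

Because every bulk operation filters on the prefix, two stores with different namespaces can share a backend without touching each other's records.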
- update(key: str, data: Any) None[source]
Update the cache by storing the provided value under the provided key.
- Parameters:
key (str) – The key used to store the serialized JSON string in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.
- Raises:
RedisError – If an error occurs when attempting to insert or update a record
- verify_cache(key: str) bool[source]
Check if specific cache key exists.
- Parameters:
key (str) – The key whose presence in the Redis storage backend should be checked.
- Returns:
True if the key is found, otherwise False.
- Return type:
bool
- Raises:
ValueError – If provided key is empty or None.
RedisError – If an error occurs when looking up a key
- exception scholar_flux.data_storage.SQLAlchemyImportError[source]
Bases:
OptionalDependencyImportError
Exception for a missing SQLAlchemy backend.
- class scholar_flux.data_storage.SQLAlchemyStorage(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, **sqlalchemy_config)[source]
Bases:
ABCStorage
Implements the storage methods necessary to interact with SQLite3, as well as other SQL flavors, via SQLAlchemy. This implementation is designed to use a relational database as a cache by which data can be stored and retrieved in a relatively straightforward manner that associates records in key-value pairs, similar to the In-Memory Storage.
Note:
This table uses the structure previously defined in the CacheTable to store records in a structured manner:
- ID:
Automatically generated - identifies the unique record in the table
- Key:
Is used to associate a specific cached record with a short human-readable (or hashed) string
- Cache:
The JSON data associated with the record. To store the data, any nested, non-serializable data is first encoded before being unstructured and stored. On retrieving the data, the JSON string is decoded and restructured in order to return the original object.
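The ID/Key/Cache layout can be illustrated with the standard-library sqlite3 module. The table and helper functions below are a simplified sketch of the pattern, not the actual CacheTable model (which is defined through SQLAlchemy):

```python
import json
import sqlite3

# Create the cache table with the ID / Key / Cache columns described above.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE IF NOT EXISTS cache (
           id INTEGER PRIMARY KEY AUTOINCREMENT,  -- unique record id
           key TEXT UNIQUE NOT NULL,              -- human-readable or hashed key
           cache TEXT NOT NULL                    -- JSON-encoded record data
       )"""
)


def update(key: str, data) -> None:
    """Serialize data to JSON and insert or overwrite the record for key."""
    conn.execute(
        "INSERT INTO cache (key, cache) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET cache = excluded.cache",
        (key, json.dumps(data)),
    )


def retrieve(key: str):
    """Decode and return the JSON record for key, or None if absent."""
    row = conn.execute("SELECT cache FROM cache WHERE key = ?", (key,)).fetchone()
    return json.loads(row[0]) if row else None
```

The UNIQUE constraint on `key` plus the upsert clause gives the overwrite-on-update behaviour that the storage's `update` method describes.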
The SQLAlchemyStorage can be initialized as follows:
### Import the package and initialize the storage in a dedicated package directory:
>>> from scholar_flux.data_storage import SQLAlchemyStorage
# Defaults to creating a local, file-based sqlite cache within the default writable directory.
# Verifies that the dependency for a basic sqlite service is actually available for use locally:
>>> assert SQLAlchemyStorage.is_available()
>>> sql_storage = SQLAlchemyStorage(namespace='testing_functionality')
>>> print(sql_storage)
# OUTPUT: SQLAlchemyStorage(...)
# Adding records to the storage
>>> sql_storage.update('record_page_1', {'id': 52, 'article': 'A name to remember'})
>>> sql_storage.update('record_page_2', {'id': 55, 'article': 'A name can have many meanings'})
# Revising and overwriting a record
>>> sql_storage.update('record_page_2', {'id': 53, 'article': 'A name has many meanings'})
>>> sql_storage.retrieve_keys()  # retrieves all current keys stored in the cache under the namespace
# OUTPUT: ['testing_functionality:record_page_1', 'testing_functionality:record_page_2']
>>> sql_storage.retrieve_all()  # retrieves all records stored under the namespace
# OUTPUT: {'testing_functionality:record_page_1': {'id': 52, 'article': 'A name to remember'},
#          'testing_functionality:record_page_2': {'id': 53, 'article': 'A name has many meanings'}}
>>> sql_storage.retrieve('record_page_1')  # retrieves the record for page 1
# OUTPUT: {'id': 52, 'article': 'A name to remember'}
>>> sql_storage.delete_all()  # deletes all records from the namespace
>>> sql_storage.retrieve_keys()  # Will now be empty
- DEFAULT_CONFIG: Dict[str, Any] = {'echo': False, 'url': <function SQLAlchemyStorage.<lambda>>}
- DEFAULT_NAMESPACE: str | None = None
- DEFAULT_RAISE_ON_ERROR: bool = False
- __init__(url: str | None = None, namespace: str | None = None, ttl: None = None, raise_on_error: bool | None = False, **sqlalchemy_config) None[source]
Initialize the SQLAlchemy storage backend and connect to the server indicated via the url parameter.
This class uses the innate flexibility of SQLAlchemy to support backends such as SQLite, Postgres, DuckDB, etc.
- Parameters:
url (Optional[str]) – Database connection string. This can be provided positionally or as a keyword argument.
namespace (Optional[str]) – The prefix associated with each cache key. By default, this is None.
ttl (None) – Ignored. Included for interface compatibility; not implemented.
raise_on_error (Optional[bool]) – Determines whether an error should be raised when encountering unexpected issues when interacting with SQLAlchemy. If None, the raise_on_error attribute defaults to SQLAlchemyStorage.DEFAULT_RAISE_ON_ERROR.
**sqlalchemy_config –
Additional SQLAlchemy engine/session options passed to sqlalchemy.create_engine. Typical parameters include the following:
url (str): Indicates what server to connect to. Defaults to sqlite in the package directory.
echo (bool): Indicates whether to show the executed SQL queries in the console.
- clone() SQLAlchemyStorage[source]
Helper method for creating a new SQLAlchemyStorage with the same parameters.
Note that the implementation of the SQLAlchemyStorage is not able to be deep copied, and this method is provided for convenience in re-instantiation with the same configuration.
- config: dict
- delete(key: str) None[source]
Delete the value associated with the provided key from cache.
- Parameters:
key (str) – The key associated with the stored data in the cache.
- classmethod is_available(url: str | None = None, verbose: bool = True) bool[source]
Helper class method for testing whether the SQL service can be accessed. If so, this function returns True, otherwise False.
- Parameters:
url (Optional[str]) – The database connection string to attempt a connection with. If None, the default URL from the DEFAULT_CONFIG class variable is used.
verbose (bool) – Indicates whether to log at levels DEBUG and lower, or to log warnings only.
- namespace: str | None
- raise_on_error: bool
- retrieve(key: str) Any | None[source]
Retrieve the value associated with the provided key from cache.
- Parameters:
key (str) – The key used to fetch the stored data from cache.
- Returns:
The deserialized JSON object if retrieval is successful. Returns None if the key does not exist.
- Return type:
Any
- retrieve_all() Dict[str, Any][source]
Retrieve all records from cache.
- Returns:
Dictionary of key-value pairs. Keys are original keys, values are JSON deserialized objects.
- Return type:
dict
- retrieve_keys() List[str][source]
Retrieve all keys for records from cache.
- Returns:
A list of all keys saved via SQL.
- Return type:
list
- update(key: str, data: Any) None[source]
Update the cache by storing the provided value under the provided key.
- Parameters:
key (str) – The key used to store the serialized JSON string in cache.
data (Any) – A Python object that will be serialized into JSON format and stored. This includes standard data types like strings, numbers, lists, dictionaries, etc.