scholar_flux.api package

Subpackages

Submodules

scholar_flux.api.base_api module

Defines the BaseAPI, which implements minimal features such as caching, requests, and response retrieval.

The BaseAPI is subclassed by scholar_flux.api.SearchAPI to further build and formulate requests based on the parameters accepted by each API provider given their respective configurations.

class scholar_flux.api.base_api.BaseAPI(user_agent: str | None = None, session: Session | None = None, timeout: int | float | None = None, use_cache: bool | None = None)[source]

Bases: object

The BaseAPI client is a minimal implementation for user-friendly request preparation and response retrieval.

Parameters:
  • session (Optional[requests.Session]) – A pre-configured requests or requests-cache session. A new session is created if not specified.

  • user_agent (Optional[str]) – An optional user-agent string for the session.

  • timeout (Optional[int | float]) – The number of seconds to wait before raising a TimeoutError.

  • use_cache (Optional[bool]) – Indicates whether or not to create a cached session. If a cached session is already specified, this setting will have no effect on the creation of a session.

Examples

>>> from scholar_flux.api import BaseAPI
# creating a basic API client that uses the PLOS API as the default while caching response data in-memory:
>>> base_api = BaseAPI(use_cache=True)
# retrieve a basic request:
>>> parameters = {'q': 'machine learning', 'start': 1, 'rows': 20}
>>> response_page_1 = base_api.send_request('https://api.plos.org/search', parameters=parameters)
>>> assert response_page_1.ok
>>> response_page_1
# OUTPUT: <Response [200]>
>>> ml_page_1 = response_page_1.json()
# retrieving the next page:
>>> parameters['start'] = 21
>>> response_page_2 = base_api.send_request('https://api.plos.org/search', parameters=parameters)
>>> assert response_page_2.ok
>>> response_page_2
# OUTPUT: <Response [200]>
>>> ml_page_2 = response_page_2.json()
>>> ml_page_2
# OUTPUT: {'response': {'numFound': '...', 'start': 21, 'docs': ['...']}} # redacted
DEFAULT_TIMEOUT: int = 20
DEFAULT_USE_CACHE: bool = False
__init__(user_agent: str | None = None, session: Session | None = None, timeout: int | float | None = None, use_cache: bool | None = None)[source]

Initializes the BaseAPI client for response retrieval given the provided inputs.

The necessary attributes are prepared with a new or existing session (cached or uncached) via dependency injection. This class is designed to be subclassed for specific API implementations.

Parameters:
  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • timeout (Optional[int | float]) – Timeout for requests in seconds.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The default setting is to create a regular requests.Session unless a CachedSession is already provided.

configure_session(session: Session | None = None, user_agent: str | None = None, use_cache: bool | None = None) Session[source]

Creates a session object if one does not already exist. If use_cache = True, then a cached session object will be used. A regular session that is not already cached will be overridden.

Parameters:
  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist. If use_cache is True and a cached session has already been passed, the previously created cached session is returned. Otherwise, a new CachedSession is created.

Returns:

The configured session.

Return type:

requests.Session
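The session-selection rules described above can be sketched roughly as follows. This is a minimal illustration based on one reading of the description, not the library's implementation: `Session` and `CachedSession` are stand-in classes (the real ones come from requests and requests-cache), and `choose_session` is a hypothetical name.

```python
from typing import Optional


class Session:
    """Stand-in for requests.Session (assumption: only headers matter here)."""

    def __init__(self) -> None:
        self.headers: dict = {}


class CachedSession(Session):
    """Stand-in for a requests-cache CachedSession."""


def choose_session(session: Optional[Session] = None,
                   user_agent: Optional[str] = None,
                   use_cache: Optional[bool] = None) -> Session:
    """Hypothetical helper mirroring the documented selection rules."""
    if isinstance(session, CachedSession):
        # An existing cached session is always kept as-is.
        chosen: Session = session
    elif use_cache:
        # A missing or uncached session is replaced by a cached one.
        chosen = CachedSession()
    elif session is not None:
        chosen = session
    else:
        chosen = Session()
    if user_agent:
        chosen.headers["User-Agent"] = user_agent
    return chosen


existing_cached = CachedSession()
plain = Session()
kept = choose_session(existing_cached, use_cache=False)
upgraded = choose_session(plain, use_cache=True)
default = choose_session(user_agent="example-agent")
```

In this sketch the headers of an overridden regular session are not copied to the new cached session; whether the library carries them over is not stated in the description.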

prepare_request(base_url: str, endpoint: str | None = None, parameters: Dict[str, Any] | None = None) PreparedRequest[source]

Prepares a GET request for the specified endpoint with optional parameters.

Parameters:
  • base_url (str) – The base URL for the API.

  • endpoint (Optional[str]) – The API endpoint to prepare the request for.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

Returns:

The prepared request object.

Return type:

requests.PreparedRequest
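To illustrate how a base URL, an endpoint, and query parameters combine into the URL of a GET request, here is a small standard-library sketch. The helper name `build_get_url` is hypothetical, and the real method returns a prepared request object rather than a string.

```python
from typing import Any, Dict, Optional
from urllib.parse import urlencode


def build_get_url(base_url: str,
                  endpoint: Optional[str] = None,
                  parameters: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical helper: join a base URL, endpoint, and query parameters."""
    url = base_url
    if endpoint:
        # Normalize slashes so exactly one separates the base URL and endpoint.
        url = base_url.rstrip("/") + "/" + endpoint.lstrip("/")
    if parameters:
        # urlencode percent-encodes values and encodes spaces as '+'.
        url = f"{url}?{urlencode(parameters)}"
    return url


url = build_get_url("https://api.plos.org", "search",
                    {"q": "machine learning", "rows": 20})
print(url)  # https://api.plos.org/search?q=machine+learning&rows=20
```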

send_request(base_url: str, endpoint: str | None = None, parameters: Dict[str, Any] | None = None, timeout: int | float | None = None) Response[source]

Sends a GET request to the specified endpoint with optional parameters.

Parameters:
  • base_url (str) – The base URL of the API to send the request to.

  • endpoint (Optional[str]) – The endpoint of the API to send the request to.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

  • timeout (Optional[int | float]) – Timeout for the request in seconds.

Returns:

The response object.

Return type:

requests.Response
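When paging manually with send_request, the caller computes the offset for each page. The sketch below assumes a provider that uses 1-based `start` and `rows` parameters, as in the PLOS example shown earlier; `page_parameters` is a hypothetical helper, not part of the package.

```python
from typing import Any, Dict


def page_parameters(query: str, page: int, rows: int = 20) -> Dict[str, Any]:
    """Hypothetical helper: build query parameters for a 1-based page number."""
    if page < 1:
        raise ValueError("page numbers start at 1")
    # Page 1 starts at record 1, page 2 at record rows + 1, and so on.
    return {"q": query, "start": 1 + (page - 1) * rows, "rows": rows}


first = page_parameters("machine learning", page=1)
second = page_parameters("machine learning", page=2)
print(first["start"], second["start"])  # 1 21
```

Each dictionary could then be passed as the `parameters` argument of send_request.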

session: Session
structure(flatten: bool = True, show_value_attributes: bool = False) str[source]

Base method for showing the structure of the current BaseAPI. This method reveals the configuration settings of the API client that will be used to send requests.

Returns:

The current structure of the BaseAPI or its subclass.

Return type:

str

summary() str[source]

Creates a summary representation of the current structure of the API.

Returns the original representation.

property user_agent: str | None

The User-Agent should always reflect what is used in the session.

This method retrieves the User-Agent from the session directly.

scholar_flux.api.base_coordinator module

Defines the BaseCoordinator that implements the most basic orchestration components used to request, process, and optionally cache processed record data from APIs.

class scholar_flux.api.base_coordinator.BaseCoordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator)[source]

Bases: object

BaseCoordinator providing the minimum functionality for requesting and retrieving records and metadata from APIs.

This class uses dependency injection to orchestrate the process of constructing requests, validating responses, and processing scientific works and articles. This class is designed to provide the absolute minimum necessary functionality to both retrieve and process data from APIs and can make use of caching functionality for caching requests and responses.

After initialization, the BaseCoordinator uses two main components for the sequential orchestration of response retrieval, processing, and caching.

Components:

SearchAPI (api/search_api): Handles the creation and orchestration of search requests in addition to the caching of successful requests via dependency injection.

ResponseCoordinator (responses/response_coordinator): Handles the full range of response processing steps after retrieving a response from an API. These parsing, extraction, and processing steps occur sequentially when a new response is received. If a response was previously handled, the coordinator will attempt to retrieve these responses from the processing cache.

Example

>>> from scholar_flux.api import SearchAPI, ResponseCoordinator, BaseCoordinator
# Note: the SearchAPI uses PLOS by default if `provider_name` is not provided,
# unless the `SCHOLAR_FLUX_DEFAULT_PROVIDER` env variable is set to another provider.
>>> base_search_coordinator = BaseCoordinator(search_api = SearchAPI(query = 'Math'),
...                                           response_coordinator = ResponseCoordinator.build())
>>> response = base_search_coordinator.search(page = 1)
>>> response
# OUTPUT <ProcessedResponse(len=20, cache_key=None, metadata="{'numFound': 14618, 'start': 1, ...})>
# All processed records for a particular response can be found under response.data (a list of dictionaries)
>>> list(response.data[0].keys())
# OUTPUT ['article_type', 'eissn', 'id', 'journal', 'publication_date', 'score', 'title_display',
#         'abstract', 'author_display']
__init__(search_api: SearchAPI, response_coordinator: ResponseCoordinator)[source]

Initializes the base coordinator by delegating assignment of attributes to the _initialize method. Future coordinators can follow a similar pattern of using an _initialize method for initial parameter assignment.

Parameters:
  • search_api (SearchAPI) – The search API to use for the retrieval of response records from APIs.

  • response_coordinator (ResponseCoordinator) – Core class used to handle the processing of all responses from APIs.

property api: SearchAPI

Alias for the underlying API used for searching.

classmethod as_coordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator, *args, **kwargs) Self[source]

Helper factory method for building a SearchCoordinator that allows users to build from the final building blocks of a SearchCoordinator.

Parameters:
  • search_api (SearchAPI) – The search API to use for the retrieval of response records from APIs.

  • response_coordinator (ResponseCoordinator) – Core class used to handle the processing of all responses from APIs.

Returns:

A newly created coordinator subclassed from a BaseCoordinator that also orchestrates record retrieval and processing

Return type:

BaseCoordinator

property extractor: BaseDataExtractor

Allows direct access to the DataExtractor from the ResponseCoordinator.

property parser: BaseDataParser

Allows direct access to the data parser from the ResponseCoordinator.

property processor: ABCDataProcessor

Allows direct access to the DataProcessor from the ResponseCoordinator.

property response_coordinator: ResponseCoordinator

Allows the ResponseCoordinator to be used as a property.

The response_coordinator handles and coordinates the processing of API responses from parsing, record/metadata extraction, processing, and cache management.

property responses: ResponseCoordinator

An alias for the response_coordinator property that is used for orchestrating the processing of retrieved API responses.

Handles response orchestration, including response content parsing, the extraction of records/metadata, record processing, and cache operations.

search(**kwargs) ProcessedResponse | ErrorResponse | None[source]

Public Search Method coordinating the retrieval and processing of an API response.

This method serves as the base and will primarily handle the “How” of searching (e.g. Workflows, Single page search, etc.)

property search_api: SearchAPI

Allows the search_api to be used as a property while also allowing for verification.

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the SearchCoordinator. The helper function generate_repr_from_string helps produce human-readable representations of the core structure of the Coordinator.

Parameters:
  • flatten (bool) – Whether to flatten the coordinator’s structural representation into a single line. Default=False

  • show_value_attributes (bool) – Whether to show nested attributes of the components of the BaseCoordinator or its subclass.

Returns:

The structure of the current SearchCoordinator as a string.

Return type:

str

summary() str[source]

Helper method for showing the structure of the current search coordinator.

scholar_flux.api.multisearch_coordinator module

Defines the MultiSearchCoordinator that builds on the features implemented by the SearchCoordinator to create multiple queries to different providers either sequentially or by using multithreading.

This implementation uses shared rate limiting to ensure that rate limits to different providers are not exceeded.

class scholar_flux.api.multisearch_coordinator.MultiSearchCoordinator(*args, **kwargs)[source]

Bases: UserDict

The MultiSearchCoordinator is a utility class for orchestrating searches across multiple providers, pages, and queries sequentially or using multithreading. This coordinator builds on the SearchCoordinator’s core structure to ensure consistent, rate-limited API requests.

The multi-search coordinator uses shared rate limiters to ensure that requests to the same provider (even across different queries) will use the same rate limiter.

This implementation uses the ThreadedRateLimiter.min_interval parameter from the shared rate limiter of each provider to determine the request_delay across all queries. These settings can be found and modified in the scholar_flux.api.providers.threaded_rate_limiter_registry by provider_name.

For new, unregistered providers, users can override the MultiSearchCoordinator.DEFAULT_THREADED_REQUEST_DELAY class variable to adjust the shared request_delay.
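The idea of a shared, per-provider rate limiter can be sketched with the standard library alone. The sketch below is illustrative and is not the library's ThreadedRateLimiter: `MinIntervalLimiter`, `registry`, and `limiter_for` are hypothetical names, and the 6.0 second default simply mirrors DEFAULT_THREADED_REQUEST_DELAY.

```python
import threading
import time
from typing import Dict, Optional


class MinIntervalLimiter:
    """Hypothetical thread-safe limiter enforcing a minimum gap between calls."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        # Holding the lock while sleeping serializes callers, so every
        # thread sharing this limiter observes the same spacing.
        with self._lock:
            if self._last_call is not None:
                remaining = self.min_interval - (time.monotonic() - self._last_call)
                if remaining > 0:
                    time.sleep(remaining)
            self._last_call = time.monotonic()


# One limiter per provider name, shared by every query sent to that provider.
registry: Dict[str, MinIntervalLimiter] = {}


def limiter_for(provider_name: str, default_interval: float = 6.0) -> MinIntervalLimiter:
    """Return the shared limiter for a provider, creating it on first use."""
    return registry.setdefault(provider_name, MinIntervalLimiter(default_interval))


limiter = limiter_for("example_provider", default_interval=0.05)
started = time.monotonic()
for _ in range(3):
    limiter.wait()  # a request to the provider would be sent after each wait
elapsed = time.monotonic() - started
```

Because the limiter is looked up by provider name, two coordinators that query the same provider with different search terms end up waiting on the same object.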

Examples

>>> from scholar_flux import MultiSearchCoordinator, SearchCoordinator, RecursiveDataProcessor
>>> from scholar_flux.api.rate_limiting import threaded_rate_limiter_registry
>>> multi_search_coordinator = MultiSearchCoordinator()
>>> threaded_rate_limiter_registry['arxiv'].min_interval = 6 # arbitrary rate limit (seconds per request)
>>>
>>> # Create coordinators for different queries and providers
>>> coordinators = [
...     SearchCoordinator(
...         provider_name=provider,
...         query=query,
...         processor=RecursiveDataProcessor(),
...         user_agent="SammieH",
...         cache_requests=True
...     )
...     for query in ('ml', 'nlp')
...     for provider in ('plos', 'arxiv', 'openalex', 'crossref')
... ]
>>>
>>> # Add coordinators to the multi-search coordinator
>>> multi_search_coordinator.add_coordinators(coordinators)
>>>
>>> # Execute searches across multiple pages
>>> all_pages = multi_search_coordinator.search_pages(pages=[1, 2, 3])
>>>
>>> # filters and retains successful requests from the multi-provider search
>>> filtered_pages = all_pages.filter()
>>> # The results will contain successfully processed responses across all queries, pages, and providers
>>> print(filtered_pages)  # Output will be a list of SearchResult objects
>>> # Extracts successfully processed records into a list of records where each record is a dictionary
>>> record_dict = filtered_pages.join() # retrieves a list of records
>>> print(record_dict)  # Output will be a flattened list of all records
DEFAULT_THREADED_REQUEST_DELAY: float | int = 6.0
__init__(*args, **kwargs)[source]

Initializes the MultiSearchCoordinator, allowing positional and keyword arguments to be specified when creating the MultiSearchCoordinator.

The initialization of the MultiSearchCoordinator operates similarly to that of a regular dict with the caveat that values are statically typed as SearchCoordinator instances.

add(search_coordinator: SearchCoordinator)[source]

Adds a new SearchCoordinator to the MultiSearchCoordinator instance.

Parameters:

search_coordinator (SearchCoordinator) – A search coordinator to add to the MultiSearchCoordinator dict

Raises:

InvalidCoordinatorParameterException – If the provided value is not a SearchCoordinator.
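A dict-like container that only accepts one value type can be sketched by subclassing UserDict and validating in `__setitem__`. Everything here is a stand-in: `Coordinator` replaces SearchCoordinator, TypeError replaces InvalidCoordinatorParameterException, and the `provider:query` key format is an assumption made for the example.

```python
from collections import UserDict
from dataclasses import dataclass


@dataclass
class Coordinator:
    """Stand-in for a SearchCoordinator (assumption: two identifying fields)."""
    provider_name: str
    query: str


class TypedCoordinatorDict(UserDict):
    """Hypothetical dict that only stores Coordinator values."""

    def __setitem__(self, key: str, value: Coordinator) -> None:
        if not isinstance(value, Coordinator):
            raise TypeError(f"expected a Coordinator, got {type(value).__name__}")
        super().__setitem__(key, value)

    def add(self, coordinator: Coordinator) -> None:
        # The key format is an assumption chosen for this illustration.
        self[f"{coordinator.provider_name}:{coordinator.query}"] = coordinator


coordinators = TypedCoordinatorDict()
coordinators.add(Coordinator("plos", "ml"))
coordinators.add(Coordinator("arxiv", "ml"))
print(sorted(coordinators))  # ['arxiv:ml', 'plos:ml']
```

UserDict routes both initialization and `update` through `__setitem__`, so the type check also applies to values passed at construction time.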

add_coordinators(search_coordinators: Iterable[SearchCoordinator])[source]

Helper method for adding a sequence of coordinators at a time.

property coordinators: list[SearchCoordinator]

Utility property for quickly retrieving a list of all currently registered coordinators.

current_providers() set[str][source]

Extracts a set of names corresponding to each API provider assigned to the MultiSearchCoordinator.

group_by_provider() dict[str, dict[str, SearchCoordinator]][source]

Groups all coordinators by provider name to facilitate retrieval with normalized components where needed. This is especially helpful in the later retrieval of articles when using multithreading by provider (as opposed to by page) to account for strict rate limits. All coordinated searches corresponding to a provider appear under a nested dictionary to facilitate orchestration on the same thread with the same rate limiter.

Returns:

A dictionary that maps each normalized provider name to a nested dictionary of the coordinators registered for that provider.

Return type:

dict[str, dict[str, SearchCoordinator]]
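The grouping step can be sketched as follows. `Coordinator` is a stand-in for SearchCoordinator, and lowercasing with whitespace stripping is an assumed normalization; the library's actual normalization rules are not described here.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict


@dataclass
class Coordinator:
    """Stand-in for a SearchCoordinator."""
    provider_name: str
    query: str


def group_by_provider(
    coordinators: Dict[str, Coordinator],
) -> Dict[str, Dict[str, Coordinator]]:
    """Hypothetical helper: nest coordinators under a normalized provider name."""
    grouped: Dict[str, Dict[str, Coordinator]] = defaultdict(dict)
    for key, coordinator in coordinators.items():
        # Assumed normalization: case-insensitive, surrounding spaces ignored.
        provider = coordinator.provider_name.strip().lower()
        grouped[provider][key] = coordinator
    return dict(grouped)


registered = {
    "plos:ml": Coordinator("PLOS", "ml"),
    "plos:nlp": Coordinator("plos", "nlp"),
    "arxiv:ml": Coordinator("arXiv", "ml"),
}
groups = group_by_provider(registered)
print({name: sorted(group) for name, group in groups.items()})
# {'plos': ['plos:ml', 'plos:nlp'], 'arxiv': ['arxiv:ml']}
```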

iter_pages(pages: Sequence[int] | PageListInput, iterate_by_group: bool = False, **kwargs) Generator[SearchResult, None, None][source]

Helper method that creates and joins a sequence of generator functions for retrieving and processing records from each combination of queries, pages, and providers in sequence. This implementation uses SearchCoordinator.iter_pages to dynamically identify when page retrieval should halt for each API provider, accounting for errors, timeouts, and fewer than the expected number of records before filtering records with pre-specified criteria.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to iteratively request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Yields:

SearchResult – Iteratively returns the SearchResult for each provider, query, and page using a generator expression. Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response).
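The halting behaviour described for sequential page iteration can be sketched with a plain generator. This is an illustration only: `iter_until_exhausted` and `fake_fetch` are hypothetical, and the real method yields SearchResult objects rather than tuples.

```python
from typing import Callable, Iterator, List, Optional, Sequence, Tuple

Records = Optional[List[dict]]


def iter_until_exhausted(fetch_page: Callable[[int], Records],
                         pages: Sequence[int],
                         expected_records: int) -> Iterator[Tuple[int, Records]]:
    """Yield (page, records) and stop once a page fails or comes back short."""
    for page in pages:
        records = fetch_page(page)
        yield page, records
        # None stands for an error or timeout; a short page means no more data.
        if records is None or len(records) < expected_records:
            break


ALL_RECORDS = [{"id": i} for i in range(45)]
requested: List[int] = []


def fake_fetch(page: int, page_size: int = 20) -> Records:
    """Stand-in for an API call that serves 45 records in pages of 20."""
    requested.append(page)
    start = (page - 1) * page_size
    return ALL_RECORDS[start:start + page_size]


results = list(iter_until_exhausted(fake_fetch, [1, 2, 3, 4], expected_records=20))
print(requested)  # [1, 2, 3]
```

Page 4 is never requested because page 3 returned only 5 of the expected 20 records.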

iter_pages_threaded(pages: Sequence[int] | PageListInput, max_workers: int | None = None, **kwargs) Generator[SearchResult, None, None][source]

Helper method that threads by provider to respect rate limits. It retrieves and processes records from each combination of queries, pages, and providers, running one sequence of requests per provider in parallel threads.

This implementation also uses SearchCoordinator.iter_pages to dynamically identify when page retrieval should halt for each API provider, accounting for errors, timeouts, and fewer than the expected number of records before filtering records with pre-specified criteria.

Note that, because threading is performed by provider, this method will not differ significantly in speed from the MultiSearchCoordinator.iter_pages method if only a single provider has been specified.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Yields:

SearchResult – Iteratively returns the SearchResult for each provider, query, and page using a generator expression as each SearchResult becomes available after multi-threaded processing. Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response).
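Threading by provider, with results yielded as soon as they are produced, can be sketched with one thread per provider and a shared queue. The names `iter_pages_by_provider` and `fake_search` are hypothetical, results are simplified to tuples, and the sketch omits the `max_workers` limit and rate limiting.

```python
import queue
import threading
from typing import Dict, Iterator, List, Sequence, Tuple

Result = Tuple[str, str, int]  # (provider_name, query, page)
_DONE = object()  # sentinel marking that one provider thread has finished


def fake_search(provider: str, query: str, page: int) -> Result:
    """Stand-in for a rate-limited request to a single provider."""
    return (provider, query, page)


def iter_pages_by_provider(grouped: Dict[str, List[str]],
                           pages: Sequence[int]) -> Iterator[Result]:
    """Run one thread per provider and yield results as they arrive."""
    results: "queue.Queue" = queue.Queue()

    def worker(provider: str, queries: List[str]) -> None:
        try:
            # Requests to the same provider stay sequential on one thread.
            for query in queries:
                for page in pages:
                    results.put(fake_search(provider, query, page))
        finally:
            results.put(_DONE)

    threads = [threading.Thread(target=worker, args=(provider, queries))
               for provider, queries in grouped.items()]
    for thread in threads:
        thread.start()
    remaining = len(threads)
    while remaining:
        item = results.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item
    for thread in threads:
        thread.join()


found = list(iter_pages_by_provider({"plos": ["ml", "nlp"], "arxiv": ["ml"]}, [1, 2]))
```

The order of results across providers is not deterministic, but results for any single provider arrive in the order they were requested.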

search(page: int = 1, iterate_by_group: bool = False, max_workers: int | None = None, multithreading: bool = True, **kwargs) SearchResultList[source]

Public method used to search for a single page from multiple providers at once using a sequential or multithreading approach. This approach delegates the search to search_pages to retrieve a single page for each query and provider, using an iterative approach to search for articles grouped by provider.

Note that the MultiSearchCoordinator.search_pages method uses shared rate limiters to ensure that APIs are not overwhelmed by the number of requests being sent within a specific time interval.

Parameters:
  • page (int) – The page number to request from each API provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Returns:

The list containing all retrieved and processed pages from the API. If any non-stopping errors occur, this will return an ErrorResponse instead with error and message attributes further explaining any issues that occurred during processing.

Return type:

SearchResultList

search_pages(pages: Sequence[int] | PageListInput, iterate_by_group: bool = False, max_workers: int | None = None, multithreading: bool = True, **kwargs) SearchResultList[source]

Public method used to search articles from multiple providers at once using a sequential or multithreading approach. This approach uses iter_pages under the hood.

Note that the MultiSearchCoordinator.search_pages method uses shared rate limiters to ensure that APIs are not overwhelmed by the number of requests being sent within a specific time interval.

Parameters:
  • pages (Sequence[int]) – A sequence of page numbers to iteratively request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Returns:

The list containing all retrieved and processed pages from the API. If any non-stopping errors occur, this will return an ErrorResponse instead with error and message attributes further explaining any issues that occurred during processing.

Return type:

SearchResultList

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the MultiSearchCoordinator.

scholar_flux.api.response_coordinator module

The scholar_flux.api.response_coordinator module implements the ResponseCoordinator that is used to coordinate the processing of successfully and unsuccessfully retrieved responses. This class is used by the SearchCoordinator to orchestrate the response parsing, processing and caching of responses.

The ResponseCoordinator relies on dependency injection to modify the processing methods used at each step.

class scholar_flux.api.response_coordinator.ResponseCoordinator(parser: BaseDataParser, extractor: BaseDataExtractor, processor: ABCDataProcessor, cache_manager: DataCacheManager)[source]

Bases: object

Coordinates the parsing, extraction, processing, and caching of API responses. The ResponseCoordinator operates on the concept of dependency injection to orchestrate the entire process.

Note that the overall composition of the coordinator is a governing factor in how the response is processed. The ResponseCoordinator uses a cache key and schema fingerprint to ensure that it is only returning a processed response from the cache storage if the structure of the coordinator at the time of cache storage has not changed.

To ensure that we’re not pulling from cache on significant changes to the ResponseCoordinator, we validate the schema by default using DEFAULT_VALIDATE_FINGERPRINT. When the schema changes, previously cached data is ignored, although this can be explicitly overridden during response handling.

The coordinator orchestration process operates mainly through the ResponseCoordinator.handle_response method that sequentially calls the parser, extractor, processor, and cache_manager.
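The fingerprint check described above can be sketched as a cache that stores a hash of the pipeline structure next to each entry. This is an illustration under assumptions, not the library's DataCacheManager: `fingerprint`, `FingerprintedCache`, and the use of SHA-256 over a structure string are all hypothetical.

```python
import hashlib
from typing import Any, Dict, Optional


def fingerprint(structure: str) -> str:
    """Hypothetical fingerprint: a hash of the pipeline's string representation."""
    return hashlib.sha256(structure.encode("utf-8")).hexdigest()


class FingerprintedCache:
    """Hypothetical cache that records which pipeline produced each entry."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    def put(self, cache_key: str, data: Any, schema: str) -> None:
        self._store[cache_key] = {"fingerprint": schema, "data": data}

    def get(self, cache_key: str, schema: str,
            validate_fingerprint: bool = True) -> Optional[Any]:
        entry = self._store.get(cache_key)
        if entry is None:
            return None
        # Entries produced by a differently configured pipeline are ignored
        # unless validation is explicitly turned off.
        if validate_fingerprint and entry["fingerprint"] != schema:
            return None
        return entry["data"]


old_schema = fingerprint("Parser(json) -> Extractor(default) -> Processor(flat)")
new_schema = fingerprint("Parser(json) -> Extractor(default) -> Processor(recursive)")

cache = FingerprintedCache()
cache.put("page-1", [{"title": "example"}], old_schema)
same_pipeline = cache.get("page-1", old_schema)
changed_pipeline = cache.get("page-1", new_schema)
forced = cache.get("page-1", new_schema, validate_fingerprint=False)
```

Here `changed_pipeline` is None because the processor changed after the entry was stored, while `forced` still returns the stored data.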

Example workflow:

>>> from scholar_flux.api import SearchAPI, ResponseCoordinator
>>> api = SearchAPI(query = 'technological innovation', provider_name = 'crossref', user_agent = 'scholar_flux')
>>> response_coordinator = ResponseCoordinator.build() # uses defaults with caching in-memory
>>> response = api.search(page = 1)
# future calls with the same structure will be cached
>>> processed_response = response_coordinator.handle_response(response, cache_key='tech-innovation-cache-key-page-1')
# the ProcessedResponse (or ErrorResponse) stores critical fields from the original and processed response
>>> processed_response
# OUTPUT: ProcessedResponse(len=20, cache_key='tech-innovation-cache-key-page-1', metadata=...)
>>> new_processed_response = response_coordinator.handle_response(processed_response, cache_key='tech-innovation-cache-key-page-1')
>>> new_processed_response
# OUTPUT: ProcessedResponse(len=20, cache_key='tech-innovation-cache-key-page-1', metadata=...)

Note that the entire process can be orchestrated via the SearchCoordinator that uses the SearchAPI and ResponseCoordinator as core dependency injected components:

>>> from scholar_flux import SearchCoordinator
>>> search_coordinator = SearchCoordinator(api, response_coordinator, cache_requests=True)
# uses a default cache key constructed from the response internally
>>> processed_response = search_coordinator.search(page = 1)
# OUTPUT: ProcessedResponse(len=20, cache_key='crossref_technological innovation_1_20', metadata=...)
>>> processed_response.content == new_processed_response.content
DEFAULT_VALIDATE_FINGERPRINT: bool = True
__init__(parser: BaseDataParser, extractor: BaseDataExtractor, processor: ABCDataProcessor, cache_manager: DataCacheManager)[source]

Initializes the response coordinator using the core components used to parse, process, and cache response data.

classmethod build(parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) ResponseCoordinator[source]

Factory method to build a ResponseCoordinator with sensible defaults.

Parameters:
  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline: parses response records into a dictionary.

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately.

  • processor (Optional[ABCDataProcessor]) – Processes API responses into a list of dictionaries.

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval.

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests. The api is the ground truth if not directly specified.

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses. On by default unless specified or if a cache manager is already provided.

Returns:

A fully constructed coordinator.

Return type:

ResponseCoordinator

property cache: DataCacheManager

Alias for the response data processing cache manager.

Also allows direct access to the DataCacheManager from the ResponseCoordinator.

property cache_manager: DataCacheManager

Allows direct access to the DataCacheManager from the ResponseCoordinator.

classmethod configure_cache(cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) DataCacheManager[source]

Helper method for building and swapping out cache managers depending on the cache chosen.

Parameters:
  • cache_manager (Optional[DataCacheManager]) – An optional cache manager to use

  • cache_results (Optional[bool]) – Ground truth parameter, used to resolve whether to use caching when the cache_manager and cache_results contradict

Returns:

An existing or newly created cache manager that can be used with the ResponseCoordinator

Return type:

DataCacheManager

property extractor: BaseDataExtractor

Allows direct access to the DataExtractor from the ResponseCoordinator.

handle_response(response: Response | ResponseProtocol, cache_key: str | None = None, from_cache: bool = True, validate_fingerprint: bool | None = None) ErrorResponse | ProcessedResponse[source]

Retrieves the processed response data from the cache if previously cached. Otherwise, the data is retrieved after processing the response. The response data is subsequently transformed into a dataclass containing the response content, processing info, and metadata.

Parameters:
  • response (Response) – Raw API response.

  • cache_key (Optional[str]) – Cache key for storing/retrieving.

  • from_cache (bool) – Whether to try to retrieve the processed response from the cache.

Returns:

A dataclass object that contains response data and detailed processing info.

Return type:

ErrorResponse | ProcessedResponse

handle_response_data(response: Response, cache_key: str | None = None) List[Dict[Any, Any]] | List | None[source]

Retrieves the data from the processed response from cache if previously cached. Otherwise the data is retrieved after processing the response.

Parameters:
  • response (Response) – Raw API response.

  • cache_key (Optional[str]) – Cache key for storing/retrieving.

Returns:

Processed response data or None.

Return type:

Optional[List[Dict[Any, Any]]]

property parser: BaseDataParser

Allows direct access to the data parser from the ResponseCoordinator.

property processor: ABCDataProcessor

Allows direct access to the DataProcessor from the ResponseCoordinator.

schema_fingerprint() str[source]

Helper method for generating a concise view of the current structure of the response coordinator.

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for retrieving a string representation of the overall structure of the current ResponseCoordinator. The helper function generate_repr_from_string helps produce human-readable representations of the core structure of the ResponseCoordinator.

Parameters:
  • flatten (bool) – Whether to flatten the ResponseCoordinator’s structural representation into a single line.

  • show_value_attributes (bool) – Whether to show nested attributes of the components in the structure of the current ResponseCoordinator instance.

Returns:

The structure of the current ResponseCoordinator as a string.

Return type:

str

summary() str[source]

Helper method for creating a quick summary representation of the structure of the ResponseCoordinator.

classmethod update(response_coordinator: ResponseCoordinator, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) ResponseCoordinator[source]

Factory method to create a new ResponseCoordinator from an existing configuration.

Parameters:
  • response_coordinator (ResponseCoordinator) – ResponseCoordinator containing the defaults to swap.

  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline: parses response records into a dictionary.

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately.

  • processor (Optional[ABCDataProcessor]) – Processes API responses into a list of dictionaries.

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval.

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests. The api is the ground truth if not directly specified.

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses. On by default unless specified or if a cache manager is already provided.

Returns:

A fully constructed coordinator.

Return type:

ResponseCoordinator
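The "new object from an existing configuration" pattern can be sketched with a frozen dataclass and `dataclasses.replace`. The `Pipeline` class and `update_pipeline` function are hypothetical stand-ins with string components; the sketch shows the pattern only and makes no claim about how the library copies components.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Pipeline:
    """Stand-in for a coordinator holding three swappable components."""
    parser: str
    extractor: str
    processor: str


def update_pipeline(existing: Pipeline,
                    parser: Optional[str] = None,
                    extractor: Optional[str] = None,
                    processor: Optional[str] = None) -> Pipeline:
    """Return a new Pipeline, keeping every component that was not overridden."""
    candidates = {"parser": parser, "extractor": extractor, "processor": processor}
    overrides = {name: value for name, value in candidates.items() if value is not None}
    return replace(existing, **overrides)


original = Pipeline(parser="json", extractor="default", processor="flat")
updated = update_pipeline(original, processor="recursive")
print(updated)
# Pipeline(parser='json', extractor='default', processor='recursive')
```

The original object is left untouched, so both configurations can be used side by side.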

scholar_flux.api.response_validator module

The scholar_flux.api.response_validator module implements a basic ResponseValidator that is used for preliminary response validation to determine whether received responses are valid and successful.

This class is used by default in SearchCoordinators to determine whether to proceed with response processing.

class scholar_flux.api.response_validator.ResponseValidator[source]

Bases: object

Helper class that serves as an initial response validation step to ensure that, in custom retry handling, the basic structure of a response can be validated to determine whether or not to retry the response retrieval process.

The ResponseValidator implements simple class methods that return a boolean (True/False) indicating whether a response or response-like object has the required structure. These methods raise errors when they encounter non-response objects, or when validation fails and raise_on_error = True.

Example

>>> from scholar_flux.api import ResponseValidator, ReconstructedResponse
>>> mock_success_response = ReconstructedResponse.build(status_code = 200,
...                                                     json = {'response': 'success'},
...                                                     url = "https://an-example-url.com",
...                                                     headers={'Content-Type': 'application/json'}
...                                                     )
>>> ResponseValidator.validate_response(mock_success_response) is True
>>> ResponseValidator.validate_content(mock_success_response) is True
structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the ResponseValidator class in a string format. This method will show the name of the current class along with its attributes (ResponseValidator())

Returns:

A string representation of the current structure of the ResponseValidator

Return type:

str

classmethod validate_content(response: Response | ResponseProtocol, expected_format: str = 'application/json', *, raise_on_error: bool = False) bool[source]

Validates the response content type.

Parameters:
  • response (requests.Response | ResponseProtocol) – The HTTP response or response-like object to check.

  • expected_format (str) – The expected content type substring (e.g., “application/json”).

  • raise_on_error (bool) – If True, raises InvalidResponseException on mismatch.

Returns:

True if the content type matches, False otherwise.

Return type:

bool

Raises:

InvalidResponseException – If the content type does not match and raise_on_error is True.
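
The substring check described above can be sketched without the library. This is an assumption-laden illustration: FakeResponse and InvalidContentTypeError are hypothetical stand-ins (the latter for InvalidResponseException), and the real method may inspect the response differently.

```python
from dataclasses import dataclass, field


class InvalidContentTypeError(Exception):
    """Hypothetical stand-in for InvalidResponseException."""


@dataclass
class FakeResponse:
    """Hypothetical response-like object that exposes only headers."""

    headers: dict = field(default_factory=dict)


def content_type_matches(response, expected_format="application/json", *, raise_on_error=False):
    """Return True when expected_format is a substring of the Content-Type header."""
    content_type = response.headers.get("Content-Type", "")
    if expected_format in content_type:
        return True
    if raise_on_error:
        raise InvalidContentTypeError(
            f"Expected {expected_format!r} in Content-Type, got {content_type!r}"
        )
    return False


json_response = FakeResponse(headers={"Content-Type": "application/json; charset=utf-8"})
xml_response = FakeResponse(headers={"Content-Type": "text/xml"})
print(content_type_matches(json_response))  # True: substring match ignores the charset suffix
print(content_type_matches(xml_response))   # False: no exception unless raise_on_error=True
```

Matching on a substring rather than on equality is what lets a header such as "application/json; charset=utf-8" pass the check.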

classmethod validate_response(response: Response | ResponseProtocol, *, raise_on_error: bool = False) bool[source]

Validates HTTP responses by first verifying whether the object is a Response or follows a ResponseProtocol. For valid response or response-like objects, the status code is then verified: the method returns False for 400 and 500 level status codes, or raises an error instead if raise_on_error is set to True.

Note that a ResponseProtocol duck-types and verifies that each of a minimal set of attributes and/or properties can be found within the current response.

In the scholar_flux retrieval step, this validator verifies that the response received is a valid response.
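
The two-stage check (duck-typing first, status code second) might look roughly like the sketch below. The attribute set in ResponseLike, the "status below 400" rule, and the exception types are all assumptions made for illustration; the real ResponseProtocol and ResponseValidator may differ.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ResponseLike(Protocol):
    """Hypothetical minimal protocol; the real ResponseProtocol may require more attributes."""

    status_code: int
    headers: Any
    content: Any
    url: Any


class InvalidResponseError(Exception):
    """Hypothetical stand-in for InvalidResponseException."""


class DuckResponse:
    """A response-like object that is unrelated to requests.Response."""

    def __init__(self, status_code, url="https://an-example-url.com"):
        self.status_code = status_code
        self.headers = {}
        self.content = b""
        self.url = url


def is_valid_response(response, *, raise_on_error=False):
    """Reject non-response objects outright, then judge the status code."""
    if not isinstance(response, ResponseLike):
        raise TypeError(f"Expected a response-like object, got {type(response).__name__}")
    if response.status_code < 400:  # assumption: 4xx and 5xx codes are the invalid ones
        return True
    if raise_on_error:
        raise InvalidResponseError(f"Received status code {response.status_code}")
    return False


print(is_valid_response(DuckResponse(200)))  # True
print(is_valid_response(DuckResponse(404)))  # False
```

Because the protocol is checked structurally, any object carrying the listed attributes passes the first stage, regardless of its class hierarchy.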

Parameters:
  • response (requests.Response | ResponseProtocol) – The HTTP response object to validate

  • raise_on_error (bool) – If True, raises InvalidResponseException on error for invalid response status codes

Returns:

True if valid, False otherwise

Raises:

InvalidResponseException – If the response has an invalid status code and raise_on_error is True.

scholar_flux.api.search_api module

Implements the SearchAPI that is the core interface used throughout the scholar_flux package to retrieve responses.

The SearchAPI builds on the BaseAPI to simplify parameter handling into a universal interface where the specifics of parameter names and request formation are abstracted.

class scholar_flux.api.search_api.SearchAPI(query: str, provider_name: str | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, use_cache: bool | None = None, base_url: str | None = None, api_key: SecretStr | str | None = None, records_per_page: int = 20, request_delay: float | None = None, **api_specific_parameters)[source]

Bases: BaseAPI

The core interface that handles the retrieval of JSON, XML, and YAML content from the scholarly API sources offered by several providers such as SpringerNature, PLOS, and PubMed. The SearchAPI is structured to allow flexibility without complexity in initialization. API clients can be either constructed piece-by-piece or with sensible defaults for session-based retrieval, API key management, caching, and configuration options.

This class is integrated into the SearchCoordinator as a core component of a pipeline that further parses the response, extracts records and metadata, and caches the processed records to facilitate downstream tasks such as research, summarization, and data mining.

Examples

>>> from scholar_flux.api import SearchAPI
# creating a basic API client that uses PLOS as the default provider while caching data in-memory:
>>> api = SearchAPI(query = 'machine learning', provider_name = 'plos', use_cache = True)
# retrieve a basic request:
>>> response_page_1 = api.search(page = 1)
>>> assert response_page_1.ok
>>> response_page_1
# OUTPUT: <Response [200]>
>>> ml_page_1 = response_page_1.json()
# future requests automatically wait until the specified request delay passes to send another request:
>>> response_page_2 = api.search(page = 2)
>>> assert response_page_2.ok
>>> response_page_2
# OUTPUT: <Response [200]>
>>> ml_page_2 = response_page_2.json()
DEFAULT_CACHED_SESSION: bool = False
DEFAULT_URL: str = 'https://api.plos.org/search'
__init__(query: str, provider_name: str | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, use_cache: bool | None = None, base_url: str | None = None, api_key: SecretStr | str | None = None, records_per_page: int = 20, request_delay: float | None = None, **api_specific_parameters)[source]

Initializes the SearchAPI with a query and optional parameters. The absolute bare minimum for interacting with APIs requires a query, base_url, and an APIParameterConfig that associates relevant fields (e.g., query, records_per_page, etc.) with fields that are specific to each API provider.

Parameters:
  • query (str) – The search keyword or query string.

  • provider_name (Optional[str]) – The name of the API provider where requests will be sent. If a provider_name and base_url are both given, the SearchAPIConfig will prioritize base_urls over the provider_name.

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – A config that uses a parameter map attribute under the hood to build the parameters necessary to interact with an API. For convenience, an APIParameterMap can be provided in place of an APIParameterConfig, and the conversion will take place under the hood.

  • session (Optional[requests.Session]) – A pre-configured session. A new session is created if not specified.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs (API keys, auth bearers, emails, etc.)

  • use_cache (bool) – Indicates whether or not to create a cached session. If a cached session is already specified, this setting will have no effect on the creation of a session.

  • base_url (str) – The base URL for the article API.

  • api_key (Optional[str | SecretStr]) – API key if required.

  • records_per_page (int) – Number of records to fetch per page (1-100).

  • request_delay (Optional[float]) – Minimum delay between requests in seconds. If not specified, the SearchAPI uses the default request delay defined in the SearchAPIConfig (6.1 seconds) unless an override exists for the current provider.

  • **api_specific_parameters

    Additional parameter-value pairs to be provided to SearchAPIConfig class. API specific parameters include:

    mailto (Optional[str | SecretStr]): Crossref: an optional contact for feedback on API usage

    db (str): PubMed: a database to retrieve data from (example: db=pubmed)

property api_key: SecretStr | None

Retrieves the current value of the API key from the SearchAPIConfig as a SecretStr.

Note that the API key is stored as a secret key when available. The value of the API key can be retrieved by using the api_key.get_secret_value() method.

Returns:

A secret string of the API key if it exists

Return type:

Optional[SecretStr]

property api_specific_parameters: dict

This property pulls additional parameters corresponding to the API from the configuration of the current API instance.

Returns:

A dictionary of all parameters specific to the current API.

Return type:

dict[str, APISpecificParameter]

property base_url: str

Corresponds to the base URL of the current API.

Returns:

The base URL corresponding to the API Provider

build_parameters(page: int, additional_parameters: dict[str, Any] | None = None, **api_specific_parameters) Dict[str, Any][source]

Constructs the request parameters for the API call, using the provided APIParameterConfig and its associated APIParameterMap. This method maps standard fields (query, page, records_per_page, api_key, etc.) to the provider-specific parameter names.

Using additional_parameters, an arbitrary set of parameter key-value pairs can be added to the request to further customize or override the parameter settings sent to the API. additional_parameters is offered as a convenience in case an API uses additional arguments or a query requires specific advanced functionality.

Other arguments and mappings can be supplied through **api_specific_parameters to the parameter config, provided that the options or pre-defined mappings exist in the config.

When **api_specific_parameters and additional_parameters conflict, additional_parameters is considered the ground truth. If any remaining parameters are None in the constructed list of parameters, these values will be dropped from the final dictionary.

Parameters:
  • page (int) – The page number to request.

  • additional_parameters (Optional[dict]) – A dictionary of additional overrides that may or may not have been included in the original parameter map of the current API. (Provided for further customization of requests).

  • **api_specific_parameters – Additional parameters to provide to the parameter config: Note that the config will only accept keyword arguments that have been explicitly defined in the parameter map. For all others, they must be added using the additional_parameters parameter.

Returns:

The constructed request parameters.

Return type:

Dict[str, Any]
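
The precedence rules described above (mapped fields first, then **api_specific_parameters, then additional_parameters as the ground truth, with None values dropped) can be sketched in plain Python. Everything in this block is hypothetical: the PLOS-style names q/start/rows, the accepted set, and the choice to raise ValueError on unknown keywords are assumptions, not the library's behavior.

```python
from typing import Any, Dict, Optional

# Hypothetical provider-specific options that the parameter map explicitly defines.
ACCEPTED_API_SPECIFIC = {"sort", "fq"}


def build_parameters_sketch(
    query: str,
    page: int,
    records_per_page: int = 20,
    api_key: Optional[str] = None,
    additional_parameters: Optional[Dict[str, Any]] = None,
    **api_specific_parameters: Any,
) -> Dict[str, Any]:
    """Map universal fields to provider names, then layer overrides in precedence order."""
    unknown = set(api_specific_parameters) - ACCEPTED_API_SPECIFIC
    if unknown:
        # Assumption: keywords outside the parameter map are rejected here.
        raise ValueError(f"Not defined in the parameter map: {sorted(unknown)}")

    parameters: Dict[str, Any] = {
        "q": query,
        "start": 1 + (page - 1) * records_per_page,  # assumed 1-based record offset
        "rows": records_per_page,
        "api_key": api_key,
    }
    parameters.update(api_specific_parameters)      # lower precedence
    parameters.update(additional_parameters or {})  # ground truth on conflict
    # Parameters that are still None are dropped from the final dictionary.
    return {key: value for key, value in parameters.items() if value is not None}


built = build_parameters_sketch(
    "machine learning",
    page=2,
    additional_parameters={"sort": "relevance", "fl": "id"},
    sort="date",
)
print(built)
```

Here sort ends up as "relevance" because additional_parameters wins the conflict, fl passes through even though it is not in the accepted set, and the unset api_key never reaches the request.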

property cache: BaseCache | None

Retrieves the requests-session cache object if the session object is a CachedSession object.

If a session cache does not exist, this function will return None.

Returns:

The cache object if available, otherwise None.

Return type:

Optional[BaseCache]

property config: SearchAPIConfig

Property method for accessing the config for the SearchAPI.

Returns:

The configuration corresponding to the API Provider

describe() dict[source]

A helper method that describes the accepted configuration for the current provider or user-defined parameter mappings.

Returns:

A dictionary describing valid config fields and provider-specific API parameters for the current provider (if applicable).

Return type:

dict

classmethod from_defaults(query: str, provider_name: str | None, session: Session | None = None, user_agent: Annotated[str | None, 'An optional User-Agent to associate with each search'] = None, use_cache: bool | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters) SearchAPI[source]

Factory method to create SearchAPI instances with sensible defaults for known providers.

PLOS is used by default unless the environment variable SCHOLAR_FLUX_DEFAULT_PROVIDER is set to another provider.

Parameters:
  • query (str) – The search keyword or query string.

  • base_url (str) – The base URL for the article API.

  • records_per_page (int) – Number of records to fetch per page (1-100).

  • request_delay (Optional[float]) – Minimum delay between requests in seconds.

  • api_key (Optional[str | SecretStr]) – API key if required.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist.

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs

  • **api_specific_parameters – Additional api parameter-value pairs and overrides to be provided to SearchAPIConfig class

Returns:

A new SearchAPI instance initialized with the config chosen.

classmethod from_provider_config(query: str, provider_config: ProviderConfig, session: Session | None = None, user_agent: Annotated[str | None, 'An optional User-Agent to associate with each search'] = None, use_cache: bool | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters) SearchAPI[source]

Factory method to create a new SearchAPI instance using a ProviderConfig.

This method uses the default settings associated with the provider config to temporarily make the configuration settings globally available when creating the SearchAPIConfig and APIParameterConfig instances from the provider registry.

Parameters:
  • query (str) – The search keyword or query string.

  • provider_config (ProviderConfig) – The provider configuration used to create the SearchAPI.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist.

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError.

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs

  • **api_specific_parameters – Additional api parameter-value pairs and overrides to be provided to SearchAPIConfig class

Returns:

A new SearchAPI instance initialized with the chosen configuration.

classmethod from_settings(query: str, config: SearchAPIConfig, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, use_cache: bool | None = None, masker=None, rate_limiter: RateLimiter | None = None) SearchAPI[source]

Advanced constructor: instantiate directly from a SearchAPIConfig instance.

Parameters:
  • query (str) – The search keyword or query string.

  • config (SearchAPIConfig) – Indicates the configuration settings to be used when sending requests to APIs

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – Maps global scholar_flux parameters to those that are specific to the current API

  • session (Optional[requests.Session | CachedSession]) – An optional session to use for the creation of request sessions

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The settings from the session are used if this option is not specified.

  • masker (Optional[SensitiveDataMasker]) – A masker used to filter logs of API keys and other sensitive data

  • user_agent (Optional[str]) – A user agent to associate with the session

Returns:

A newly constructed SearchAPI with the chosen/validated settings

Return type:

SearchAPI

static is_cached_session(session: CachedSession | Session) bool[source]

Checks whether the current session is a cached session.

To do so, this method first determines whether the current object has a ‘cache’ attribute and whether the cache element, if existing, is a BaseCache.

Parameters:

session (requests.Session) – The session to check.

Returns:

True if the session is a cached session, False otherwise.

Return type:

bool
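
The attribute-based check described above can be illustrated with stand-in classes. This is a sketch only: BaseCacheStandIn, PlainSession, and CachingSession are hypothetical substitutes for requests_cache.BaseCache, requests.Session, and requests_cache.CachedSession, used so the example runs without third-party packages.

```python
class BaseCacheStandIn:
    """Hypothetical substitute for the BaseCache type."""


class PlainSession:
    """Session without any cache attribute."""


class CachingSession:
    """Session that carries a cache backend on its 'cache' attribute."""

    def __init__(self):
        self.cache = BaseCacheStandIn()


class MisleadingSession:
    """Has a 'cache' attribute, but it is not a cache backend."""

    cache = {"not": "a backend"}


def looks_like_cached_session(session) -> bool:
    """True only when a 'cache' attribute exists and is a cache backend instance."""
    cache = getattr(session, "cache", None)
    return isinstance(cache, BaseCacheStandIn)


print(looks_like_cached_session(CachingSession()))     # True
print(looks_like_cached_session(PlainSession()))       # False
print(looks_like_cached_session(MisleadingSession()))  # False
```

Checking the type of the cache attribute, and not just its presence, guards against objects that happen to expose an unrelated attribute with the same name.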

make_request(current_page: int, additional_parameters: dict[str, Any] | None = None, request_delay: float | None = None) Response[source]

Constructs and sends a request to the chosen API.

The parameters are built based on the default/chosen config and parameter map.

Parameters:
  • current_page (int) – The page number to request.

  • additional_parameters (Optional[dict]) – A dictionary of additional overrides not included in the original SearchAPIConfig.

  • request_delay (Optional[float]) – Overrides the configured request delay for the current request only.

Returns:

The API’s response to the request.

Return type:

requests.Response

property parameter_config: APIParameterConfig

Property method for accessing the parameter mapping config for the SearchAPI.

Returns:

The configuration corresponding to the API Provider

prepare_request(base_url: str | None = None, endpoint: str | None = None, parameters: Dict[str, Any] | None = None, api_key: str | None = None) PreparedRequest[source]

Prepares a GET request for the specified endpoint with optional parameters.

This method builds on the original base class method by additionally allowing users to specify a custom request directly while also accounting for the addition of an API key specific to the API.

Parameters:
  • base_url (str) – The base URL for the API.

  • endpoint (Optional[str]) – The API endpoint to prepare the request for.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

Returns:

The prepared request object.

Return type:

requests.PreparedRequest

Prepares the current request given the provided page and parameters.

The prepared request object can be sent using the SearchAPI.session.send method with requests.Session and requests_cache.CachedSession objects.

Parameters:
  • page (Optional[int]) – Page number to query. If provided, parameters are built from the config and this page.

  • parameters (Optional[Dict[str, Any]]) – If provided alone, used as the full parameter set to build the current request. If provided together with page, these act as additional or overriding parameters on top of the built config.

Returns:

A request object that can be sent via api.session.send.

Return type:

requests.PreparedRequest

property provider_name: str

Property method for accessing the provider name in the current SearchAPI instance.

Returns:

The name corresponding to the API Provider.

property query: str

Retrieves the current value of the query to be sent to the current API.

property records_per_page: int

Indicates the total number of records to show on each page.

Returns:

an integer indicating the max number of records per page

Return type:

int

property request_delay: float

Indicates how long we should wait in-between requests.

Helpful for ensuring compliance with the rate-limiting requirements of various APIs.

Returns:

The number of seconds to wait at minimum between each request

Return type:

float
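
A minimum delay between requests is commonly enforced by remembering when the last request was sent and sleeping for whatever remains of the interval. The sketch below shows that idea; MinimumIntervalLimiter and FakeClock are hypothetical names, and the library's own RateLimiter may work differently. The clock and sleep functions are injectable so the demonstration finishes instantly.

```python
import time


class MinimumIntervalLimiter:
    """Blocks until at least min_interval seconds have passed since the previous call."""

    def __init__(self, min_interval=6.1, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_call = None

    def wait(self, min_interval=None):
        """Sleep for the remaining interval, if any, and return the seconds waited."""
        interval = self.min_interval if min_interval is None else min_interval
        waited = 0.0
        if self._last_call is not None:
            remaining = interval - (self._clock() - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                waited = remaining
        self._last_call = self._clock()
        return waited


class FakeClock:
    """Simulated clock so the example does not actually sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
limiter = MinimumIntervalLimiter(6.1, clock=clock, sleep=clock.sleep)
first_wait = limiter.wait()    # nothing to wait for on the first request
clock.now += 2.0               # two simulated seconds of other work
second_wait = limiter.wait()   # sleeps for the rest of the 6.1 second interval
clock.now += 10.0              # a long pause exceeds the interval
third_wait = limiter.wait()    # no additional wait is needed
print(first_wait, round(second_wait, 1), third_wait)
```

Passing min_interval to wait mirrors the idea of a per-request override that leaves the configured delay unchanged for later requests.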

search(page: int | None = None, parameters: Dict[str, Any] | None = None, request_delay: float | None = None) Response[source]

Public method to perform a search for the selected page with the current API configuration.

A search can be performed either by specifying the page to query, which uses the preselected defaults along with any additional overrides for other parameters accepted by the API, or by supplying a custom parameter dictionary containing the full set of API parameters.

Parameters:
  • page (Optional[int]) – Page number to query. If provided, parameters are built from the config and this page.

  • parameters (Optional[Dict[str, Any]]) – If provided alone, used as the full parameter set for the request. If provided together with page, these act as additional or overriding parameters on top of the built config.

  • request_delay (Optional[float]) – Overrides the configured request delay for the current request only.

Returns:

A response object from the API containing articles and metadata

Return type:

requests.Response
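
The way page and parameters combine can be sketched as a small resolution function. This is illustrative only: resolve_search_parameters and plos_style_builder are hypothetical, and raising ValueError when neither argument is given is an assumption about behavior the description above does not specify.

```python
from typing import Any, Callable, Dict, Optional


def resolve_search_parameters(
    page: Optional[int] = None,
    parameters: Optional[Dict[str, Any]] = None,
    *,
    build_from_page: Callable[[int], Dict[str, Any]],
) -> Dict[str, Any]:
    """Decide which parameter set a search would send."""
    if page is None and parameters is None:
        # Assumption: a search needs at least one of the two inputs.
        raise ValueError("Provide a page, a parameter dictionary, or both.")
    if page is None:
        # Parameters alone are used as the full parameter set.
        return dict(parameters)
    built = build_from_page(page)
    # Parameters given together with a page act as additions or overrides.
    built.update(parameters or {})
    return built


def plos_style_builder(page: int) -> Dict[str, Any]:
    """Hypothetical builder using PLOS-style names and 20 records per page."""
    return {"q": "machine learning", "start": 1 + (page - 1) * 20, "rows": 20}


page_only = resolve_search_parameters(page=2, build_from_page=plos_style_builder)
with_override = resolve_search_parameters(
    page=2, parameters={"rows": 5}, build_from_page=plos_style_builder
)
custom_only = resolve_search_parameters(
    parameters={"q": "deep learning"}, build_from_page=plos_style_builder
)
print(page_only, with_override, custom_only, sep="\n")
```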

session: Session
structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the SearchAPI. The helper function generate_repr_from_string helps produce human-readable representations of the core structure of the SearchAPI.

Parameters:
  • flatten (bool) – Whether to flatten the SearchAPI’s structural representation into a single line.

  • show_value_attributes (bool) – Whether to show nested attributes of the components of the SearchAPI.

Returns:

The structure of the current SearchAPI as a string.

Return type:

str

summary() str[source]

Create a summary representation of the current structure of the API.

classmethod update(search_api: SearchAPI, query: str | None = None, config: SearchAPIConfig | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, use_cache: bool | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters)[source]

Helper method for generating a new SearchAPI from an existing SearchAPI instance. All parameters that are not modified are pulled from the original SearchAPI. If no changes are made, an identical SearchAPI is generated from the existing defaults.

Parameters:
  • config (SearchAPIConfig) – Indicates the configuration settings to be used when sending requests to APIs

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – Maps global scholar_flux parameters to those that are API specific.

  • session (Optional[requests.Session | CachedSession]) – An optional session to use for the creation of request sessions

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The settings from the session are used if this option is not specified.

  • masker (Optional[SensitiveDataMasker]) – A masker used to filter logs of API keys and other sensitive data

  • user_agent (Optional[str]) – A user agent to associate with the session

Returns:

A newly constructed SearchAPI with the chosen/validated settings

Return type:

SearchAPI

with_config(config: SearchAPIConfig | None = None, parameter_config: APIParameterConfig | None = None, provider_name: str | None = None, query: str | None = None) Iterator[SearchAPI][source]

Temporarily modifies the SearchAPI’s SearchAPIConfig and/or APIParameterConfig and namespace. You can provide a config, a parameter_config, or a provider_name to fetch defaults. Explicitly provided configs take precedence over provider_name, and the context manager will revert changes to the parameter mappings and search configuration afterward.

Parameters:
  • config (Optional[SearchAPIConfig]) – Temporary search api configuration to use within the context to control where and how response records are retrieved.

  • parameter_config (Optional[APIParameterConfig]) – Temporary parameter config to use within the context to resolve universal parameters names to those that are specific to the current api.

  • provider_name (Optional[str]) – Used to retrieve the associated configuration for a specific provider in order to edit the parameter map when using a different provider.

  • query (Optional[str]) – Allows users to temporarily modify the query used to retrieve records from an API.

Yields:

SearchAPI – The current api object with a temporarily swapped config during the context manager.

with_config_parameters(provider_name: str | None = None, query: str | None = None, **api_specific_parameters) Iterator[SearchAPI][source]

Allows for the temporary modification of the search configuration, parameter mappings, and cache namespace for the current API. Uses a context manager to temporarily change the provided parameters without persisting the changes.

Parameters:
  • provider_name (Optional[str]) – If provided, fetches the default parameter config for the provider.

  • query (Optional[str]) – Allows users to temporarily modify the query used to retrieve records from an API.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override in the current config.

Yields:

SearchAPI – The API object with temporarily swapped config and/or parameter config.
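
The temporary-swap behavior shared by with_config and with_config_parameters follows a standard context manager pattern: save the current state, apply overrides, yield, and restore in a finally block. The sketch below uses hypothetical ConfigSketch and ApiSketch classes and does not reproduce the library's internals.

```python
from contextlib import contextmanager
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical immutable configuration."""

    provider_name: str = "plos"
    records_per_page: int = 20
    request_delay: float = 6.1


class ApiSketch:
    """Hypothetical API client holding a query and a config."""

    def __init__(self, query, config):
        self.query = query
        self.config = config

    @contextmanager
    def with_config_parameters(self, query=None, **overrides):
        """Temporarily apply overrides; the originals are restored on exit."""
        original_query, original_config = self.query, self.config
        try:
            if query is not None:
                self.query = query
            self.config = replace(self.config, **overrides)
            yield self
        finally:
            # Runs even if the body raises, so changes never persist.
            self.query, self.config = original_query, original_config


api = ApiSketch("machine learning", ConfigSketch())
with api.with_config_parameters(query="deep learning", records_per_page=50) as temporary_api:
    inside = (temporary_api.query, temporary_api.config.records_per_page)
after = (api.query, api.config.records_per_page)
print(inside, after)
```

Restoring in a finally block is the design choice that matters here: an exception raised inside the with block still leaves the client in its original configuration.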

scholar_flux.api.search_coordinator module

Defines the SearchCoordinator that provides enhanced customization and single/multi-page response retrieval and processing of record data from APIs.

class scholar_flux.api.search_coordinator.SearchCoordinator(search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, query: str | None = None, provider_name: str | None = None, cache_requests: bool | None = None, cache_results: bool | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None, **kwargs)[source]

Bases: BaseCoordinator

High-level coordinator for requesting and retrieving records and metadata from APIs.

This class uses dependency injection to orchestrate the process of constructing requests, validating responses, and processing scientific works and articles. It is designed to abstract away the complexity of using APIs while providing a consistent and robust interface for retrieving record data and metadata, reusing valid entries from the request and storage caches to help avoid exceeding API request limits.

If no search_api is provided, the coordinator will create a SearchAPI that uses the provider named by the environment variable SCHOLAR_FLUX_DEFAULT_PROVIDER, if it is set. Otherwise, PLOS is used on the backend.

__init__(search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, query: str | None = None, provider_name: str | None = None, cache_requests: bool | None = None, cache_results: bool | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None, **kwargs)[source]

Flexible initializer that constructs a SearchCoordinator either from its core components or from their basic building blocks when these core components are not directly provided.

If search_api and response_coordinator are provided, then this method will use these inputs directly.

The additional parameters can still be used to update these two components. For example, a search_api can be updated with a new query, session, and SearchAPIConfig parameters through keyword arguments (**kwargs).

When neither component is provided:
  • The creation of the search_api requires, at minimum, a query.

  • If the response_coordinator, a parser, extractor, processor, and cache_manager aren’t provided, then a new ResponseCoordinator will be built from the default settings.

Core Components/Attributes:
  • SearchAPI: Handles all requests to an API based on its configuration.

    Dependencies: query, **kwargs

  • ResponseCoordinator: Handles the parsing, record/metadata extraction, processing, and caching of responses.

    Dependencies: parser, extractor, processor, cache_manager

Other Attributes:
  • RetryHandler: Addresses when to retry failed requests and how failed requests are retried.

  • SearchWorkflow: An optional workflow that defines custom search logic from specific APIs.

  • Validator: Handles how requests are validated. The default determines whether a 200 response was received.

Note

This implementation uses the underlying private method _initialize to handle the assignment of parameters under the hood while the core function of the __init__ creates these components if they do not already exist.

Parameters:
  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to handle the processing and core handling of all responses from APIs

  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline - parses response records into a dictionary

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately

  • processor (Optional[ABCDataProcessor]) – Processes the previously extracted API records into list of dictionaries that are filtered and optionally flattened during processing

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval

  • query (Optional[str]) – Query to be used when sending requests when creating an API - modifies the query if the API already exists

  • provider_name (Optional[str]) – The name of the API provider where requests will be sent. If a provider_name and base_url are both given, the SearchAPIConfig will prioritize base_urls over the provider_name.

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests - api is the ground truth if not directly specified

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses - on by default unless specified otherwise

  • retry_handler (Optional[RetryHandler]) – Class used to retry failed requests

  • validator (Optional[ResponseValidator]) – class used to verify and validate responses returned from APIs

  • workflow (Optional[SearchWorkflow]) – An optional workflow used to customize how records are retrieved from APIs. Uses the default workflow for the current provider when a workflow is not directly specified.

  • **kwargs – Keyword arguments to be passed to the SearchAPIConfig that creates the SearchAPI if it doesn’t already exist

Examples

    >>> from scholar_flux import SearchCoordinator
    >>> from scholar_flux.api import APIResponse, ReconstructedResponse
    >>> from scholar_flux.sessions import CachedSessionManager
    >>> from typing import MutableMapping
    >>> session = CachedSessionManager(user_agent = 'scholar_flux', backend='redis').configure_session()
    >>> search_coordinator = SearchCoordinator(query = "Intrinsic Motivation", session = session, cache_results = False)
    >>> response = search_coordinator.search(page = 1)
    >>> response
    # OUTPUT: <ProcessedResponse(len=50, cache_key='plos_Functional Processing_1_50', metadata='...') ': 1, 'maxSco...")>
    >>> new_response = ReconstructedResponse.build(**response.response.__dict__)
    >>> new_response.validate()
    >>> new_response = ReconstructedResponse.build(response.response)
    >>> ReconstructedResponse.build(new_response).validate()
    >>> new_response.validate()
    >>> newer_response = APIResponse.as_reconstructed_response(new_response)
    >>> newer_response.validate()
    >>> double_processed_response = search_coordinator._process_response(response = newer_response, cache_key = response.cache_key)
    

classmethod as_coordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator, *args, **kwargs) SearchCoordinator[source]

Helper factory method for building a SearchCoordinator that allows users to build from the final building blocks of a SearchCoordinator.

Parameters:
  • search_api (SearchAPI) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (ResponseCoordinator) – Core class used to process and handle all responses from APIs

Returns:

A newly created coordinator that orchestrates record retrieval and processing

Return type:

SearchCoordinator

fetch(page: int, from_request_cache: bool = True, raise_on_error: bool = False, **api_specific_parameters) Response | ResponseProtocol | None[source]

Fetches the raw response from the current API or from cache if available.

Parameters:
  • page (int) – The page number to retrieve from the API, or from the cache if available.

  • from_request_cache (bool) – This parameter determines whether to try to fetch a valid response from cache.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

The response object if available, otherwise None.

Return type:

Optional[Response]

get_cached_request(page: int, **kwargs) Response | ResponseProtocol | None[source]

Retrieves the cached request for a given page number if available.

Parameters:

page (int) – The page number to retrieve from the cache.

Returns:

The cached request object if available, otherwise None.

Return type:

Optional[Response]

get_cached_response(page: int) Dict[str, Any] | None[source]

Retrieves the cached response for a given page number if available.

Parameters:

page (int) – The page number to retrieve from the cache.

Returns:

The cached response data if available, otherwise None.

Return type:

Optional[Dict[str, Any]]

iter_pages(pages: Sequence[int] | PageListInput, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) Generator[SearchResult, None, None][source]

Helper method that creates a generator for retrieving and processing records from the API provider for a sequence of pages. This implementation examines the search result for each retrieved page to determine whether iteration should continue or halt early.

This method is used directly by SearchCoordinator.search_pages, which provides a clean interface that abstracts away the complexity of iterators. It is also exposed for convenience when direct iteration is preferable.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Yields:

SearchResult

Lazily yields the SearchResult for each page from a generator.

Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response)
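
The early-halting rule described above can be sketched with a plain generator. This is only an illustration under stated assumptions, not the scholar_flux implementation: PageResult, fetch_page, fake_fetch, and the fixed records_per_page check are hypothetical stand-ins for the SearchResult and the provider call.

```python
from typing import Callable, Iterable, Iterator, List, NamedTuple, Optional


class PageResult(NamedTuple):
    """Hypothetical stand-in for a SearchResult: the page number plus its records."""

    page: int
    records: Optional[List[dict]]


def iter_pages_sketch(
    pages: Iterable[int],
    fetch_page: Callable[[int], Optional[List[dict]]],
    records_per_page: int,
) -> Iterator[PageResult]:
    """Yield one result per page, halting once a page suggests no more records remain."""
    for page in pages:
        records = fetch_page(page)
        yield PageResult(page, records)
        # Stop when the page could not be retrieved or came back short.
        if records is None or len(records) < records_per_page:
            break


# Fake provider holding 5 records in total, served 2 per page (assumption for the demo).
_DATA = [{"id": i} for i in range(5)]


def fake_fetch(page: int) -> Optional[List[dict]]:
    start = (page - 1) * 2
    return _DATA[start:start + 2]


results = list(iter_pages_sketch(range(1, 10), fake_fetch, records_per_page=2))
print([r.page for r in results])  # [1, 2, 3]
```

Because the generator is lazy, pages 4 through 9 are never requested once page 3 returns a short page.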

robust_request(page: int, **api_specific_parameters) Response | ResponseProtocol | None[source]

Constructs and sends a request to the current API, then returns the response.

Parameters:
  • page (int) – The page number to request from the API.

  • **api_specific_parameters – Optional additional parameters to pass to the SearchAPI

Returns:

The response object if available, otherwise None.

Return type:

Optional[Response]

search(page: int = 1, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) ProcessedResponse | ErrorResponse | None[source]

Public method for retrieving and processing records from the API specifying the page and records per page. Note that the response object is saved under the last_response attribute in the event that the response is retrieved and processed successfully, irrespective of whether the response was cached.

Parameters:
  • page (int) – The current page number. Used for process caching purposes even if not required by the API

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

A ProcessedResponse model containing the response (response), processed records (data), and article metadata (metadata) if the response was successful. Otherwise returns an ErrorResponse where the reason behind the error (message), exception type (error), and response (response) are provided. Possible error responses also include a NonResponse (an ErrorResponse subclass) for cases where a response object is irretrievable. Like the ErrorResponse class, NonResponse is also Falsy (i.e., not NonResponse returns True)

Return type:

Optional[ProcessedResponse | ErrorResponse]
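
Since error responses are falsy, callers can branch on truthiness rather than on the concrete type. The sketch below illustrates that pattern with hypothetical stand-in classes (SketchProcessedResponse, SketchErrorResponse) and a hypothetical describe helper; it does not reproduce the actual scholar_flux models.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchProcessedResponse:
    """Hypothetical success container; truthy like any ordinary object."""

    data: List[dict] = field(default_factory=list)


@dataclass
class SketchErrorResponse:
    """Hypothetical error container that evaluates to False in a boolean context."""

    message: str = ""
    error: Optional[str] = None

    def __bool__(self) -> bool:
        return False


def describe(result) -> str:
    """Branch on truthiness instead of checking the result type explicitly."""
    if result:
        return f"ok: {len(result.data)} records"
    if result is None:
        return "no result"
    return f"failed: {result.error}: {result.message}"


print(describe(SketchProcessedResponse([{"title": "example"}])))
print(describe(SketchErrorResponse("timed out", "Timeout")))
```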

search_data(page: int = 1, from_request_cache: bool = True, from_process_cache: bool = True) List[Dict] | None[source]

Public method to perform a search, specifying the page and records per page. Note that instead of returning a ProcessedResponse or ErrorResponse, this calls the search method and retrieves only the list of processed dictionary records from the ProcessedResponse.

Parameters:
  • page (int) – The current page number.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage stored within the SearchCoordinator.search_api.cache

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the processing cache stored within the SearchCoordinator.response_coordinator.cache

Returns:

A List of records containing processed article data

Return type:

Optional[List[Dict]]

search_pages(pages: Sequence[int] | PageListInput, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) SearchResultList[source]

Public method for retrieving and processing records from the API for a sequence of pages. Note that the response object is saved under the last_response attribute in the event that the data is processed successfully, irrespective of whether responses are cached or not.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

A list of response data classes containing processed article data (data).

Note that processing stops if the response for a given page is None, is not retrievable, or contains fewer than the expected number of records, indicating that the next page may contain no more records.

Return type:

SearchResultList

classmethod update(search_coordinator: SearchCoordinator, search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None) SearchCoordinator[source]

Helper factory method that creates a new SearchCoordinator based on an existing configuration while allowing the replacement of previous components. Note that this implementation does not directly copy the underlying components if a new component is not selected.

Parameters:
  • search_coordinator (SearchCoordinator) – A previously created coordinator containing the components to use when a replacement is not provided

  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to process and handle all responses from APIs

  • retry_handler (Optional[RetryHandler]) – Class used to retry failed requests

  • validator (Optional[ResponseValidator]) – class used to verify and validate responses returned from APIs

  • workflow (Optional[SearchWorkflow]) – An optional workflow used to customize how records are retrieved from APIs. Uses the default workflow for the current provider when a workflow is not directly specified and does not directly carry over in cases where a new provider is chosen.

Returns:

A newly created coordinator that orchestrates record retrieval and processing

Return type:

SearchCoordinator

scholar_flux.api.validators module

The scholar_flux.api.validators module implements methods that are used within the validation of scholar_flux API configurations to ensure that valid and invalid inputs are received as such.

Functions:

validate_email:

Used to verify whether an email matches the expected pattern

validate_and_process_email:

Attempts to mask valid emails and raises an error on invalid input

validate_url:

Used to verify whether a URL is a valid string

normalize_url:

Uses regular expressions to format the URL in a consistent format for string comparisons

validate_and_process_url:

Validates a URL to ensure that it matches the expected format and normalizes the URL for later use

scholar_flux.api.validators.normalize_url(url: str, normalize_https: bool = True) str[source]

Helper function to aid in comparisons of string URLs. Normalizes a URL for consistent comparisons by converting to https:// and stripping right-most forward slashes (‘/’).

Parameters:
  • url (str) – The url to normalize into a consistent structure for later comparison

  • normalize_https (bool) – indicates whether to normalize the http identifier on the URL. This is True by default.

Returns:

The normalized url

Return type:

str
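
The normalization described above (converting to https:// and stripping trailing slashes) can be approximated in a few lines. This is a minimal sketch under assumptions, not the library's code: normalize_url_sketch is a hypothetical name and the exact regular expression is illustrative.

```python
import re


def normalize_url_sketch(url: str, normalize_https: bool = True) -> str:
    """Return a URL in a consistent form for string comparison."""
    if normalize_https:
        # Rewrite a leading http:// or https:// (any letter case) to https://.
        url = re.sub(r"^https?://", "https://", url, flags=re.IGNORECASE)
    # Drop right-most forward slashes so 'https://x.org/' equals 'https://x.org'.
    return url.rstrip("/")


print(normalize_url_sketch("http://api.plos.org/search/"))  # https://api.plos.org/search
```

Normalizing both sides before comparison means that two spellings of the same base URL compare as equal strings.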

scholar_flux.api.validators.validate_and_process_email(email: SecretStr | str | None) SecretStr | None[source]

If a string value is provided, determine whether the email is valid.

This function first uses the validate_email function to validate the email. If the value is not a valid email, this implementation raises a ValueError.

Parameters:

email (Optional[SecretStr | str]) – an email to validate if non-missing

Returns:

The validated email masked as a SecretStr, or None if an email is not provided

Raises:

ValueError – If the current value is not an email
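
The validate-then-mask flow can be sketched with the standard library alone. Everything here is an assumption made for illustration: MaskedStr is a hypothetical stand-in for a secret string type, the regular expression is deliberately simple, and process_email_sketch is not the library function.

```python
import re
from typing import Optional

_EMAIL = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")  # simplified pattern (assumption)


class MaskedStr:
    """Hypothetical stand-in for a secret string: hides its value when printed."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value

    def __repr__(self) -> str:
        return "MaskedStr('**********')"


def process_email_sketch(email: Optional[str]) -> Optional[MaskedStr]:
    """Return None for missing input, a masked value for valid emails, else raise."""
    if email is None:
        return None
    if not _EMAIL.fullmatch(email):
        raise ValueError("The provided value is not a valid email")
    return MaskedStr(email)


masked = process_email_sketch("user@example.com")
print(masked)  # MaskedStr('**********')
```

Masking at validation time keeps the address out of logs and console output while the raw value stays retrievable on request.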

scholar_flux.api.validators.validate_and_process_url(url: str | None) str | None[source]

If a string value is provided, determine whether the url is valid.

This function first uses the validate_url function for the validation of the url.

Parameters:

url (Optional[str]) – a URL to validate if non-missing

Returns:

The validated and normalized URL, or None if a URL is not provided

scholar_flux.api.validators.validate_email(email: str) bool[source]

Uses regex to determine whether the provided value is an email.

Parameters:

email (str) – The email string to validate

Returns:

True if the email is valid, and False Otherwise

scholar_flux.api.validators.validate_url(url: str) bool[source]

Uses urlparse to determine whether the provided value is a URL.

Parameters:

url (str) – The url string to validate

Returns:

True if the url is valid, and False Otherwise
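
A urlparse-based check might look like the following. This is a sketch under assumptions rather than the library's logic: validate_url_sketch is a hypothetical name, and requiring an http(s) scheme plus a network location is one plausible definition of "valid".

```python
from urllib.parse import urlparse


def validate_url_sketch(url: str) -> bool:
    """Treat a URL as valid when it has an http(s) scheme and a network location."""
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError for some malformed inputs (e.g. bad IPv6 brackets).
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


print(validate_url_sketch("https://api.plos.org/search"))  # True
print(validate_url_sketch("api.plos.org/search"))  # False
```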

Module contents

The scholar_flux.api module includes the core classes and functionality necessary to interact with APIs in a universally applicable manner. This module defines the methods necessary to retrieve raw responses from APIs based on the configuration used for the API client (SearchAPI).

Sub-modules:

  • models: Contains the classes used to set up new configurations in addition to the API utility models and modules necessary to interact with APIs

  • providers: Defines the default provider specifications to easily create a new client for a specific provider with minimal code (e.g., plos.py contains the necessary config settings for the PLOS API)

  • workflows: Defines custom workflows for APIs requiring API-specific logic modifications for easier record retrieval. This includes the PubMed workflow, which searches IDs and then fetches the records

  • rate_limiting: Defines the methods and classes used to ensure that the rate limits associated with each API are not exceeded. The SearchAPI implements rate limiting using the RateLimiter and, optionally, the ThreadedRateLimiter class to wait a specified interval of time before sending the next request.

In order to use the API one can get started with the SearchCoordinator with minimal effort:
>>> from scholar_flux.api import SearchCoordinator # imports the most forward facing interface for record retrieval
>>> search_coordinator = SearchCoordinator(query = 'Turing Machines') # uses PLOS by default
>>> print(search_coordinator.api) # Shows the core SearchAPI specification used to send requests to APIs
>>> processed_response = search_coordinator.search(page = 1) # retrieves and processes records from the API response
You can also retrieve the responses directly without processing via the SearchAPI:
>>> from scholar_flux.api import SearchAPI # imports the core SearchAPI used by the coordinator to send requests
>>> api = SearchAPI(query='ML') # uses PLOS by default
>>> response = api.search(page = 1) # retrieves the raw response from the API without processing
The functionality of the SearchCoordinator can be further customized using the following modules:

  • scholar_flux.sessions: Contains the core classes for directly setting up cached sessions

  • scholar_flux.data: Contains the core classes used to parse, extract, and process records

  • scholar_flux.data_storage: Contains the core classes used for caching

  • scholar_flux.security: Contains the core classes used for ensuring security in console and logging (e.g., API keys)

class scholar_flux.api.APIParameterConfig(parameter_map: APIParameterMap)[source]

Bases: object

Uses an APIParameterMap instance and runtime parameter values to build parameter dictionaries for API requests.

Parameters:

parameter_map (APIParameterMap) – The mapping of universal to API-specific parameter names.

Class Attributes:
DEFAULT_CORRECT_ZERO_INDEX (bool):

When True, autocorrects parameter building for zero-indexed APIs so that only positive page values are accepted. When False, page calculation starts from page 0 for zero-indexed APIs (e.g., arXiv).

Examples

>>> from scholar_flux.api import APIParameterConfig, APIParameterMap
>>> # the API parameter map is defined and used to resolve parameters to the API's language
>>> api_parameter_map = APIParameterMap(
... query='q', records_per_page = 'pagesize', start = 'page', auto_calculate_page = False
... )
# The APIParameterConfig defines class and settings that indicate how to create requests
>>> api_parameter_config = APIParameterConfig(api_parameter_map, auto_calculate_page = False)
# Builds parameters using the specification from the APIParameterMap
>>> page = api_parameter_config.build_parameters(query= 'ml', page = 10, records_per_page=50)
>>> print(page)
# OUTPUT {'q': 'ml', 'page': 10, 'pagesize': 50}
DEFAULT_CORRECT_ZERO_INDEX: ClassVar[bool] = True
__init__(*args: Any, **kwargs: Any) None
classmethod as_config(parameter_map: dict | BaseAPIParameterMap | APIParameterMap | APIParameterConfig) APIParameterConfig[source]

Factory method for creating a new APIParameterConfig from a dictionary or APIParameterMap.

This helper class method resolves the structure of the APIParameterConfig against its basic building blocks to create a new configuration when possible.

Parameters:

parameter_map (dict | BaseAPIParameterMap | APIParameterMap | APIParameterConfig) – A parameter mapping/config to use in the instantiation of an APIParameterConfig.

Returns:

A new structure from the inputs

Return type:

APIParameterConfig

Raises:

APIParameterException – If there is an error in the creation/resolution of the required parameters

build_parameters(query: str | None, page: int | None, records_per_page: int, **api_specific_parameters) Dict[str, Any][source]

Builds the dictionary of request parameters using the current parameter map and provided values at runtime.

Parameters:
  • query (Optional[str]) – The search query string.

  • page (Optional[int]) – The page number for pagination (1-based).

  • records_per_page (int) – Number of records to fetch per page.

  • **api_specific_parameters – Additional API-specific parameters to include.

Returns:

The fully constructed API request parameters dictionary, with keys as API-specific parameter names and values as provided.

Return type:

Dict[str, Any]
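
The translation from universal parameter names to API-specific ones, including the optional page-to-start calculation, can be sketched as follows. This is an illustration under assumptions, not the library's implementation: build_parameters_sketch and the plain-dict parameter map are hypothetical, and the 1-based offset formula is inferred from the PLOS request in the BaseAPI example (page 2 with 20 rows starts at record 21).

```python
from typing import Any, Dict, Optional


def build_parameters_sketch(
    parameter_map: Dict[str, str],
    query: Optional[str],
    page: Optional[int],
    records_per_page: int,
    auto_calculate_page: bool = True,
    **api_specific_parameters: Any,
) -> Dict[str, Any]:
    """Translate universal names (query, start, records_per_page) into API-specific ones."""
    start = page
    if page is not None and auto_calculate_page:
        # Convert a 1-based page number into a 1-based record offset.
        start = 1 + (page - 1) * records_per_page
    universal = {"query": query, "start": start, "records_per_page": records_per_page}
    parameters = {
        parameter_map[name]: value
        for name, value in universal.items()
        if name in parameter_map and value is not None
    }
    parameters.update(api_specific_parameters)
    return parameters


# Mapping modeled on the PLOS request used in the BaseAPI example ('q', 'start', 'rows').
plos_like = {"query": "q", "start": "start", "records_per_page": "rows"}
print(build_parameters_sketch(plos_like, "machine learning", page=2, records_per_page=20))
# {'q': 'machine learning', 'start': 21, 'rows': 20}
```

With auto_calculate_page=False the page number is passed through unchanged, which suits APIs that paginate by page number rather than by record offset.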

classmethod from_defaults(provider_name: str, **additional_parameters) APIParameterConfig[source]

Factory method to create APIParameterConfig instances with sensible defaults for known APIs.

If the provider_name does not exist, the code will raise an exception.

Parameters:
  • provider_name (str) – The name of the API to create the parameter map for.

  • api_key (Optional[str]) – API key value if required.

  • additional_parameters (dict) – Additional parameter mappings.

Returns:

Configured parameter config instance for the specified API.

Return type:

APIParameterConfig

Raises:

NotImplementedError – If the API name is unknown.

classmethod get_defaults(provider_name: str, **additional_parameters) APIParameterConfig | None[source]

Factory method to create APIParameterConfig instances with sensible defaults for known APIs.

Avoids throwing an error if the provider name does not already exist.

Parameters:
  • provider_name (str) – The name of the API to create the parameter map for.

  • additional_parameters (dict) – Additional parameter mappings.

Returns:

Configured parameter config instance for the specified API. Returns None if a mapping for the provider_name isn’t retrieved

Return type:

Optional[APIParameterConfig]

property map: APIParameterMap

Helper property that is an alias for the APIParameterMap attribute.

The APIParameterMap maps all universal parameters to the parameter names specific to the API provider.

Returns:

The mapping that the current APIParameterConfig will use to build a dictionary of parameter requests specific to the current API.

Return type:

APIParameterMap

parameter_map: APIParameterMap
show_parameters() list[source]

Helper method to show the complete list of all parameters that can be found in the current_mappings.

Returns:

The complete list of all universal and api specific parameters corresponding to the current API

Return type:

List

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the APIParameterConfig.

class scholar_flux.api.APIParameterMap(*, query: str, records_per_page: str, start: str | None = None, api_key_parameter: str | None = None, api_key_required: bool = False, auto_calculate_page: bool = True, zero_indexed_pagination: bool = False, api_specific_parameters: ~typing.Dict[str, ~scholar_flux.api.models.base_parameters.APISpecificParameter] = <factory>)[source]

Bases: BaseAPIParameterMap

Extends BaseAPIParameterMap by adding validation and the optional retrieval of provider defaults for known APIs.

This class also specifies default mappings for specific attributes such as API keys and additional parameter names.

query

The API-specific parameter name for the search query.

Type:

str

start

The API-specific parameter name for pagination (start index or page number).

Type:

Optional[str]

records_per_page

The API-specific parameter name for records per page.

Type:

str

api_key_parameter

The API-specific parameter name for the API key.

Type:

Optional[str]

api_key_required

Indicates whether an API key is required.

Type:

bool

auto_calculate_page

If True, calculates start index from page; if False, passes page number directly.

Type:

bool

zero_indexed_pagination

If True, treats 0 as an allowed page value when retrieving data from APIs.

Type:

bool

api_specific_parameters

Additional universal to API-specific parameter mappings.

Type:

Dict[str, APISpecificParameter]

api_key_parameter: str | None
api_key_required: bool
api_specific_parameters: Dict[str, APISpecificParameter]
auto_calculate_page: bool
classmethod from_defaults(provider_name: str, **additional_parameters) APIParameterMap[source]

Factory method that uses the APIParameterMap.get_defaults classmethod to retrieve the provider config.

Raises an error if the provider does not exist.

Parameters:
  • provider_name (str) – The name of the API to create the parameter map for.

  • additional_parameters (dict) – Additional parameter mappings.

Returns:

Configured parameter map for the specified API.

Return type:

APIParameterMap

Raises:

NotImplementedError – If the API name is unknown.

classmethod get_defaults(provider_name: str, **additional_parameters) APIParameterMap | None[source]

Factory method to create APIParameterMap instances with sensible defaults for known APIs.

This class method attempts to pull from the list of known providers defined in the scholar_flux.api.providers.provider_registry and returns None if an APIParameterMap for the provider cannot be found.

Using the additional_parameters keyword arguments, users can specify optional overrides for specific parameters if needed. This is helpful in circumstances where an API’s specification overlaps with that of a known provider.

Valid providers (as indicated in provider_registry) include:

  • springernature

  • plos

  • arxiv

  • openalex

  • core

  • crossref

Parameters:
  • provider_name (str) – The name of the API provider to retrieve the parameter map for.

  • additional_parameters (dict) – Additional parameter mappings.

Returns:

Configured parameter map for the specified API.

Return type:

Optional[APIParameterMap]

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

query: str
records_per_page: str
classmethod set_default_api_key_parameter(values: dict[str, Any]) dict[str, Any][source]

Sets the default for the api key parameter when api_key_required=True and api_key_parameter is None.

Parameters:

values (dict[str, Any]) – The dictionary of attributes to validate

Returns:

The updated parameter values passed to the APIParameterMap. api_key_parameter is set to “api_key” if key is required but not specified

Return type:

dict[str, Any]
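
The defaulting rule above amounts to a small pre-validation step over the raw attribute dictionary. A minimal sketch, assuming a plain function rather than the actual pydantic validator (set_default_api_key_parameter_sketch is a hypothetical name):

```python
from typing import Any, Dict


def set_default_api_key_parameter_sketch(values: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in 'api_key' as the parameter name when a key is required but unnamed."""
    updated = dict(values)  # avoid mutating the caller's dictionary
    if updated.get("api_key_required") and updated.get("api_key_parameter") is None:
        updated["api_key_parameter"] = "api_key"
    return updated


print(set_default_api_key_parameter_sketch({"query": "q", "api_key_required": True}))
# {'query': 'q', 'api_key_required': True, 'api_key_parameter': 'api_key'}
```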

start: str | None
classmethod validate_api_specific_parameter_mappings(values: dict[str, Any]) dict[str, Any][source]

Validates the additional mappings provided to the APIParameterMap.

This method validates that the input is a dictionary of mappings that consists of only string-typed keys mapped to API-specific parameters as defined by the APISpecificParameter class.

Parameters:

values (dict[str, Any]) – The dictionary of attribute values to validate.

Returns:

The updated dictionary if validation passes.

Return type:

dict[str, Any]

Raises:

APIParameterException – If api_specific_parameters is not a dictionary or contains non-string keys/values.

zero_indexed_pagination: bool
class scholar_flux.api.APIResponse(*, cache_key: str | None = None, response: Any | None = None, created_at: str | None = None)[source]

Bases: BaseModel

A Response wrapper for responses of different types that allows consistency when using several possible backends. The purpose of this class is to serve as the base for managing responses received from scholarly APIs while processing each component in a predictable, reproducible manner.

This class uses pydantic’s data validation and serialization/deserialization methods to aid caching and includes properties that refer back to the original response for displaying valid response codes, URLs, etc.

All future processing/error-based responses classes inherit from and build off of this class.

Parameters:
  • cache_key (Optional[str]) – A string for recording cache keys for use in later steps of the response orchestration involving processing, cache storage, and cache retrieval

  • response (Any) – A response or response-like object to be validated and used/re-used in later caching and response processing/orchestration steps.

  • created_at (Optional[str]) – A value indicating the time in which a response or response-like object was created.

Example

>>> from scholar_flux.api import APIResponse
# Using keyword arguments to build a basic APIResponse data container:
>>> response = APIResponse.from_response(
...     cache_key = 'test-response',
...     status_code = 200,
...     content=b'success',
...     url='https://example.com',
...     headers={'Content-Type': 'application/text'}
... )
>>> response
# OUTPUT: APIResponse(cache_key='test-response', response = ReconstructedResponse(
#    status_code=200, reason='OK', headers={'Content-Type': 'application/text'},
#    text='success', url='https://example.com'
# ))
>>> assert response.status == 'OK' and response.text == 'success' and response.url == 'https://example.com'
# OUTPUT: True
>>> assert response.validate_response()
# OUTPUT: True
classmethod as_reconstructed_response(response: Any) ReconstructedResponse[source]

Classmethod designed to create a reconstructed response from an original response object. This method coerces response attributes into a reconstructed response that retains the original content, status code, headers, URL, reason, etc.

Returns:

A minimal response object that contains the core attributes needed to support other processes in the scholar_flux module such as response parsing and caching.

Return type:

ReconstructedResponse

cache_key: str | None
property content: bytes | None

Return content from the underlying response, if available and valid.

Returns:

The bytes from the original response content

Return type:

Optional[bytes]

created_at: str | None
encode_response(response: Any) Dict[str, Any] | List[Any] | None[source]

Helper method for serializing a response into a json format. Accounts for special cases such as CaseInsensitiveDict fields that are otherwise unserializable.

From this step, pydantic can safely use json internally to dump the encoded response fields

classmethod from_response(response: Any | None = None, cache_key: str | None = None, auto_created_at: bool | None = None, **kwargs) Self[source]

Construct an APIResponse from a response object or from keyword arguments.

If response is not a valid response object, builds a minimal response-like object from kwargs.

classmethod from_serialized_response(response: Any | None = None, **kwargs) ReconstructedResponse | None[source]

Helper method for creating a new APIResponse from the original dumped object. Because response objects are not directly serializable, this method decodes the response dictionary that was loaded from a string using json.loads from the json module in the standard library.

If the response input is still a serialized string, this method will manually load the response dict with the APIResponse._deserialize_response_dict class method before further processing.

Parameters:

response (Any) – A prospective response value to load into the API Response.

Returns:

A reconstructed response object, if possible. Otherwise returns None

Return type:

Optional[ReconstructedResponse]

property headers: MutableMapping[str, str] | None

Return headers from the underlying response, if available and valid.

Returns:

A dictionary of headers from the response

Return type:

MutableMapping[str, str]

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

raise_for_status()[source]

Uses an underlying response object to validate the status code associated with the request.

If the attribute isn’t a response or reconstructed response, the code will coerce the class into a response object to verify the status code for the request URL and response.

property reason: str | None

Uses the underlying reason attribute on the response object, if available, to create a human readable status description.

Returns:

The status description associated with the response.

Return type:

Optional[str]

response: Any | None
classmethod serialize_response(response: Response | ResponseProtocol) str | None[source]

Helper method for serializing a response into a json format. The response object is first converted into a serialized string and subsequently dumped after ensuring that the field is serializable.

Parameters:

response (Response, ResponseProtocol)

property status: str | None

Helper property for retrieving a human-readable status description from the APIResponse.

Returns:

The status description associated with the response (if available).

Return type:

Optional[str]

property status_code: int | None

Helper property for retrieving a status code from the APIResponse.

Returns:

The status code associated with the response (if available)

Return type:

Optional[int]

property text: str | None

Attempts to retrieve the response text by first decoding the bytes of its content. If the content is not available, this property attempts to reference the text attribute directly.

Returns:

A text string if the text is available in the correct format, otherwise None

Return type:

Optional[str]
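
The decode-then-fall-back behavior of this property can be sketched as a standalone function. The function name, the UTF-8 default, and the use of SimpleNamespace objects as fake responses are assumptions made for illustration.

```python
from types import SimpleNamespace
from typing import Any, Optional


def response_text_sketch(response: Any, encoding: str = "utf-8") -> Optional[str]:
    """Decode the response content when possible, else fall back to its text attribute."""
    content = getattr(response, "content", None)
    if isinstance(content, bytes):
        try:
            return content.decode(encoding)
        except UnicodeDecodeError:
            pass  # fall through to the text attribute
    text = getattr(response, "text", None)
    return text if isinstance(text, str) else None


print(response_text_sketch(SimpleNamespace(content=b"success")))  # success
print(response_text_sketch(SimpleNamespace(content=None, text="fallback")))  # fallback
```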

classmethod transform_response(v: Any) Response | ResponseProtocol | None[source]

Attempts to resolve a response object as an original or ReconstructedResponse: All original response objects (duck-typed or requests response) with valid values will be returned as is.

If the passed object is a string, this function will attempt to deserialize it before attempting to parse it as a dictionary.

Dictionary fields will be decoded, if originally encoded, and parsed as a ReconstructedResponse object, if possible.

Otherwise, the original object is returned as is.

property url: str | None

Return URL from the underlying response, if available and valid.

Returns:

A string of the original URL if available. Accounts for objects that indicate the original URL when converted to a string

Return type:

Optional[str]

classmethod validate_iso_timestamp(v: str | datetime | None) str | None[source]

Helper method for validating and ensuring that the timestamp accurately follows the ISO 8601 format.
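
A timestamp check of this kind can be approximated with datetime.fromisoformat. This is a sketch under assumptions rather than the library's validator: validate_iso_timestamp_sketch is a hypothetical name, and fromisoformat accepts a narrower subset of ISO 8601 strings on Python versions before 3.11.

```python
from datetime import datetime
from typing import Optional, Union


def validate_iso_timestamp_sketch(value: Union[str, datetime, None]) -> Optional[str]:
    """Return an ISO 8601 string for valid input, None for missing input, else raise."""
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()
    # datetime.fromisoformat raises ValueError for strings it cannot parse.
    datetime.fromisoformat(value)
    return value


print(validate_iso_timestamp_sketch(datetime(2024, 1, 2, 3, 4, 5)))  # 2024-01-02T03:04:05
```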

validate_response() bool[source]

Helper method for determining whether the response attribute is truly a response. If the response isn’t a requests response, duck-typing is used to determine whether the response attribute has the expected attributes of a response. Each property is checked for the expected type and evaluates to None when the attribute isn’t of that type.

Returns:

An indicator of whether the current APIResponse.response attribute is actually a response

Return type:

bool
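
Duck-typed validation of this kind checks for response-like attributes instead of a concrete class. The sketch below is illustrative only: looks_like_response is a hypothetical helper, and the particular attributes it checks (status_code, url, content) are an assumption about what counts as response-like.

```python
from types import SimpleNamespace
from typing import Any


def looks_like_response(obj: Any) -> bool:
    """Duck-typed check: the object exposes response-like attributes of the expected types."""
    expected = {"status_code": int, "url": str, "content": bytes}
    return all(isinstance(getattr(obj, name, None), kind) for name, kind in expected.items())


fake = SimpleNamespace(status_code=200, url="https://example.com", content=b"success")
print(looks_like_response(fake))  # True
print(looks_like_response(object()))  # False
```

A check like this lets cached or reconstructed objects pass validation without inheriting from the requests Response class.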

class scholar_flux.api.BaseAPI(user_agent: str | None = None, session: Session | None = None, timeout: int | float | None = None, use_cache: bool | None = None)[source]

Bases: object

The BaseAPI client is a minimal implementation for user-friendly request preparation and response retrieval.

Parameters:
  • session (Optional[requests.Session]) – A pre-configured requests or requests-cache session. A new session is created if not specified.

  • user_agent (Optional[str]) – An optional user-agent string for the session.

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • use_cache (bool) – Indicates whether or not to create a cached session. If a cached session is already specified, this setting will have no effect on the creation of a session.

Examples

>>> from scholar_flux.api import BaseAPI
# creating a basic API client that uses the PLOS API as the default while caching response data in-memory:
>>> base_api = BaseAPI(use_cache=True)
# retrieve a basic request:
>>> parameters = {'q': 'machine learning', 'start': 1, 'rows': 20}
>>> response_page_1 = base_api.send_request('https://api.plos.org/search', parameters=parameters)
>>> assert response_page_1.ok
>>> response_page_1
# OUTPUT: <Response [200]>
>>> ml_page_1 = response_page_1.json()
# retrieving the next page:
>>> parameters['start'] = 21
>>> response_page_2 = base_api.send_request('https://api.plos.org/search', parameters=parameters)
>>> assert response_page_2.ok
>>> response_page_2
# OUTPUT: <Response [200]>
>>> ml_page_2 = response_page_2.json()
>>> ml_page_2
# OUTPUT: {'response': {'numFound': '...', 'start': 21, 'docs': ['...']}} # redacted
DEFAULT_TIMEOUT: int = 20
DEFAULT_USE_CACHE: bool = False
__init__(user_agent: str | None = None, session: Session | None = None, timeout: int | float | None = None, use_cache: bool | None = None)[source]

Initializes the BaseAPI client for response retrieval given the provided inputs.

The necessary attributes are prepared with a new or existing session (cached or uncached) via dependency injection. This class is designed to be subclassed for specific API implementations.

Parameters:
  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • timeout (Optional[int | float]) – Timeout for requests in seconds.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The default setting is to create a regular requests.Session unless a CachedSession is already provided.

configure_session(session: Session | None = None, user_agent: str | None = None, use_cache: bool | None = None) Session[source]

Creates a session object if one does not already exist. If use_cache = True, then a cached session object will be used. A regular session that is not already cached will be overridden.

Parameters:
  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist. If use_cache is True and a cached session has already been passed, the previously created cached session is returned. Otherwise, a new CachedSession is created.

Returns:

The configured session.

Return type:

requests.Session

prepare_request(base_url: str, endpoint: str | None = None, parameters: Dict[str, Any] | None = None) PreparedRequest[source]

Prepares a GET request for the specified endpoint with optional parameters.

Parameters:
  • base_url (str) – The base URL for the API.

  • endpoint (Optional[str]) – The API endpoint to prepare the request for.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

Returns:

The prepared request object.

Return type:

prepared_request (PreparedRequest)
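To illustrate how a base URL, an optional endpoint, and query parameters combine into a GET request URL, here is a standard-library sketch. The helper build_get_url is hypothetical; the method documented above returns a PreparedRequest rather than a plain string.

```python
from typing import Any, Dict, Optional
from urllib.parse import urlencode


def build_get_url(
    base_url: str,
    endpoint: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> str:
    """Hypothetical helper: joins the base URL, endpoint, and query parameters."""
    url = base_url.rstrip("/")
    if endpoint:
        url = f"{url}/{endpoint.lstrip('/')}"
    if parameters:
        url = f"{url}?{urlencode(parameters)}"
    return url


url = build_get_url(
    "https://api.plos.org", "search", {"q": "machine learning", "start": 1, "rows": 20}
)
# url == "https://api.plos.org/search?q=machine+learning&start=1&rows=20"
```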

send_request(base_url: str, endpoint: str | None = None, parameters: Dict[str, Any] | None = None, timeout: int | float | None = None) Response[source]

Sends a GET request to the specified endpoint with optional parameters.

Parameters:
  • base_url (str) – The base URL of the API to send the request to.

  • endpoint (Optional[str]) – The endpoint of the API to send the request to.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

  • timeout (Optional[int | float]) – Timeout for the request in seconds.

Returns:

The response object.

Return type:

requests.Response

session: Session
structure(flatten: bool = True, show_value_attributes: bool = False) str[source]

Base method for showing the structure of the current BaseAPI. This method reveals the configuration settings of the API client that will be used to send requests.

Returns:

The current structure of the BaseAPI or its subclass.

Return type:

str

summary() str[source]

Create a summary representation of the current structure of the API:

Returns the original representation.

property user_agent: str | None

The User-Agent should always reflect what is used in the session.

This method retrieves the User-Agent from the session directly.

class scholar_flux.api.BaseCoordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator)[source]

Bases: object

BaseCoordinator providing the minimum functionality for requesting and retrieving records and metadata from APIs.

This class uses dependency injection to orchestrate the process of constructing requests, validating responses, and processing scientific works and articles. This class is designed to provide the absolute minimum necessary functionality to both retrieve and process data from APIs and can make use of caching functionality for caching requests and responses.

After initialization, the BaseCoordinator uses two main components for the sequential orchestration of response retrieval, processing, and caching.

Components:
SearchAPI (api/search_api):

Handles the creation and orchestration of search requests in addition to the caching of successful requests via dependency injection.

ResponseCoordinator (responses/response_coordinator):

Handles the full range of response processing steps after retrieving a response from an API. These parsing, extraction, and processing steps occur sequentially when a new response is received. If a response was previously handled, the coordinator will attempt to retrieve these responses from the processing cache.

Example

>>> from scholar_flux.api import SearchAPI, ResponseCoordinator, BaseCoordinator
# Note: the SearchAPI uses PLOS by default if `provider_name` is not provided.
# Unless the `SCHOLAR_FLUX_DEFAULT_PROVIDER` env variable is set to another provider.
>>> base_search_coordinator = BaseCoordinator(search_api = SearchAPI(query = 'Math'),
...                                           response_coordinator = ResponseCoordinator.build())
>>> response = base_search_coordinator.search(page = 1)
>>> response
# OUTPUT <ProcessedResponse(len=20, cache_key=None, metadata="{'numFound': 14618, 'start': 1, ...})>
# All processed records for a particular response can be found under response.data (a list of dictionaries)
>>> list(response.data[0].keys())
# OUTPUT ['article_type', 'eissn', 'id', 'journal', 'publication_date', 'score', 'title_display',
#         'abstract', 'author_display']
__init__(search_api: SearchAPI, response_coordinator: ResponseCoordinator)[source]

Initializes the base coordinator by delegating assignment of attributes to the _initialize method. Future coordinators can follow a similar pattern of using an _initialize method for initial parameter assignment.

Parameters:
  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to handle the processing and core handling of all responses from APIs

property api: SearchAPI

Alias for the underlying API used for searching.

classmethod as_coordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator, *args, **kwargs) Self[source]

Helper factory method for building a SearchCoordinator that allows users to build from the final building blocks of a SearchCoordinator.

Parameters:
  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to handle the processing and core handling of all responses from APIs

Returns:

A newly created coordinator subclassed from a BaseCoordinator that also orchestrates record retrieval and processing

Return type:

BaseCoordinator

property extractor: BaseDataExtractor

Allows direct access to the DataExtractor from the ResponseCoordinator.

property parser: BaseDataParser

Allows direct access to the data parser from the ResponseCoordinator.

property processor: ABCDataProcessor

Allows direct access to the DataProcessor from the ResponseCoordinator.

property response_coordinator: ResponseCoordinator

Allows the ResponseCoordinator to be used as a property.

The response_coordinator handles and coordinates the processing of API responses from parsing, record/metadata extraction, processing, and cache management.

property responses: ResponseCoordinator

An alias for the response_coordinator property that is used for orchestrating the processing of retrieved API responses.

Handles response orchestration, including response content parsing, the extraction of records/metadata, record processing, and cache operations.

search(**kwargs) ProcessedResponse | ErrorResponse | None[source]

Public Search Method coordinating the retrieval and processing of an API response.

This method serves as the base and will primarily handle the “How” of searching (e.g. Workflows, Single page search, etc.)
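Because search can return a ProcessedResponse, an ErrorResponse, or None, callers typically need to handle all three cases. The sketch below relies only on the documented data attribute (a list of dictionaries on success, None on an ErrorResponse); the helper records_from is hypothetical, and SimpleNamespace objects stand in for real response instances.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def records_from(result: Any) -> List[Dict[Any, Any]]:
    """Return processed records, or an empty list for error or missing results."""
    if result is None:
        return []
    # ErrorResponse.data is documented as None; ProcessedResponse.data is a list of dicts.
    return result.data or []


# Stand-ins for a ProcessedResponse and an ErrorResponse.
processed = SimpleNamespace(data=[{"id": 1}], error=None)
failed = SimpleNamespace(data=None, error="Timeout", message="The request timed out")
```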

property search_api: SearchAPI

Allows the search_api to be used as a property while also allowing for verification.

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the SearchCoordinator. The helper function generate_repr_from_string helps produce human-readable representations of the core structure of the Coordinator.

Parameters:
  • flatten (bool) – Whether to flatten the coordinator’s structural representation into a single line. Default=False

  • show_value_attributes (bool) – Whether to show nested attributes of the components of the BaseCoordinator or its subclass.

Returns:

The structure of the current SearchCoordinator as a string.

Return type:

str

summary() str[source]

Helper method for showing the structure of the current search coordinator.

class scholar_flux.api.ErrorResponse(*, cache_key: str | None = None, response: Any | None = None, created_at: str | None = None, message: str | None = None, error: str | None = None)[source]

Bases: APIResponse

Returned when something goes wrong, but we don’t want to throw immediately—just hand back failure details.

The class is formatted for compatibility with the ProcessedResponse.

property data: None

Provided for type hinting + compatibility.

error: str | None
property extracted_records: None

Provided for type hinting + compatibility.

classmethod from_error(message: str, error: Exception, cache_key: str | None = None, response: Response | ResponseProtocol | None = None) Self[source]

Creates and logs the processing error if one occurs during response processing.

Parameters:
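  • message (str) – Error message describing what went wrong.

  • error (Exception) – The exception raised during response processing.

  • response (Optional[Response | ResponseProtocol]) – Raw API response.

  • cache_key (Optional[str]) – Cache key for storing results.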

Returns:

A Dataclass Object that contains the error response data and background information on what precipitated the error.

Return type:

ErrorResponse

message: str | None
property metadata: None

Provided for type hinting + compatibility.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property parsed_response: None

Provided for type hinting + compatibility.

property processed_records: None

Provided for type hinting + compatibility.

class scholar_flux.api.MultiSearchCoordinator(*args, **kwargs)[source]

Bases: UserDict

The MultiSearchCoordinator is a utility class for orchestrating searches across multiple providers, pages, and queries sequentially or using multithreading. This coordinator builds on the SearchCoordinator’s core structure to ensure consistent, rate-limited API requests.

The multi-search coordinator uses shared rate limiters to ensure that requests to the same provider (even across different queries) will use the same rate limiter.

This implementation uses the ThreadedRateLimiter.min_interval parameter from the shared rate limiter of each provider to determine the request_delay across all queries. These settings can be found and modified in the scholar_flux.api.providers.threaded_rate_limiter_registry by provider_name.

For new, unregistered providers, users can override the MultiSearchCoordinator.DEFAULT_THREADED_REQUEST_DELAY class variable to adjust the shared request_delay.
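The idea of sharing one rate limiter per provider can be sketched as a small registry keyed by provider name. The names LockedLimiter, limiter_registry, and limiter_for are hypothetical and do not reflect the library's ThreadedRateLimiter implementation; the default of 6.0 seconds mirrors the DEFAULT_THREADED_REQUEST_DELAY value listed below.

```python
import threading
import time
from typing import Dict, Optional

DEFAULT_DELAY = 6.0  # mirrors DEFAULT_THREADED_REQUEST_DELAY


class LockedLimiter:
    """Hypothetical thread-safe limiter: one instance is shared per provider."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        # The lock serializes callers so that requests stay spaced across threads.
        with self._lock:
            if self._last_call is not None:
                remaining = self.min_interval - (time.monotonic() - self._last_call)
                if remaining > 0:
                    time.sleep(remaining)
            self._last_call = time.monotonic()


limiter_registry: Dict[str, LockedLimiter] = {}


def limiter_for(provider_name: str) -> LockedLimiter:
    """Return the shared limiter for a provider, creating it on first use."""
    return limiter_registry.setdefault(provider_name, LockedLimiter(DEFAULT_DELAY))
```

Two coordinators that query the same provider would receive the same limiter object, so their requests are throttled together even when the queries differ.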

Examples

>>> from scholar_flux import MultiSearchCoordinator, SearchCoordinator, RecursiveDataProcessor
>>> from scholar_flux.api.rate_limiting import threaded_rate_limiter_registry
>>> multi_search_coordinator = MultiSearchCoordinator()
>>> threaded_rate_limiter_registry['arxiv'].min_interval = 6 # arbitrary rate limit (seconds per request)
>>>
>>> # Create coordinators for different queries and providers
>>> coordinators = [
...     SearchCoordinator(
...         provider_name=provider,
...         query=query,
...         processor=RecursiveDataProcessor(),
...         user_agent="SammieH",
...         cache_requests=True
...     )
...     for query in ('ml', 'nlp')
...     for provider in ('plos', 'arxiv', 'openalex', 'crossref')
... ]
>>>
>>> # Add coordinators to the multi-search coordinator
>>> multi_search_coordinator.add_coordinators(coordinators)
>>>
>>> # Execute searches across multiple pages
>>> all_pages = multi_search_coordinator.search_pages(pages=[1, 2, 3])
>>>
>>> # filters and retains successful requests from the multi-provider search
>>> filtered_pages = all_pages.filter()
>>> # The results will contain successfully processed responses across all queries, pages, and providers
>>> print(filtered_pages)  # Output will be a list of SearchResult objects
>>> # Extracts successfully processed records into a list of records where each record is a dictionary
>>> record_dict = filtered_pages.join() # retrieves a list of records
>>> print(record_dict)  # Output will be a flattened list of all records
DEFAULT_THREADED_REQUEST_DELAY: float | int = 6.0
__init__(*args, **kwargs)[source]

Initializes the MultiSearchCoordinator, allowing positional and keyword arguments to be specified when creating the MultiSearchCoordinator.

The initialization of the MultiSearchCoordinator operates similarly to that of a regular dict with the caveat that values are statically typed as SearchCoordinator instances.

add(search_coordinator: SearchCoordinator)[source]

Adds a new SearchCoordinator to the MultiSearchCoordinator instance.

Parameters:

search_coordinator (SearchCoordinator) – A search coordinator to add to the MultiSearchCoordinator dict

Raises:

InvalidCoordinatorParameterException – If the provided object is not a SearchCoordinator

add_coordinators(search_coordinators: Iterable[SearchCoordinator])[source]

Helper method for adding a sequence of coordinators at a time.

property coordinators: list[SearchCoordinator]

Utility property for quickly retrieving a list of all currently registered coordinators.

current_providers() set[str][source]

Extracts a set of names corresponding to each API provider assigned to the MultiSearchCoordinator.

group_by_provider() dict[str, dict[str, SearchCoordinator]][source]

Groups all coordinators by provider name to facilitate retrieval with normalized components where needed. This is especially helpful in the later retrieval of articles when using multithreading by provider (as opposed to by page) to account for strict rate limits. All coordinated searches corresponding to a provider appear under a nested dictionary to facilitate orchestration on the same thread with the same rate limiter.

Returns:

A dictionary that maps each normalized provider name to a nested dictionary of the coordinators registered for that provider.

Return type:

dict[str, dict[str, SearchCoordinator]]
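The nested grouping can be illustrated with plain dictionaries. In this sketch the coordinator keys (such as "plos:ml"), the provider_name attribute on the stand-in objects, and the normalization rule are all assumptions made for illustration.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict


def group_by_provider_name(coordinators: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Hypothetical grouping: provider name -> {coordinator key -> coordinator}."""
    grouped: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for key, coordinator in coordinators.items():
        # Assumed normalization: ignore case and underscores.
        provider = coordinator.provider_name.lower().replace("_", "")
        grouped[provider][key] = coordinator
    return dict(grouped)


coordinators = {
    "plos:ml": SimpleNamespace(provider_name="plos", query="ml"),
    "plos:nlp": SimpleNamespace(provider_name="PLOS", query="nlp"),
    "arxiv:ml": SimpleNamespace(provider_name="arxiv", query="ml"),
}
grouped = group_by_provider_name(coordinators)
```

Each inner dictionary can then be processed on a single thread, which keeps all requests to one provider behind the same rate limiter.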

iter_pages(pages: Sequence[int] | PageListInput, iterate_by_group: bool = False, **kwargs) Generator[SearchResult, None, None][source]

Helper method that creates and joins a sequence of generator functions for retrieving and processing records from each combination of queries, pages, and providers in sequence. This implementation uses the SearchCoordinator.iter_pages to dynamically identify when page retrieval should halt for each API provider, accounting for errors, timeouts, and fewer than the expected number of records before filtering records with pre-specified criteria.

Parameters:
  • pages (Sequence[int]) – A sequence of page numbers to iteratively request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Yields:

SearchResult – Iteratively returns the SearchResult for each provider, query, and page using a generator expression. Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response)

iter_pages_threaded(pages: Sequence[int] | PageListInput, max_workers: int | None = None, **kwargs) Generator[SearchResult, None, None][source]

Threads by provider to respect rate limits. Helper method that implements threading to simultaneously retrieve a sequence of generator functions for retrieving and processing records from each combination of queries, pages, and providers in a multi-threaded set of sequences grouped by provider.

This implementation also uses the SearchCoordinator.iter_pages to dynamically identify when page retrieval should halt for each API provider, accounting for errors, timeouts, and fewer than the expected number of records before filtering records with pre-specified criteria.

Note that because threading is performed by provider, this method will not differ significantly in speed from the MultiSearchCoordinator.iter_pages method if only a single provider has been specified.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Yields:

SearchResult – Iteratively returns the SearchResult for each provider, query, and page using a generator expression as each SearchResult becomes available after multi-threaded processing. Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response)
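Threading by provider can be sketched with concurrent.futures: each provider's searches run sequentially on one worker thread, while different providers run in parallel. The function names and the tuple-based results are hypothetical stand-ins, and this simplified version yields a provider's results only after that provider's whole batch has finished, rather than as each result becomes available.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, List, Optional, Sequence, Tuple

Result = Tuple[str, str, int]  # (provider_name, query, page)


def fetch_provider_pages(provider: str, queries: List[str], pages: Sequence[int]) -> List[Result]:
    """Stand-in for sequential, rate-limited retrieval within a single provider."""
    return [(provider, query, page) for query in queries for page in pages]


def iter_pages_by_provider(
    groups: Dict[str, List[str]],
    pages: Sequence[int],
    max_workers: Optional[int] = None,
) -> Iterator[Result]:
    """Run one task per provider and yield results as each provider finishes."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(fetch_provider_pages, provider, queries, pages)
            for provider, queries in groups.items()
        ]
        for future in as_completed(futures):
            yield from future.result()
```

With a single provider there is only one task, which is why threading by provider offers little speedup in that case.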

search(page: int = 1, iterate_by_group: bool = False, max_workers: int | None = None, multithreading: bool = True, **kwargs) SearchResultList[source]

Public method used to search for a single page from multiple providers at once using a sequential or multithreading approach. This method delegates the search to search_pages to retrieve a single page for each query and provider, iterating over searches grouped by provider.

Note that the MultiSearchCoordinator.search_pages method uses shared rate limiters to ensure that APIs are not overwhelmed by the number of requests being sent within a specific time interval.

Parameters:
  • page (int) – The page number to request from each API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Returns:

The list containing all retrieved and processed pages from the API. If any non-stopping errors occur, this will return an ErrorResponse instead with error and message attributes further explaining any issues that occurred during processing.

Return type:

SearchResultList

search_pages(pages: Sequence[int] | PageListInput, iterate_by_group: bool = False, max_workers: int | None = None, multithreading: bool = True, **kwargs) SearchResultList[source]

Public method used to search articles from multiple providers at once using a sequential or multithreading approach. This approach uses iter_pages under the hood.

Note that the MultiSearchCoordinator.search_pages method uses shared rate limiters to ensure that APIs are not overwhelmed by the number of requests being sent within a specific time interval.

Parameters:
  • pages (Sequence[int]) – A sequence of page numbers to iteratively request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

Returns:

The list containing all retrieved and processed pages from the API. If any non-stopping errors occur, this will return an ErrorResponse instead with error and message attributes further explaining any issues that occurred during processing.

Return type:

SearchResultList

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the MultiSearchCoordinator.

class scholar_flux.api.NonResponse(*, cache_key: str | None = None, response: None = None, created_at: str | None = None, message: str | None = None, error: str | None = None)[source]

Bases: ErrorResponse

Response class used to indicate that an error occurred in the preparation of a request or in the retrieval of a response object from an API.

This class is used to signify the error that occurred within the search process using a similar interface as the other scholar_flux Response dataclasses.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

response: None
class scholar_flux.api.ProcessedResponse(*, cache_key: str | None = None, response: Any | None = None, created_at: str | None = None, parsed_response: Any | None = None, extracted_records: List[Any] | None = None, processed_records: List[Dict[Any, Any]] | None = None, metadata: Any | None = None, message: str | None = None)[source]

Bases: APIResponse

Helper class for returning a ProcessedResponse object that contains information on the original, cached, or reconstructed_response received and processed after retrieval from an API in addition to the cache key. This object also allows storage of intermediate steps including:

1) parsed responses
2) extracted records and metadata
3) processed records (aliased as data)
4) any additional messages

An error field is provided for compatibility with the ErrorResponse class.

property data: List[Dict[Any, Any]] | None

Alias to the processed_records attribute that holds a list of dictionaries, when available.

property error: None

Provided for type hinting + compatibility.

extracted_records: List[Any] | None
message: str | None
metadata: Any | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

parsed_response: Any | None
processed_records: List[Dict[Any, Any]] | None
class scholar_flux.api.ProviderConfig(*, provider_name: Annotated[str, MinLen(min_length=1)], base_url: str, parameter_map: BaseAPIParameterMap, records_per_page: Annotated[int, Ge(ge=0), Le(le=1000)] = 20, request_delay: Annotated[float, Ge(ge=0)] = 6.1, api_key_env_var: str | None = None, docs_url: str | None = None)[source]

Bases: BaseModel

Config for creating the basic instructions and settings necessary to interact with new providers. Configs for the default providers are created on package initialization in the scholar_flux.api.providers submodule. A new, custom provider or override can be added to the provider_registry (a custom user dictionary) from the scholar_flux.api.providers module.

Parameters:
  • provider_name (str) – The name of the provider to be associated with the config.

  • base_url (str) – The URL of the provider to send requests with the specified parameters.

  • parameter_map (BaseAPIParameterMap) – The parameter map indicating the specific semantics of the API.

  • records_per_page (int) – Generally the upper limit (for some APIs) or reasonable limit for the number of retrieved records per request (specific to the API provider).

  • request_delay (float) – Indicates exactly how many seconds to wait before sending successive requests. Note that the requested interval may vary based on the API provider.

  • api_key_env_var (Optional[str]) – Indicates the environment variable to look for if the API requires or accepts API keys.

  • docs_url (Optional[str]) – An optional URL that indicates where documentation related to the use of the API can be found.

Example Usage:
>>> from scholar_flux.api import ProviderConfig, APIParameterMap, SearchAPI
>>> # Maps each of the individual parameters required to interact with the Guardian API
>>> parameters = APIParameterMap(query='q',
...                              start='page',
...                              records_per_page='page-size',
...                              api_key_parameter='api-key',
...                              auto_calculate_page=False,
...                              api_key_required=True)
>>> # creating the config object that holds the basic configuration necessary to interact with the API
>>> guardian_config = ProviderConfig(provider_name = 'GUARDIAN',
...                                  parameter_map = parameters,
...                                  base_url = 'https://content.guardianapis.com/search',
...                                  records_per_page=10,
...                                  api_key_env_var='GUARDIAN_API_KEY',
...                                  request_delay=6)
>>> api = SearchAPI.from_provider_config(query = 'economic welfare',
...                                      provider_config = guardian_config,
...                                      use_cache = True)
>>> assert api.provider_name == 'guardian'
>>> response = api.search(page = 1) # assumes that you have the GUARDIAN_API_KEY stored as an env variable
>>> assert response.ok
api_key_env_var: str | None
base_url: str
docs_url: str | None
model_config: ClassVar[ConfigDict] = {'str_strip_whitespace': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod normalize_provider_name(v: str) str[source]

Helper method for normalizing the names of providers to a consistent structure.
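As a rough illustration of what a consistent structure could mean here, the sketch below lowercases the name and drops underscores, consistent with the example above where a config named 'GUARDIAN' is later compared against 'guardian'. The function normalize_name and its exact rules are assumptions; the library's normalization may handle additional cases.

```python
def normalize_name(name: str) -> str:
    """Hypothetical normalizer: ignores case, underscores, and surrounding whitespace."""
    return name.strip().lower().replace("_", "")
```

Under these assumed rules, normalize_name("GUARDIAN") and normalize_name("guardian") both map to the same registry key.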

parameter_map: BaseAPIParameterMap
provider_name: str
records_per_page: int
request_delay: float
search_config_defaults() dict[str, Any][source]

Convenience Method for retrieving ProviderConfig fields as a dict. Useful for providing the missing information needed to create a SearchAPIConfig object for a provider when only the provider_name has been provided.

Returns:

A dictionary containing the URL, name, records_per_page, and request_delay for the current provider.

Return type:

(dict)

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the ProviderConfig.

classmethod validate_base_url(v: str) str[source]

Validates the current url and raises an APIParameterException if invalid.

classmethod validate_docs_url(v: str | None) str | None[source]

Validates the documentation url and raises an APIParameterException if invalid.

class scholar_flux.api.ProviderRegistry(dict=None, /, **kwargs)[source]

Bases: BaseProviderDict

The ProviderRegistry implementation allows the smooth and efficient retrieval of API parameter maps and default configuration settings to aid in the creation of a SearchAPI that is specific to the current API.

Note that the ProviderRegistry uses the ProviderConfig._normalize_name to ignore underscores and case-sensitivity.

- ProviderRegistry.from_defaults: Dynamically imports configurations stored within scholar_flux.api.providers, and fails gracefully if a provider’s module does not contain a ProviderConfig.

- ProviderRegistry.get: Resolves a provider name to its ProviderConfig if it exists in the registry.

- ProviderRegistry.get_from_url: Resolves a provider URL to its ProviderConfig if it exists in the registry.

add(provider_config: ProviderConfig) None[source]

Helper method for adding a new provider to the provider registry.

create(provider_name: str, **kwargs) ProviderConfig[source]

Helper method that creates and registers a new ProviderConfig with the current provider registry.

Parameters:
  • provider_name (str) – The name of the provider to create a new provider_config for.

  • **kwargs – Additional keyword arguments to pass to scholar_flux.api.models.ProviderConfig

classmethod from_defaults() ProviderRegistry[source]

Helper method that dynamically loads providers from the scholar_flux.api.providers module specifically reserved for default provider configs.

Returns:

A new registry containing the loaded default provider configurations

Return type:

ProviderRegistry

get_from_url(provider_url: str | None) ProviderConfig | None[source]

Attempts to retrieve the ProviderConfig instance for a provider by resolving the provided URL to a registered provider. Does not raise an error if the provider does not exist.

Parameters:

provider_url (Optional[str]) – URL of the default provider

Returns:

Instance configuration for the provider if it exists, else None

Return type:

Optional[ProviderConfig]

remove(provider_name: str) None[source]

Helper method for removing a provider configuration from the provider registry.

class scholar_flux.api.RateLimiter(min_interval: int | float | None = None)[source]

Bases: object

A basic rate limiter used to ensure that function calls (such as API requests) do not exceed a specified rate.

The RateLimiter is used within ScholarFlux to throttle the total number of requests that can be made within a defined time interval (measured in seconds).

This class ensures that calls to RateLimiter.wait() (or any decorated function) are spaced by at least min_interval seconds.

For multithreading applications, the RateLimiter is not thread-safe. Instead, the ThreadedRateLimiter subclass can provide a thread-safe implementation when required.

Parameters:

min_interval (Optional[float | int]) – The minimum number of seconds that must elapse before another request is sent or call is performed. If min_interval is not specified, then the class attribute RateLimiter.DEFAULT_MIN_INTERVAL will be assigned to RateLimiter.min_interval instead.

Examples

>>> import requests
>>> from scholar_flux.api import RateLimiter
>>> rate_limiter = RateLimiter(min_interval = 5)
>>> # The first call won't sleep, because a prior call using the rate limiter doesn't yet exist
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # sleeps if fewer than 5 seconds have elapsed since the last call
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # Or simply call the `wait` method directly:
>>> rate_limiter.wait()
>>> response = requests.get("http://httpbin.org/get")
DEFAULT_MIN_INTERVAL: float | int = 6.1
__init__(min_interval: int | float | None = None)[source]

Initializes the rate limiter with the min_interval argument.

Parameters:

min_interval (Optional[float | int]) – Minimum number of seconds to wait before the next call is performed or request sent.

property min_interval: float | int

The minimum number of seconds that must elapse before another request is sent or another action is taken.

rate(min_interval: float | int) Iterator[Self][source]

Temporarily adjusts the minimum interval between function calls or requests when used with a context manager.

After the context manager exits, the minimum interval is restored to its original value, and the time of the last call is recorded.

Parameters:

min_interval (float | int) – The minimum interval to use temporarily within the context

Yields:

RateLimiter – The original rate limiter with a temporarily changed minimum interval
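As a rough illustration of the temporary override described above, the sketch below swaps the interval inside a context manager and restores it on exit. SketchRateLimiter is a hypothetical stand-in rather than the RateLimiter source, and the use of time.monotonic for the last-call timestamp is an assumption.

```python
import time
from contextlib import contextmanager
from typing import Iterator, Optional


class SketchRateLimiter:
    """Hypothetical stand-in used only to illustrate a temporary interval."""

    def __init__(self, min_interval: float = 6.1) -> None:
        self.min_interval = min_interval
        self._last_call: Optional[float] = None

    @contextmanager
    def rate(self, min_interval: float) -> Iterator["SketchRateLimiter"]:
        original = self.min_interval
        self.min_interval = min_interval  # temporary override
        try:
            yield self
        finally:
            # Restore the previous interval even if the body raised,
            # then record the time of the last call.
            self.min_interval = original
            self._last_call = time.monotonic()


limiter = SketchRateLimiter(min_interval=5)
with limiter.rate(0.1) as temporary:
    interval_inside = temporary.min_interval
interval_after = limiter.min_interval
```

The try/finally ensures the original interval is restored even when the wrapped code raises.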

wait(min_interval: int | float | None = None) None[source]

Blocks (via time.sleep) until at least min_interval seconds have passed since the last call.

This method can be used with the min_interval attribute to determine when a search was last sent and throttle requests to make sure rate limits aren’t exceeded. If not enough time has passed, the API will wait before sending the next request.

Parameters:

min_interval (Optional[float | int] = None) – The minimum time to wait before another call is sent. If this argument is not provided, the min_interval attribute is used; if neither is set, the default min_interval value is used.

Raises:

APIParameterException – If the value provided is not an integer or float, or is less than 0
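The waiting logic can be sketched in plain Python as follows. This is a simplified illustration under stated assumptions, not the library's code: SketchLimiter is hypothetical, ValueError stands in for APIParameterException, the injected clock and sleep callables exist only to make the example instantaneous, and the lock shows one way a thread-safe variant might serialize callers.

```python
import threading
import time
from typing import Callable, List, Optional


class SketchLimiter:
    """Hypothetical limiter: spaces calls to wait() by at least min_interval."""

    def __init__(self, min_interval: float = 6.1,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.min_interval = self._validate(min_interval)
        self._clock, self._sleep = clock, sleep
        self._last_call: Optional[float] = None
        self._lock = threading.Lock()  # assumption: serializes concurrent callers

    @staticmethod
    def _validate(value: float) -> float:
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or value < 0:
            raise ValueError(f"min_interval must be a non-negative number, got {value!r}")
        return value

    def wait(self, min_interval: Optional[float] = None) -> None:
        interval = self.min_interval if min_interval is None else self._validate(min_interval)
        with self._lock:
            if self._last_call is not None:
                remaining = interval - (self._clock() - self._last_call)
                if remaining > 0:
                    self._sleep(remaining)
            self._last_call = self._clock()


# Usage with a fake clock so that no real sleeping takes place
now: List[float] = [0.0]
slept: List[float] = []


def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    now[0] += seconds


demo = SketchLimiter(5, clock=lambda: now[0], sleep=fake_sleep)
demo.wait()    # first call: nothing to wait for
now[0] += 2.0  # two seconds pass
demo.wait()    # sleeps for the remaining three seconds
```

A monotonic clock is used by default so that system clock adjustments cannot shorten or lengthen the wait.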

class scholar_flux.api.ReconstructedResponse(status_code: int, reason: str, headers: MutableMapping[str, str], content: bytes, url: Any)[source]

Bases: object

Helper class for retaining the most relevant fields when reconstructing responses from different sources such as requests and httpx (if chosen). The primary purpose of the ReconstructedResponse in scholar_flux is to create a minimal representation of a response when a ProcessedResponse must be constructed without an actual response and its content fields must be verified.

In applications such as retrieving cached data from a scholar_flux.data_storage.DataCacheManager, if an original or cached response is not available, then a ReconstructedResponse is created from the cached response fields when available.

Parameters:
  • status_code (int) – The integer code indicating the status of the response

  • reason (str) – Indicates the reasoning associated with the status of the response

  • headers (MutableMapping[str, str]) – Indicates metadata associated with the response (e.g. Content-Type, etc.)

  • content (bytes) – The content within the response

  • url (Any) – The URL from which the response was received

Note

The ReconstructedResponse.build factory method is recommended in cases when one property may contain the needed fields but may need to be processed and prepared first before being used. Examples include instances where one has text or json data instead of content, a reason_phrase field instead of reason, etc.

Example

>>> from scholar_flux.api.models import ReconstructedResponse
# build a response using a factory method that infers fields from existing ones when not directly specified
>>> response = ReconstructedResponse.build(status_code = 200, content = b"success", url = "https://google.com")
# check whether the current class follows a ResponseProtocol and contains valid fields
>>> response.is_response()
# OUTPUT: True
>>> response.validate() # raises an error if invalid
>>> response.raise_for_status() # no error for 200 status codes
>>> assert response.reason == 'OK' == response.status  # inferred from the status_code attribute
__init__(status_code: int, reason: str, headers: MutableMapping[str, str], content: bytes, url: Any) None
asdict() dict[str, Any][source]

Helper method for converting the ReconstructedResponse into a dictionary containing attributes and their corresponding values.

classmethod build(response: Any | None = None, **kwargs) ReconstructedResponse[source]

Helper method for building a new ReconstructedResponse from a regular response object. This classmethod can either construct a new ReconstructedResponse object from a response object or response-like object or create a new ReconstructedResponse altogether with its inputs.

Parameters:

  • response (Optional[Any]) – A response or response-like object of unknown type, or None

  • **kwargs – The underlying components needed to construct a new response. Ideally, this set of key-value pairs would be specific only to the types expected by the ReconstructedResponse.

content: bytes
classmethod fields() list[source]

Helper method for retrieving a list containing the names of all fields associated with the ReconstructedResponse class.

Returns:

A list containing the name of each attribute in the ReconstructedResponse.

Return type:

list[str]

classmethod from_keywords(**kwargs) ReconstructedResponse[source]

Uses the provided keyword arguments to create a ReconstructedResponse. Keywords can be the default attributes of the ReconstructedResponse or other keywords from which those attributes are inferred and processed.

Parameters:
  • status_code (int) – The integer code indicating the status of the response

  • reason (str) – Indicates the reasoning associated with the status of the response

  • headers (MutableMapping[str, str]) – Indicates metadata associated with the response (e.g. Content-Type)

  • content (bytes) – The content within the response

  • url (Any) – The URL from which the response was received

Some fields can be either provided directly or inferred from other, similarly common fields:

  • content: [‘content’, ‘_content’, ‘text’, ‘json’]

  • headers: [‘headers’, ‘_headers’]

  • reason: [‘reason’, ‘status’, ‘reason_phrase’, ‘status_code’]

Returns:

A newly reconstructed response from the given keyword components

Return type:

ReconstructedResponse
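The field inference listed above might look roughly like the sketch below. It is an illustration only: infer_content and infer_reason are hypothetical helpers, the order in which candidate keys are tried is an assumption, and the standard library's HTTPStatus is used to derive a reason phrase from the status code.

```python
import json
from http import HTTPStatus
from typing import Any, Optional


def infer_content(**fields: Any) -> Optional[bytes]:
    """Pick bytes content directly, or encode it from text or json fields."""
    for key in ("content", "_content"):
        if isinstance(fields.get(key), bytes):
            return fields[key]
    if isinstance(fields.get("text"), str):
        return fields["text"].encode("utf-8")
    if fields.get("json") is not None:
        return json.dumps(fields["json"]).encode("utf-8")
    return None


def infer_reason(**fields: Any) -> Optional[str]:
    """Use an explicit reason if present, else derive it from the status code."""
    for key in ("reason", "status", "reason_phrase"):
        value = fields.get(key)
        if isinstance(value, str) and value:
            return value
    try:
        return HTTPStatus(fields.get("status_code")).phrase
    except (ValueError, TypeError):
        return None  # unknown or missing status code


content_from_json = infer_content(json={"response": "success"})
reason_from_code = infer_reason(status_code=200)
reason_from_phrase = infer_reason(reason_phrase="Not Found", status_code=404)
```

Explicit fields take priority over inferred ones in this sketch, so a caller-supplied reason is never overwritten by the phrase derived from status_code.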

headers: MutableMapping[str, str]
is_response() bool[source]

Method for directly validating the fields that indicate that a response has been minimally recreated successfully. The fields that are validated include:

  1. status codes (should be an integer)

  2. URLs (should be a valid url)

  3. reasons (should originate from a reason attribute or inferred from the status code)

  4. content (should be a bytes field or encoded from a string text field)

  5. headers (should be a dictionary with string fields and preferably a content type)

Returns:

Indicates whether the current reconstructed response minimally recreates a response object.

Return type:

bool
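The five checks can be approximated with a short standalone function. This is a sketch of the idea rather than the is_response implementation: looks_like_response is a hypothetical name, and the exact rules (for example, requiring an http or https scheme) are assumptions.

```python
from typing import Any, Mapping
from urllib.parse import urlparse


def looks_like_response(status_code: Any, reason: Any, headers: Any,
                        content: Any, url: Any) -> bool:
    """Return True only if every field has the minimal expected shape."""
    parsed = urlparse(str(url)) if url else None
    checks = [
        # 1. status code is an integer (bool is excluded on purpose)
        isinstance(status_code, int) and not isinstance(status_code, bool),
        # 2. URL has a web scheme and a host
        parsed is not None and parsed.scheme in ("http", "https") and bool(parsed.netloc),
        # 3. reason is a non-empty string
        isinstance(reason, str) and bool(reason),
        # 4. content is bytes
        isinstance(content, bytes),
        # 5. headers map strings to strings
        isinstance(headers, Mapping)
        and all(isinstance(k, str) and isinstance(v, str) for k, v in headers.items()),
    ]
    return all(checks)


valid = looks_like_response(200, "OK", {"Content-Type": "application/json"},
                            b"{}", "https://an-example-url.com")
invalid = looks_like_response(200, "OK", {}, "not bytes", "not a url")
```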

json() Dict[str, Any] | List[Any] | None[source]

Return JSON-decoded body from the underlying response, if available.

property ok: bool

Indicates whether the current response reflects a successful request (200 <= status_code < 400) or whether an invalid response has been received.

Returns:

True if the status code is an integer value within the range of 200 and 399, False otherwise

Return type:

bool

raise_for_status() None[source]

Method that imitates the capability of the requests and httpx response types to raise errors when encountering status codes that are indicative of failed responses.

As scholar_flux processes data that is generally only sent when status codes are within the 200s (or exactly 200 [ok]), an error is raised when encountering a value outside of this range.

Raises:
reason: str
property status: str | None

Helper property for retrieving a human-readable description of the status code.

Returns:

The status description associated with the response (if available)

Return type:

Optional[str]

status_code: int
property text: str | None

Helper property for retrieving the text from the bytes content as a string.

Returns:

The decoded text from the content of the response

Return type:

Optional[str]

url: Any
validate() None[source]

Raises an error if the recreated response object does not contain the valid properties expected of a response. If validation succeeds, no error is raised and nothing is returned.

Raises:

InvalidResponseReconstructionException – if at least one field is determined to be invalid and unexpected of a true response object.

class scholar_flux.api.ResponseCoordinator(parser: BaseDataParser, extractor: BaseDataExtractor, processor: ABCDataProcessor, cache_manager: DataCacheManager)[source]

Bases: object

Coordinates the parsing, extraction, processing, and caching of API responses. The ResponseCoordinator operates on the concept of dependency injection to orchestrate the entire process. Because the structure of the coordinator (parser, extractor, processor)

Note that the overall composition of the coordinator is a governing factor in how the response is processed. The ResponseCoordinator uses a cache key and schema fingerprint to ensure that it is only returning a processed response from the cache storage if the structure of the coordinator at the time of cache storage has not changed.

To ensure that we’re not pulling from cache on significant changes to the ResponseCoordinator, we validate the schema by default using DEFAULT_VALIDATE_FINGERPRINT. When the schema changes, previously cached data is ignored, although this can be explicitly overridden during response handling.

The coordinator orchestration process operates mainly through the ResponseCoordinator.handle_response method that sequentially calls the parser, extractor, processor, and cache_manager.

Example workflow:

>>> from scholar_flux.api import SearchAPI, ResponseCoordinator
>>> api = SearchAPI(query = 'technological innovation', provider_name = 'crossref', user_agent = 'scholar_flux')
>>> response_coordinator = ResponseCoordinator.build() # uses defaults with caching in-memory
>>> response = api.search(page = 1)
# future calls with the same structure will be cached
>>> processed_response = response_coordinator.handle_response(response, cache_key='tech-innovation-cache-key-page-1')
# the ProcessedResponse (or ErrorResponse) stores critical fields from the original and processed response
>>> processed_response
# OUTPUT: ProcessedResponse(len=20, cache_key='tech-innovation-cache-key-page-1', metadata=...)
>>> new_processed_response = response_coordinator.handle_response(processed_response, cache_key='tech-innovation-cache-key-page-1')
>>> new_processed_response
# OUTPUT: ProcessedResponse(len=20, cache_key='tech-innovation-cache-key-page-1', metadata=...)

Note that the entire process can be orchestrated via the SearchCoordinator that uses the SearchAPI and ResponseCoordinator as core dependency injected components:

>>> from scholar_flux import SearchCoordinator
>>> search_coordinator = SearchCoordinator(api, response_coordinator, cache_requests=True)
# uses a default cache key constructed from the response internally
>>> processed_response = search_coordinator.search(page = 1)
# OUTPUT: ProcessedResponse(len=20, cache_key='crossref_technological innovation_1_20', metadata=...)
>>> processed_response.content == new_processed_response.content
Parameters:
DEFAULT_VALIDATE_FINGERPRINT: bool = True
__init__(parser: BaseDataParser, extractor: BaseDataExtractor, processor: ABCDataProcessor, cache_manager: DataCacheManager)[source]

Initializes the response coordinator using the core components used to parse, process, and cache response data.

classmethod build(parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) ResponseCoordinator[source]

Factory method to build a ResponseCoordinator with sensible defaults.

Parameters:
  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline: parses response records into a dictionary

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately

  • processor (Optional[ABCDataProcessor]) – Processes API responses into a list of dictionaries

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests; the api is the ground truth if not directly specified

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses; on by default unless specified or if a cache manager is already provided

Returns:

A fully constructed coordinator.

Return type:

ResponseCoordinator

property cache: DataCacheManager

Alias for the response data processing cache manager.

Also allows direct access to the DataCacheManager from the ResponseCoordinator

property cache_manager: DataCacheManager

Allows direct access to the DataCacheManager from the ResponseCoordinator.

classmethod configure_cache(cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) DataCacheManager[source]

Helper method for building and swapping out cache managers depending on the cache chosen.

Parameters:
  • cache_manager (Optional[DataCacheManager]) – An optional cache manager to use

  • cache_results (Optional[bool]) – Ground truth parameter, used to resolve whether to use caching when the cache_manager and cache_results contradict

Returns:

An existing or newly created cache manager that can be used with the ResponseCoordinator

Return type:

DataCacheManager

property extractor: BaseDataExtractor

Allows direct access to the DataExtractor from the ResponseCoordinator.

handle_response(response: Response | ResponseProtocol, cache_key: str | None = None, from_cache: bool = True, validate_fingerprint: bool | None = None) ErrorResponse | ProcessedResponse[source]

Retrieves the processed response from the cache if it was previously cached. Otherwise, the data is retrieved after processing the response. The response data is subsequently transformed into a dataclass containing the response content, processing info, and metadata.

Parameters:
  • response (Response) – Raw API response.

  • cache_key (Optional[str]) – Cache key for storing/retrieving.

  • from_cache (bool) – Whether to attempt to retrieve the processed response from the cache.

Returns:

A dataclass object that contains response data and detailed processing info.

Return type:

ErrorResponse | ProcessedResponse

handle_response_data(response: Response, cache_key: str | None = None) List[Dict[Any, Any]] | List | None[source]

Retrieves the data from the processed response from cache if previously cached. Otherwise the data is retrieved after processing the response.

Parameters:
  • response (Response) – Raw API response.

  • cache_key (Optional[str]) – Cache key for storing/retrieving.

Returns:

Processed response data or None.

Return type:

Optional[List[Dict[Any, Any]]]

property parser: BaseDataParser

Allows direct access to the data parser from the ResponseCoordinator.

property processor: ABCDataProcessor

Allows direct access to the DataProcessor from the ResponseCoordinator.

schema_fingerprint() str[source]

Helper method for generating a concise view of the current structure of the response coordinator.
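To illustrate why a schema fingerprint is useful for cache validation, the sketch below stores a fingerprint next to each cached value and ignores entries whose fingerprint no longer matches. Everything here is hypothetical (the schema_fingerprint function, FingerprintedCache, and DemoParser), and hashing a textual description of each component with SHA-256 is an assumption, not the library's method.

```python
import hashlib
from typing import Any, Dict, Optional, Tuple


def schema_fingerprint(*components: object) -> str:
    """Hash the class name and attributes of each pipeline component."""
    parts = [f"{type(c).__name__}:{sorted(vars(c).items())!r}" for c in components]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


class FingerprintedCache:
    """Hypothetical cache that remembers the schema each entry was built with."""

    def __init__(self) -> None:
        self._store: Dict[str, Tuple[str, Any]] = {}

    def put(self, key: str, value: Any, schema: str) -> None:
        self._store[key] = (schema, value)

    def get(self, key: str, schema: str, validate_fingerprint: bool = True) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_schema, value = entry
        if validate_fingerprint and stored_schema != schema:
            return None  # the pipeline changed, so the cached value is ignored
        return value


class DemoParser:
    """Hypothetical component whose configuration affects the fingerprint."""

    def __init__(self, fmt: str) -> None:
        self.fmt = fmt


json_schema = schema_fingerprint(DemoParser("json"))
xml_schema = schema_fingerprint(DemoParser("xml"))
cache = FingerprintedCache()
cache.put("page-1", [{"title": "example"}], json_schema)
```

Passing validate_fingerprint=False in this sketch corresponds to explicitly overriding the check and accepting the cached value regardless of structural changes.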

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for retrieving a string representation of the overall structure of the current ResponseCoordinator. The helper function, generate_repr_from_string, helps produce human-readable representations of the core structure of the ResponseCoordinator.

Parameters:
  • flatten (bool) – Whether to flatten the ResponseCoordinator’s structural representation into a single line.

  • show_value_attributes (bool) – Whether to show nested attributes of the components in the structure of the current ResponseCoordinator instance.

Returns:

The structure of the current ResponseCoordinator as a string.

Return type:

str

summary() str[source]

Helper method for creating a quick summary representation of the structure of the ResponseCoordinator.

classmethod update(response_coordinator: ResponseCoordinator, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, cache_results: bool | None = None) ResponseCoordinator[source]

Factory method to create a new ResponseCoordinator from an existing configuration.

Parameters:
  • response_coordinator (ResponseCoordinator) – ResponseCoordinator containing the defaults to swap

  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline: parses response records into a dictionary

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately

  • processor (Optional[ABCDataProcessor]) – Processes API responses into a list of dictionaries

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests; the api is the ground truth if not directly specified

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses; on by default unless specified or if a cache manager is already provided

Returns:

A fully constructed coordinator.

Return type:

ResponseCoordinator

class scholar_flux.api.ResponseValidator[source]

Bases: object

Helper class that serves as an initial response validation step to ensure that, in custom retry handling, the basic structure of a response can be validated to determine whether or not to retry the response retrieval process.

The ResponseValidator implements class methods that return True or False depending on whether a response or response-like object contains the required structure. These methods raise an error when encountering non-response objects, or when validation fails and raise_on_error = True.

Example

>>> from scholar_flux.api import ResponseValidator, ReconstructedResponse
>>> mock_success_response = ReconstructedResponse.build(status_code = 200,
...                                                     json = {'response': 'success'},
...                                                     url = "https://an-example-url.com",
...                                                     headers={'Content-Type': 'application/json'}
...                                                     )
>>> ResponseValidator.validate_response(mock_success_response) is True
>>> ResponseValidator.validate_content(mock_success_response) is True
structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method that shows the current structure of the ResponseValidator class in a string format. This method will show the name of the current class along with its attributes (ResponseValidator())

Returns:

A string representation of the current structure of the ResponseValidator

Return type:

str

classmethod validate_content(response: Response | ResponseProtocol, expected_format: str = 'application/json', *, raise_on_error: bool = False) bool[source]

Validates the response content type.

Parameters:
  • response (requests.Response | ResponseProtocol) – The HTTP response or response-like object to check.

  • expected_format (str) – The expected content type substring (e.g., “application/json”).

  • raise_on_error (bool) – If True, raises InvalidResponseException on mismatch.

Returns:

True if the content type matches, False otherwise.

Return type:

bool

Raises:

InvalidResponseException – If the content type does not match and raise_on_error is True.
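A content-type check of this kind can be sketched without any third-party dependencies. The function below is a hypothetical approximation: content_type_matches is not part of scholar_flux, ValueError stands in for InvalidResponseException, and the case-insensitive substring comparison is an assumption based on the "content type substring" wording above.

```python
from typing import Mapping


def content_type_matches(headers: Mapping[str, str],
                         expected_format: str = "application/json",
                         *, raise_on_error: bool = False) -> bool:
    """Check whether the Content-Type header contains the expected format."""
    content_type = ""
    for name, value in headers.items():
        if name.lower() == "content-type":  # header names are case-insensitive
            content_type = value.lower()
            break
    matches = expected_format.lower() in content_type
    if not matches and raise_on_error:
        raise ValueError(f"Expected {expected_format!r}, received {content_type!r}")
    return matches


json_ok = content_type_matches({"Content-Type": "application/json; charset=utf-8"})
html_ok = content_type_matches({"content-type": "text/html"})
```

A substring match tolerates parameters such as charset that commonly follow the media type.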

classmethod validate_response(response: Response | ResponseProtocol, *, raise_on_error: bool = False) bool[source]

Validates HTTP responses by first verifying whether the object is a Response or follows a ResponseProtocol. For valid response or response-like objects, the status code is then verified: 400- and 500-level status codes return False, or raise an error if raise_on_error is set to True.

Note that a ResponseProtocol duck-types and verifies that each of a minimal set of attributes and/or properties can be found within the current response.

In the scholar_flux retrieval step, this validator verifies that the response received is a valid response.

Parameters:
  • response (requests.Response | ResponseProtocol) – The HTTP response object to validate

  • raise_on_error (bool) – If True, raises InvalidResponseException on error for invalid response status codes

Returns:

True if valid, False otherwise

Raises:
class scholar_flux.api.RetryHandler(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None)[source]

Bases: object

Core class used to determine whether to retry failed requests and how long to wait between attempts, taking into account rate limiting, the backoff factor, and the maximum backoff.

DEFAULT_RAISE_ON_ERROR = False
DEFAULT_RETRY_STATUSES = {429, 500, 503, 504}
DEFAULT_VALID_STATUSES = {200}
__init__(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None)[source]

Helper class to send and retry requests of a specific status code. The RetryHandler also dynamically controls the degree of rate limiting that occurs upon observing a rate limiting error status code.

Parameters:
  • max_retries (int) – indicates how many attempts should be performed before halting retries at retrieving a valid response

  • backoff_factor (float) – indicates the factor used to adjust when the next request should be attempted based on past unsuccessful attempts

  • max_backoff (int) – describes the maximum number of seconds to wait before submitting the next request

  • retry_statuses (Optional[set[int]]) – Indicates the full list of status codes that should be retried if encountered

  • raise_on_error (Optional[bool]) – Flag that indicates whether or not to raise an error upon encountering an invalid status_code or exception

calculate_retry_delay(attempt_count: int, response: Response | ResponseProtocol | None = None) float[source]

Calculate delay for the next retry attempt.
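One common way to compute such a delay is capped exponential backoff, sketched below. The formula is an assumption (the documentation above does not state how backoff_factor is applied), retry_delay is a hypothetical function, and preferring a server-provided Retry-After value over the computed backoff is likewise an illustrative choice.

```python
from typing import Optional


def retry_delay(attempt_count: int, backoff_factor: float = 0.5,
                max_backoff: float = 120,
                retry_after: Optional[float] = None) -> float:
    """Return the number of seconds to wait before the next attempt."""
    if retry_after is not None:
        # Assumption: a server hint takes priority but is still capped
        return float(min(retry_after, max_backoff))
    # Exponential growth: factor * 2^attempt, never above max_backoff
    return float(min(backoff_factor * (2 ** attempt_count), max_backoff))


delays = [retry_delay(attempt) for attempt in range(4)]
capped = retry_delay(20)
hinted = retry_delay(1, retry_after=7)
```

With the default values shown in the class signature (backoff_factor=0.5, max_backoff=120), this sketch doubles the delay on each attempt until it reaches the cap.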

execute_with_retry(request_func: Callable, validator_func: Callable | None = None, *args, **kwargs) Response | ResponseProtocol | None[source]

Sends a request and retries on failure based on predefined criteria and validation function.

Parameters:
  • request_func – The function to send the request.

  • validator_func – A function that takes a response and returns True if valid.

  • *args – Positional arguments for the request function.

  • **kwargs – Arbitrary keyword arguments for the request function.

Returns:

The response received, or None if no valid response was obtained.

Return type:

Optional[requests.Response | ResponseProtocol]

Raises:
log_retry_attempt(delay: float, status_code: int | None = None) None[source]

Log an attempt to retry a request.

static log_retry_warning(message: str) None[source]

Log a warning when retries are exhausted or an error occurs.

parse_retry_after(retry_after: str) int | float | None[source]

Parse the ‘Retry-After’ header to calculate delay.

Parameters:

retry_after (str) – The value of ‘Retry-After’ header.

Returns:

Delay time in seconds.

Return type:

Optional[int | float]
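The Retry-After header may carry either a number of seconds or an HTTP date. The sketch below handles both forms with the standard library; it is not the RetryHandler implementation, and the function name, the optional now argument (added to make the example deterministic), and the choice to return None for unparseable values are assumptions.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional, Union


def parse_retry_after_sketch(retry_after: Optional[str],
                             now: Optional[datetime] = None) -> Optional[Union[int, float]]:
    """Convert a Retry-After header value into a delay in seconds."""
    if retry_after is None:
        return None
    value = retry_after.strip()
    if value.isascii() and value.isdigit():
        return int(value)  # delay-seconds form, e.g. "120"
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None  # neither a number nor a parseable date
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max((retry_at - current).total_seconds(), 0.0)


reference = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
from_seconds = parse_retry_after_sketch("120")
from_date = parse_retry_after_sketch("Wed, 21 Oct 2015 07:28:00 GMT", now=reference)
from_garbage = parse_retry_after_sketch("soon")
```

Dates in the past yield a delay of zero rather than a negative number.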

should_retry(response: Response | ResponseProtocol) bool[source]

Determine whether the request should be retried.
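Taken together, the methods above suggest a loop along the following lines. This is a simplified sketch under stated assumptions rather than the RetryHandler source: the function name, the injected sleep callable, the FakeResponse class, and the choice to return the last response when retries are exhausted are all hypothetical. The retryable status codes are copied from DEFAULT_RETRY_STATUSES above.

```python
import time
from typing import Any, Callable, List, Optional

RETRY_STATUSES = {429, 500, 503, 504}  # mirrors DEFAULT_RETRY_STATUSES


def execute_with_retry_sketch(request_func: Callable[[], Any],
                              validator_func: Optional[Callable[[Any], bool]] = None,
                              *, max_retries: int = 3, backoff_factor: float = 0.5,
                              max_backoff: float = 120,
                              sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call request_func until the response is valid or retries run out."""
    response = None
    for attempt in range(max_retries + 1):
        response = request_func()
        status = getattr(response, "status_code", None)
        valid = validator_func(response) if validator_func else status == 200
        if valid:
            return response
        if status not in RETRY_STATUSES or attempt == max_retries:
            break  # not retryable, or no attempts left
        sleep(min(backoff_factor * 2 ** attempt, max_backoff))
    return response


class FakeResponse:
    """Hypothetical response-like object used only for the demonstration."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code


queued = iter([FakeResponse(503), FakeResponse(429), FakeResponse(200)])
recorded_delays: List[float] = []
final = execute_with_retry_sketch(lambda: next(queued), sleep=recorded_delays.append)
```

Injecting the sleep function keeps the example fast and makes the chosen delays easy to inspect.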

class scholar_flux.api.SearchAPI(query: str, provider_name: str | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, use_cache: bool | None = None, base_url: str | None = None, api_key: SecretStr | str | None = None, records_per_page: int = 20, request_delay: float | None = None, **api_specific_parameters)[source]

Bases: BaseAPI

The core interface that handles the retrieval of JSON, XML, and YAML content from the scholarly API sources offered by several providers such as SpringerNature, PLOS, and PubMed. The SearchAPI is structured to allow flexibility without complexity in initialization. API clients can be either constructed piece-by-piece or with sensible defaults for session-based retrieval, API key management, caching, and configuration options.

This class is integrated into the SearchCoordinator as a core component of a pipeline that further parses the response, extracts records and metadata, and caches the processed records to facilitate downstream tasks such as research, summarization, and data mining.

Examples

>>> from scholar_flux.api import SearchAPI
# creating a basic API client that uses PLOS as the provider while caching data in-memory:
>>> api = SearchAPI(query = 'machine learning', provider_name = 'plos', use_cache = True)
# retrieve a basic request:
>>> response_page_1 = api.search(page = 1)
>>> assert response_page_1.ok
>>> response_page_1
# OUTPUT: <Response [200]>
>>> ml_page_1 = response_page_1.json()
# future requests automatically wait until the specified request delay passes to send another request:
>>> response_page_2 = api.search(page = 2)
>>> assert response_page_2.ok
>>> response_page_2
# OUTPUT: <Response [200]>
>>> ml_page_2 = response_page_2.json()
DEFAULT_CACHED_SESSION: bool = False
DEFAULT_URL: str = 'https://api.plos.org/search'
__init__(query: str, provider_name: str | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, use_cache: bool | None = None, base_url: str | None = None, api_key: SecretStr | str | None = None, records_per_page: int = 20, request_delay: float | None = None, **api_specific_parameters)[source]

Initializes the SearchAPI with a query and optional parameters. The absolute bare minimum for interacting with APIs requires a query, base_url, and an APIParameterConfig that associates relevant fields (e.g. query, records_per_page, etc.) with fields that are specific to each API provider.

Parameters:
  • query (str) – The search keyword or query string.

  • provider_name (Optional[str]) – The name of the API provider where requests will be sent. If a provider_name and base_url are both given, the SearchAPIConfig will prioritize base_urls over the provider_name.

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – A config that uses a parameter map attribute under the hood to build the parameters necessary to interact with an API. For convenience, an APIParameterMap can be provided in place of an APIParameterConfig, and the conversion will take place under the hood.

  • session (Optional[requests.Session]) – A pre-configured session. A new session is created if not specified.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs (API keys, auth bearers, emails, etc.)

  • use_cache (bool) – Indicates whether or not to create a cached session. If a cached session is already specified, this setting will have no effect on the creation of a session.

  • base_url (str) – The base URL for the article API.

  • api_key (Optional[str | SecretStr]) – API key if required.

  • records_per_page (int) – Number of records to fetch per page (1-100).

  • request_delay (Optional[float]) – Minimum delay between requests in seconds. If not specified, the SearchAPI will use the default request delay defined in the SearchAPIConfig (6.1 seconds) if an override for the current provider does not exist.

  • **api_specific_parameters

    Additional parameter-value pairs to be provided to the SearchAPIConfig class. API-specific parameters include:

      • mailto (Optional[str | SecretStr]) – Crossref: an optional contact for feedback on API usage

      • db (str) – PubMed: a database to retrieve data from (example: db=pubmed)

property api_key: SecretStr | None

Retrieves the current value of the API key from the SearchAPIConfig as a SecretStr.

Note that the API key is stored as a secret key when available. The value of the API key can be retrieved by using the api_key.get_secret_value() method.

Returns:

A secret string of the API key if it exists

Return type:

Optional[SecretStr]

property api_specific_parameters: dict

This property pulls additional parameters corresponding to the API from the configuration of the current API instance.

Returns:

A dictionary of all parameters specific to the current API.

Return type:

dict[str, APISpecificParameter]

property base_url: str

Corresponds to the base URL of the current API.

Returns:

The base URL corresponding to the API Provider

build_parameters(page: int, additional_parameters: dict[str, Any] | None = None, **api_specific_parameters) Dict[str, Any][source]

Constructs the request parameters for the API call, using the provided APIParameterConfig and its associated APIParameterMap. This method maps standard fields (query, page, records_per_page, api_key, etc.) to the provider-specific parameter names.

Using additional_parameters, an arbitrary set of parameter key-value pairs can be added to the request to further customize or override parameter settings sent to the API. additional_parameters is offered as a convenience in case an API uses additional arguments or a query requires specific advanced functionality.

Other arguments and mappings can be supplied through **api_specific_parameters to the parameter config, provided that the options or pre-defined mappings exist in the config.

When **api_specific_parameters and additional_parameters conflict, additional_parameters is considered the ground truth. If any remaining parameters are None in the constructed list of parameters, these values will be dropped from the final dictionary.

Parameters:
  • page (int) – The page number to request.

  • additional_parameters (Optional[dict]) – A dictionary of additional overrides that may or may not have been included in the original parameter map of the current API. (Provided for further customization of requests).

  • **api_specific_parameters – Additional parameters to provide to the parameter config: Note that the config will only accept keyword arguments that have been explicitly defined in the parameter map. For all others, they must be added using the additional_parameters parameter.

Returns:

The constructed request parameters.

Return type:

Dict[str, Any]
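The precedence rules described for build_parameters can be sketched as follows. This is not the SearchAPI implementation: PLOS_MAP, ALLOWED_EXTRAS, and build_parameters_sketch are hypothetical, the fl and wt parameters are used purely as illustrative extras, and the start-offset arithmetic is an assumption that matches the BaseAPI example earlier in this document (start=1 for page 1, start=21 for page 2 with 20 rows).

```python
from typing import Any, Dict, Optional

# Hypothetical parameter map: standard field name -> provider-specific name
PLOS_MAP = {"query": "q", "start": "start", "records_per_page": "rows"}
# Hypothetical set of API-specific keywords the config would accept
ALLOWED_EXTRAS = {"fl"}


def build_parameters_sketch(page: int, query: str, records_per_page: int = 20,
                            additional_parameters: Optional[Dict[str, Any]] = None,
                            **api_specific_parameters: Any) -> Dict[str, Any]:
    """Map standard fields to provider names, then apply overrides."""
    unknown = set(api_specific_parameters) - ALLOWED_EXTRAS
    if unknown:
        raise ValueError(f"Parameters not defined in the parameter map: {sorted(unknown)}")
    standard = {
        "query": query,
        "start": 1 + (page - 1) * records_per_page,  # 1-based record offset
        "records_per_page": records_per_page,
    }
    parameters = {PLOS_MAP[name]: value for name, value in standard.items()}
    parameters.update(api_specific_parameters)
    # additional_parameters is applied last, so it wins on conflict
    parameters.update(additional_parameters or {})
    # Drop any parameters that remain None
    return {key: value for key, value in parameters.items() if value is not None}


page_2 = build_parameters_sketch(2, "machine learning")
overridden = build_parameters_sketch(1, "machine learning", fl="id",
                                     additional_parameters={"fl": "title", "wt": None})
```

Applying additional_parameters after the keyword arguments is what makes it the ground truth when the two conflict.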

property cache: BaseCache | None

Retrieves the requests-session cache object if the session object is a CachedSession object.

If a session cache does not exist, this function will return None.

Returns:

The cache object if available, otherwise None.

Return type:

Optional[BaseCache]

property config: SearchAPIConfig

Property method for accessing the config for the SearchAPI.

Returns:

The configuration corresponding to the API Provider

describe() dict[source]

A helper method that describes the accepted configuration for the current provider or user-defined parameter mappings.

Returns:

a dictionary describing valid config fields and provider-specific api parameters for the current provider (if applicable).

Return type:

dict

classmethod from_defaults(query: str, provider_name: str | None, session: Session | None = None, user_agent: Annotated[str | None, 'An optional User-Agent to associate with each search'] = None, use_cache: bool | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters) SearchAPI[source]

Factory method to create SearchAPI instances with sensible defaults for known providers.

PLOS is used by default unless the environment variable SCHOLAR_FLUX_DEFAULT_PROVIDER is set to another provider.

Parameters:
  • query (str) – The search keyword or query string.

  • base_url (str) – The base URL for the article API.

  • records_per_page (int) – Number of records to fetch per page (1-100).

  • request_delay (Optional[float]) – Minimum delay between requests in seconds.

  • api_key (Optional[str | SecretStr]) – API key if required.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist.

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs

  • **api_specific_parameters – Additional api parameter-value pairs and overrides to be provided to SearchAPIConfig class

Returns:

A new SearchAPI instance initialized with the config chosen.

classmethod from_provider_config(query: str, provider_config: ProviderConfig, session: Session | None = None, user_agent: Annotated[str | None, 'An optional User-Agent to associate with each search'] = None, use_cache: bool | None = None, timeout: int | float | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters) SearchAPI[source]

Factory method to create a new SearchAPI instance using a ProviderConfig.

This method uses the default settings associated with the provider config to temporarily make the configuration settings globally available when creating the SearchAPIConfig and APIParameterConfig instances from the provider registry.

Parameters:
  • query (str) – The search keyword or query string.

  • provider_config (ProviderConfig) – The provider configuration whose default settings are used to create the SearchAPI.

  • session (Optional[requests.Session]) – A pre-configured session or None to create a new session.

  • user_agent (Optional[str]) – Optional user-agent string for the session.

  • use_cache (Optional[bool]) – Indicates whether or not to use cache if a cached session doesn’t yet exist.

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError.

  • masker (Optional[SensitiveDataMasker]) – Used for filtering potentially sensitive information from logs

  • **api_specific_parameters – Additional api parameter-value pairs and overrides to be provided to SearchAPIConfig class

Returns:

A new SearchAPI instance initialized with the chosen configuration.

classmethod from_settings(query: str, config: SearchAPIConfig, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, use_cache: bool | None = None, masker=None, rate_limiter: RateLimiter | None = None) SearchAPI[source]

Advanced constructor: instantiate directly from a SearchAPIConfig instance.

Parameters:
  • query (str) – The search keyword or query string.

  • config (SearchAPIConfig) – Indicates the configuration settings to be used when sending requests to APIs

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – Maps global scholar_flux parameters to those that are specific to the current API

  • session (Optional[requests.Session | CachedSession]) – An optional session to use for the creation of request sessions

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The settings from session are otherwise used when this option is not specified.

  • masker (Optional[SensitiveDataMasker]) – A masker used to filter logs of API keys and other sensitive data

  • user_agent (Optional[str]) – A user agent to associate with the session

Returns:

A newly constructed SearchAPI with the chosen/validated settings

Return type:

SearchAPI

static is_cached_session(session: CachedSession | Session) bool[source]

Checks whether the current session is a cached session.

To do so, this method first determines whether the current object has a ‘cache’ attribute and whether the cache element, if existing, is a BaseCache.

Parameters:

session (requests.Session) – The session to check.

Returns:

True if the session is a cached session, False otherwise.

Return type:

bool
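
The two-step check described above (does the session expose a cache attribute, and is that attribute a BaseCache) can be sketched as follows. The classes here are stand-ins defined only for this illustration so that the snippet runs without requests or requests-cache installed; they are not the real library classes.

```python
class BaseCache:
    """Stand-in for a cache backend base class (assumption for this sketch)."""


class PlainSession:
    """Stand-in for a session that has no cache attached."""


class CachingSession:
    """Stand-in for a cached session exposing a ``cache`` attribute."""

    def __init__(self) -> None:
        self.cache = BaseCache()


def is_cached_session(session: object) -> bool:
    # A session counts as cached only if it has a ``cache`` attribute
    # and that attribute is a BaseCache instance.
    cache = getattr(session, "cache", None)
    return isinstance(cache, BaseCache)


print(is_cached_session(CachingSession()), is_cached_session(PlainSession()))
```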

make_request(current_page: int, additional_parameters: dict[str, Any] | None = None, request_delay: float | None = None) Response[source]

Constructs and sends a request to the chosen API.

The parameters are built based on the default/chosen config and parameter map.

Parameters:
  • current_page (int) – The page number to request.

  • additional_parameters (Optional[dict]) – A dictionary of additional overrides not included in the original SearchAPIConfig.

  • request_delay (Optional[float]) – Overrides the configured request delay for the current request only.

Returns:

The API’s response to the request.

Return type:

requests.Response

property parameter_config: APIParameterConfig

Property method for accessing the parameter mapping config for the SearchAPI.

Returns:

The configuration corresponding to the API Provider

prepare_request(base_url: str | None = None, endpoint: str | None = None, parameters: Dict[str, Any] | None = None, api_key: str | None = None) PreparedRequest[source]

Prepares a GET request for the specified endpoint with optional parameters.

This method builds on the original base class method by additionally allowing users to specify a custom request directly while also accounting for the addition of an API key specific to the API.

Parameters:
  • base_url (Optional[str]) – The base URL for the API.

  • endpoint (Optional[str]) – The API endpoint to prepare the request for.

  • parameters (Optional[Dict[str, Any]]) – Optional query parameters for the request.

Returns:

The prepared request object.

Return type:

requests.PreparedRequest

Prepares the current request given the provided page and parameters.

The prepared request object can be sent using the SearchAPI.session.send method with requests.Session and requests_cache.CachedSession objects.

Parameters:
  • page (Optional[int]) – Page number to query. If provided, parameters are built from the config and this page.

  • parameters (Optional[Dict[str, Any]]) – If provided alone, used as the full parameter set to build the current request. If provided together with page, these act as additional or overriding parameters on top of the built config.

Returns:

A request object that can be sent via api.session.send.

Return type:

requests.PreparedRequest

property provider_name: str

Property method for accessing the provider name in the current SearchAPI instance.

Returns:

The name corresponding to the API Provider.

property query: str

Retrieves the current value of the query to be sent to the current API.

property records_per_page: int

Indicates the total number of records to show on each page.

Returns:

an integer indicating the max number of records per page

Return type:

int

property request_delay: float

Indicates how long to wait between requests.

Helpful for ensuring compliance with the rate-limiting requirements of various APIs.

Returns:

The number of seconds to wait at minimum between each request

Return type:

float

search(page: int | None = None, parameters: Dict[str, Any] | None = None, request_delay: float | None = None) Response[source]

Public method to perform a search for the selected page with the current API configuration.

A search can be performed by specifying the page to query with the preselected defaults, optionally combined with additional parameter overrides for other parameters accepted by the API.

Users can also create a custom request using a parameter dictionary containing the full set of API parameters.

Parameters:
  • page (Optional[int]) – Page number to query. If provided, parameters are built from the config and this page.

  • parameters (Optional[Dict[str, Any]]) – If provided alone, used as the full parameter set for the request. If provided together with page, these act as additional or overriding parameters on top of the built config.

  • request_delay (Optional[float]) – Overrides the configured request delay for the current request only.

Returns:

A response object from the API containing articles and metadata

Return type:

requests.Response
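
The interaction between page and parameters can be sketched as a small resolution function. This is an illustration under stated assumptions, not the library's code: resolve_search_parameters and example_builder are hypothetical, the builder assumes 20 records per page with a 1-based record offset, and raising ValueError when neither argument is given is a choice made for this sketch.

```python
from typing import Any, Callable, Dict, Optional


def resolve_search_parameters(
    page: Optional[int],
    parameters: Optional[Dict[str, Any]],
    build_parameters: Callable[[int], Dict[str, Any]],
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented page/parameters rules."""
    if page is None and parameters is None:
        # Assumption of this sketch: at least one input is required.
        raise ValueError("Provide a page, a parameter dictionary, or both.")
    if page is None:
        # parameters alone: used as the full parameter set.
        return dict(parameters or {})
    built = build_parameters(page)
    # page together with parameters: parameters add to or override the built set.
    built.update(parameters or {})
    return built


def example_builder(page: int) -> Dict[str, Any]:
    # Assumed provider mapping: 20 records per page, 1-based record offset.
    return {"q": "machine learning", "start": 1 + (page - 1) * 20, "rows": 20}


custom = resolve_search_parameters(None, {"q": "custom", "rows": 5}, example_builder)
combined = resolve_search_parameters(2, {"rows": 10}, example_builder)
print(custom)
print(combined)
```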

session: Session
structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for quickly showing a representation of the overall structure of the SearchAPI. The helper function, generate_repr_from_string helps produce human-readable representations of the core structure of the SearchAPI.

Parameters:
  • flatten (bool) – Whether to flatten the SearchAPI’s structural representation into a single line.

  • show_value_attributes (bool) – Whether to show nested attributes of the components of the SearchAPI.

Returns:

The structure of the current SearchAPI as a string.

Return type:

str

summary() str[source]

Create a summary representation of the current structure of the API.

classmethod update(search_api: SearchAPI, query: str | None = None, config: SearchAPIConfig | None = None, parameter_config: BaseAPIParameterMap | APIParameterMap | APIParameterConfig | None = None, session: Session | CachedSession | None = None, user_agent: str | None = None, timeout: int | float | None = None, use_cache: bool | None = None, masker: SensitiveDataMasker | None = None, rate_limiter: RateLimiter | None = None, **api_specific_parameters)[source]

Helper method for generating a new SearchAPI from an existing SearchAPI instance. All parameters that are not modified are pulled from the original SearchAPI. If no changes are made, an identical SearchAPI is generated from the existing defaults.

Parameters:
  • config (SearchAPIConfig) – Indicates the configuration settings to be used when sending requests to APIs

  • parameter_config (Optional[BaseAPIParameterMap | APIParameterMap | APIParameterConfig]) – Maps global scholar_flux parameters to those that are API specific.

  • session (Optional[requests.Session | CachedSession]) – An optional session to use for the creation of request sessions

  • timeout (Optional[int | float]) – Identifies the number of seconds to wait before raising a TimeoutError

  • use_cache (Optional[bool]) – Indicates whether or not to use cache. The settings from session are otherwise used when this option is not specified.

  • masker (Optional[SensitiveDataMasker]) – A masker used to filter logs of API keys and other sensitive data

  • user_agent (Optional[str]) – A user agent to associate with the session

Returns:

A newly constructed SearchAPI with the chosen/validated settings

Return type:

SearchAPI

with_config(config: SearchAPIConfig | None = None, parameter_config: APIParameterConfig | None = None, provider_name: str | None = None, query: str | None = None) Iterator[SearchAPI][source]

Temporarily modifies the SearchAPI’s SearchAPIConfig and/or APIParameterConfig and namespace. You can provide a config, a parameter_config, or a provider_name to fetch defaults. Explicitly provided configs take precedence over provider_name, and the context manager will revert changes to the parameter mappings and search configuration afterward.

Parameters:
  • config (Optional[SearchAPIConfig]) – Temporary search api configuration to use within the context to control where and how response records are retrieved.

  • parameter_config (Optional[APIParameterConfig]) – Temporary parameter config to use within the context to resolve universal parameters names to those that are specific to the current api.

  • provider_name (Optional[str]) – Used to retrieve the associated configuration for a specific provider in order to edit the parameter map when using a different provider.

  • query (Optional[str]) – Allows users to temporarily modify the query used to retrieve records from an API.

Yields:

SearchAPI – The current api object with a temporarily swapped config during the context manager.

with_config_parameters(provider_name: str | None = None, query: str | None = None, **api_specific_parameters) Iterator[SearchAPI][source]

Allows for the temporary modification of the search configuration, parameter mappings, and cache namespace for the current API. Uses a context manager to temporarily change the provided parameters without persisting the changes.

Parameters:
  • provider_name (Optional[str]) – If provided, fetches the default parameter config for the provider.

  • query (Optional[str]) – Allows users to temporarily modify the query used to retrieve records from an API.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override in the current config.

Yields:

SearchAPI – The API object with temporarily swapped config and/or parameter config.
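
The temporary-override pattern behind both context managers can be sketched with contextlib. DemoConfig and DemoAPI below are hypothetical stand-ins, not the real SearchAPIConfig or SearchAPI; the sketch only shows the general technique of swapping settings inside a with block and restoring them afterward.

```python
from contextlib import contextmanager
from dataclasses import dataclass, replace
from typing import Any, Iterator, Optional


@dataclass(frozen=True)
class DemoConfig:
    """Hypothetical stand-in for a search configuration."""

    provider_name: str = "plos"
    records_per_page: int = 20


class DemoAPI:
    """Hypothetical stand-in for an API client holding a query and a config."""

    def __init__(self, query: str, config: DemoConfig) -> None:
        self.query = query
        self.config = config

    @contextmanager
    def with_config_parameters(
        self, query: Optional[str] = None, **overrides: Any
    ) -> Iterator["DemoAPI"]:
        original_query, original_config = self.query, self.config
        try:
            # Build a modified copy instead of mutating the original config.
            self.config = replace(self.config, **overrides)
            if query is not None:
                self.query = query
            yield self
        finally:
            # Always restore the original settings, even if the block raises.
            self.query, self.config = original_query, original_config


api = DemoAPI("machine learning", DemoConfig())
with api.with_config_parameters(query="deep learning", records_per_page=50) as temp_api:
    inside = (temp_api.query, temp_api.config.records_per_page)
after = (api.query, api.config.records_per_page)
print(inside, after)
```

Restoring in a finally clause is what keeps the changes from persisting when the body of the with block fails partway through.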

class scholar_flux.api.SearchAPIConfig(*, provider_name: str = '', base_url: str = '', records_per_page: Annotated[int, Ge(ge=0), Le(le=1000)] = 20, request_delay: float = -1, api_key: SecretStr | None = None, api_specific_parameters: dict[str, Any] | None = None)[source]

Bases: BaseModel

The SearchAPIConfig class provides the core tools necessary to set and interact with the API. The SearchAPI uses this class to retrieve data from an API using universal parameters to simplify the process of retrieving raw responses.

provider_name

Indicates the name of the API to use when making requests to a provider. If the provider name matches a known default and the base_url is unspecified, the base URL for the current provider is used instead.

Type:

str

base_url

Indicates the API URL where data will be searched and retrieved.

Type:

str

records_per_page

Controls the number of records that will appear on each page

Type:

int

request_delay

Indicates the minimum delay between each request to avoid exceeding API rate limits

Type:

float

api_key

This is an API-specific parameter for validating the current user’s identity. If a str type is provided, it is converted into a SecretStr.

Type:

Optional[str | SecretStr]

api_specific_parameters

A dictionary containing all parameters specific to the current API. API-specific parameters include the following.

  1. mailto (Optional[str | SecretStr]):

    An optional email address for receiving feedback on usage from providers. This parameter is currently applicable only to the Crossref API.

  2. db: (str):

    The parameter used by the NIH to direct requests for data to the PubMed database. This parameter defaults to pubmed and does not require direct specification.

Type:

dict[str, APISpecificParameter]

Examples

>>> from scholar_flux.api import SearchAPIConfig, SearchAPI, provider_registry
# to create a CROSSREF configuration with minimal defaults and provide an api_specific_parameter:
>>> config = SearchAPIConfig.from_defaults(provider_name = 'crossref', mailto = 'your_email_here@example.com')
# the configuration automatically retrieves the configuration for the "Crossref" API
>>> assert config.provider_name == 'crossref' and config.base_url == provider_registry['crossref'].base_url
>>> api = SearchAPI.from_settings(query = 'q', config = config)
>>> assert api.config == config
# to retrieve all defaults associated with a provider and automatically read an API key if needed
>>> config = SearchAPIConfig.from_defaults(provider_name = 'pubmed', api_key = 'your api key goes here')
# the API key is retrieved automatically if you have the API key specified as an environment variable
>>> assert config.api_key is not None
# Default provider API specifications are already pre-populated if they are set with defaults
>>> assert config.api_specific_parameters['db'] == 'pubmed'  # required by pubmed and defaults to pubmed
# Update a provider and automatically retrieve its API key - the previous API key will no longer apply
>>> updated_config = SearchAPIConfig.update(config, provider_name = 'core')
# The API key should have been overwritten to use core. Looks for a `CORE_API_KEY` env variable by default
>>> assert updated_config.provider_name  == 'core' and  updated_config.api_key != config.api_key
DEFAULT_PROVIDER: ClassVar[str] = 'PLOS'
DEFAULT_RECORDS_PER_PAGE: ClassVar[int] = 25
DEFAULT_REQUEST_DELAY: ClassVar[float] = 6.1
MAX_API_KEY_LENGTH: ClassVar[int] = 512
api_key: SecretStr | None
api_specific_parameters: dict[str, Any] | None
base_url: str
classmethod default_request_delay(v: int | float | None, provider_name: str | None = None) float[source]

Helper method enabling the retrieval of the most appropriate rate limit for the current provider.

Defaults to the SearchAPIConfig default rate limit when the current provider is unknown and a valid rate limit has not yet been provided.

Parameters:
  • v (Optional[int | float]) – The value received for the current request_delay

  • provider_name (Optional[str]) – The name of the provider to retrieve a rate limit for

Returns:

The inputted non-negative request delay, the retrieved rate limit for the current provider if available, or the SearchAPIConfig.DEFAULT_REQUEST_DELAY, in that order of priority.

Return type:

float
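
The priority order described above can be sketched as a plain function. The default of 6.1 seconds is the DEFAULT_REQUEST_DELAY listed in this reference; the provider table and its values are hypothetical and exist only to make the example runnable.

```python
from typing import Dict, Optional, Union

DEFAULT_REQUEST_DELAY = 6.1  # class default listed in this reference

# Hypothetical per-provider rate limits, for illustration only.
EXAMPLE_PROVIDER_DELAYS: Dict[str, float] = {"example_provider": 2.0}


def default_request_delay(
    v: Optional[Union[int, float]], provider_name: Optional[str] = None
) -> float:
    # 1. A supplied non-negative delay is used as-is.
    if v is not None and v >= 0:
        return float(v)
    # 2. Otherwise fall back to the provider's rate limit, if one is known.
    if provider_name is not None and provider_name in EXAMPLE_PROVIDER_DELAYS:
        return EXAMPLE_PROVIDER_DELAYS[provider_name]
    # 3. Finally, use the global default.
    return DEFAULT_REQUEST_DELAY


print(default_request_delay(1.5, "example_provider"))
print(default_request_delay(-1, "example_provider"))
print(default_request_delay(None, "unknown_provider"))
```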

classmethod from_defaults(provider_name: str, **overrides) SearchAPIConfig[source]

Uses the default configuration for the chosen provider to create a SearchAPIConfig object containing configuration parameters. Note that additional parameters and field overrides can be added via the **overrides field.

Parameters:
  • provider_name (str) – The name of the provider to create the config

  • **overrides – Optional keyword arguments to specify overrides and additional arguments

Returns:

A default SearchAPIConfig object based on the chosen parameters

Return type:

SearchAPIConfig

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

provider_name: str
records_per_page: int
request_delay: float
classmethod set_records_per_page(v: int | None)[source]

Sets the records_per_page parameter with the default if the supplied value is not valid.

Triggers a validation error when records_per_page is an invalid type. Otherwise uses the DEFAULT_RECORDS_PER_PAGE class attribute if the supplied value is missing or is a negative number.
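
A minimal sketch of this fallback behavior follows. It is not the validator itself: a plain TypeError stands in for the validation error, rejecting bool values is an assumption of this sketch, and the default of 25 is the DEFAULT_RECORDS_PER_PAGE listed in this reference.

```python
from typing import Optional

DEFAULT_RECORDS_PER_PAGE = 25  # class default listed in this reference


def set_records_per_page(v: Optional[int]) -> int:
    # A missing value falls back to the default.
    if v is None:
        return DEFAULT_RECORDS_PER_PAGE
    # bool is a subclass of int, so it is rejected explicitly (sketch assumption).
    if isinstance(v, bool) or not isinstance(v, int):
        raise TypeError(f"records_per_page must be an int, got {type(v).__name__}")
    # A negative value also falls back to the default.
    if v < 0:
        return DEFAULT_RECORDS_PER_PAGE
    return v


print(set_records_per_page(None), set_records_per_page(-5), set_records_per_page(50))
```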

structure(flatten: bool = False, show_value_attributes: bool = True) str[source]

Helper method for retrieving a string representation of the overall structure of the current SearchAPIConfig.

classmethod update(current_config: SearchAPIConfig, **overrides) SearchAPIConfig[source]

Create a new SearchAPIConfig by updating an existing config with new values and/or switching to a different provider. This method ensures that the new provider’s base_url and defaults are used if provider_name is given, and that API-specific parameters are prioritized and merged as expected.

Parameters:
  • current_config (SearchAPIConfig) – The existing configuration to update.

  • **overrides – Any fields or API-specific parameters to override or add.

Returns:

A new config with the merged and prioritized values.

Return type:

SearchAPIConfig

property url_basename: str

Extracts the URL basename from the provider URL associated with the current config instance using the _extract_url_basename method.

classmethod validate_api_key(v: SecretStr | str | None) SecretStr | None[source]

Validates the api_key attribute and triggers a validation error if it is not valid.

classmethod validate_provider_name(v: str | None) str[source]

Validates the provider_name attribute and triggers a validation error if it is not valid.

classmethod validate_request_delay(v: int | float | None) int | float | None[source]

Sets the request delay (delay between each request) for valid request delays. This validator triggers a validation error when the request delay is an invalid type.

If a request delay is left None or is a negative number, this class method returns -1, and further validation is performed by cls.default_request_delay to retrieve the provider’s default request delay.

If not available, SearchAPIConfig.DEFAULT_REQUEST_DELAY is used.

validate_search_api_config_parameters() Self[source]

Validation method that resolves URLs and/or provider names to provider_info when one or the other is not explicitly provided.

Occurs as the last step in the validation process.

classmethod validate_url(v: str)[source]

Validates the base_url and triggers a validation error if it is not valid.

classmethod validate_url_type(v: str | None) str[source]

Validates the type for the base_url attribute and triggers a validation error if it is not valid.

class scholar_flux.api.SearchCoordinator(search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, query: str | None = None, provider_name: str | None = None, cache_requests: bool | None = None, cache_results: bool | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None, **kwargs)[source]

Bases: BaseCoordinator

High-level coordinator for requesting and retrieving records and metadata from APIs.

This class uses dependency injection to orchestrate the process of constructing requests, validating responses, and processing scientific works and articles. It is designed to abstract away the complexity of using APIs while providing a consistent and robust interface for retrieving record data and metadata. Valid responses are retrieved from the request cache and storage cache when available, which helps avoid exceeding API request limits.

If no search_api is provided, the coordinator creates a SearchAPI that uses the provider named by the SCHOLAR_FLUX_DEFAULT_PROVIDER environment variable when it is set. Otherwise, PLOS is used on the backend.
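
The default-provider lookup can be sketched with the standard library alone. resolve_default_provider is a hypothetical helper written for this illustration, and treating an empty variable the same as an unset one is an assumption of the sketch.

```python
import os
from typing import Mapping, Optional

DEFAULT_PROVIDER = "PLOS"  # class default listed in this reference


def resolve_default_provider(environ: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: prefer the environment variable, else fall back to PLOS."""
    env = os.environ if environ is None else environ
    # An unset or empty variable is treated as "not provided" (sketch assumption).
    return env.get("SCHOLAR_FLUX_DEFAULT_PROVIDER") or DEFAULT_PROVIDER


print(resolve_default_provider({}))
print(resolve_default_provider({"SCHOLAR_FLUX_DEFAULT_PROVIDER": "crossref"}))
```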

__init__(search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, parser: BaseDataParser | None = None, extractor: BaseDataExtractor | None = None, processor: ABCDataProcessor | None = None, cache_manager: DataCacheManager | None = None, query: str | None = None, provider_name: str | None = None, cache_requests: bool | None = None, cache_results: bool | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None, **kwargs)[source]

Flexible initializer that constructs a SearchCoordinator either from its core components or from their basic building blocks when these core components are not directly provided.

If search_api and response_coordinator are provided, then this method will use these inputs directly.

The additional parameters can still be used to update these two components. For example, a search_api can be updated with a new query, session, and SearchAPIConfig parameters through keyword arguments (**kwargs).

When neither component is provided:
  • The creation of the search_api requires, at minimum, a query.

  • If the response_coordinator, a parser, extractor, processor, and cache_manager aren’t provided, then a new ResponseCoordinator will be built from the default settings.

Core Components/Attributes:
  • SearchAPI: Handles all requests to an API based on its configuration.

    Dependencies: query, **kwargs

  • ResponseCoordinator: Handles the parsing, record/metadata extraction, processing, and caching of responses.

    Dependencies: parser, extractor, processor, cache_manager

Other Attributes:
  • RetryHandler: Addresses when to retry failed requests and how failed requests are retried.

  • SearchWorkflow: An optional workflow that defines custom search logic from specific APIs.

  • Validator: Handles how requests are validated. The default determines whether a 200 response was received.

Note

This implementation uses the underlying private method _initialize to handle the assignment of parameters under the hood while the core function of the __init__ creates these components if they do not already exist.

Parameters:
  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to handle the processing and core handling of all responses from APIs

  • parser (Optional[BaseDataParser]) – First step of the response processing pipeline - parses response records into a dictionary

  • extractor (Optional[BaseDataExtractor]) – Extracts both records and metadata from responses separately

  • processor (Optional[ABCDataProcessor]) – Processes the previously extracted API records into list of dictionaries that are filtered and optionally flattened during processing

  • cache_manager (Optional[DataCacheManager]) – Manages the caching of processed records for faster retrieval

  • query (Optional[str]) – Query to be used when sending requests when creating an API - modifies the query if the API already exists

  • provider_name (Optional[str]) – The name of the API provider where requests will be sent. If a provider_name and base_url are both given, the SearchAPIConfig will prioritize base_urls over the provider_name.

  • cache_requests (Optional[bool]) – Determines whether or not to cache requests - api is the ground truth if not directly specified

  • cache_results (Optional[bool]) – Determines whether or not to cache processed responses - on by default unless specified otherwise

  • retry_handler (Optional[RetryHandler]) – Class used to retry failed requests

  • validator (Optional[ResponseValidator]) – class used to verify and validate responses returned from APIs

  • workflow (Optional[SearchWorkflow]) – An optional workflow used to customize how records are retrieved from APIs. Uses the default workflow for the current provider when a workflow is not directly specified.

  • **kwargs – Keyword arguments to be passed to the SearchAPIConfig that creates the SearchAPI if it doesn’t already exist

Examples

    >>> from scholar_flux import SearchCoordinator
    >>> from scholar_flux.api import APIResponse, ReconstructedResponse
    >>> from scholar_flux.sessions import CachedSessionManager
    >>> from typing import MutableMapping
    >>> session = CachedSessionManager(user_agent = 'scholar_flux', backend='redis').configure_session()
    >>> search_coordinator = SearchCoordinator(query = "Intrinsic Motivation", session = session, cache_results = False)
    >>> response = search_coordinator.search(page = 1)
    >>> response
    # OUTPUT: <ProcessedResponse(len=50, cache_key='plos_Functional Processing_1_50', metadata='...') ': 1, 'maxSco...")>
    >>> new_response = ReconstructedResponse.build(**response.response.__dict__)
    >>> new_response.validate()
    >>> new_response = ReconstructedResponse.build(response.response)
    >>> ReconstructedResponse.build(new_response).validate()
    >>> new_response.validate()
    >>> newer_response = APIResponse.as_reconstructed_response(new_response)
    >>> newer_response.validate()
    >>> double_processed_response = search_coordinator._process_response(response = newer_response, cache_key = response.cache_key)
    

classmethod as_coordinator(search_api: SearchAPI, response_coordinator: ResponseCoordinator, *args, **kwargs) SearchCoordinator[source]

Helper factory method for building a SearchCoordinator that allows users to build from the final building blocks of a SearchCoordinator.

Parameters:
  • search_api (SearchAPI) – The search API to use for the retrieval of response records from APIs

  • response_coordinator (ResponseCoordinator) – Core class used to handle the processing and core handling of all responses from APIs

Returns:

A newly created coordinator that orchestrates record retrieval and processing

Return type:

SearchCoordinator

fetch(page: int, from_request_cache: bool = True, raise_on_error: bool = False, **api_specific_parameters) Response | ResponseProtocol | None[source]

Fetches the raw response from the current API or from cache if available.

Parameters:
  • page (int) – The page number to retrieve from the cache.

  • from_request_cache (bool) – This parameter determines whether to try to fetch a valid response from cache.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

The response object if available, otherwise None.

Return type:

Optional[Response]

get_cached_request(page: int, **kwargs) Response | ResponseProtocol | None[source]

Retrieves the cached request for a given page number if available.

Parameters:

page (int) – The page number to retrieve from the cache.

Returns:

The cached request object if available, otherwise None.

Return type:

Optional[Response]

get_cached_response(page: int) Dict[str, Any] | None[source]

Retrieves the cached response for a given page number if available.

Parameters:

page (int) – The page number to retrieve from the cache.

Returns:

The cached response data if available, otherwise None.

Return type:

Optional[Dict[str, Any]]

iter_pages(pages: Sequence[int] | PageListInput, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) Generator[SearchResult, None, None][source]

Helper method that creates a generator for retrieving and processing records from the API Provider for a page range in sequence. This implementation dynamically examines the properties of the page search result for each retrieved API response to determine whether iteration should halt early or continue.

This method is used directly by SearchCoordinator.search_pages to provide a clean interface that abstracts the complexity of iterators, and is also provided for convenience when iteration is preferable.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Yields:

SearchResult

Iteratively returns the SearchResult for each page using a generator expression.

Each result contains the requested page number (page), the name of the provider (provider_name), and the result of the search containing a ProcessedResponse, an ErrorResponse, or None (api response)
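
The early-halting generator pattern can be sketched as follows. The halting rule used here (stop after a page that fails or returns no records) is an assumption made for illustration; the actual criteria are not specified in this reference. DemoSearchResult, iter_pages_sketch, and fake_fetch are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Generator, List, Optional, Sequence


@dataclass
class DemoSearchResult:
    """Hypothetical stand-in for a per-page search result."""

    page: int
    provider_name: str
    records: Optional[List[dict]]  # None stands in for a failed or missing response


def iter_pages_sketch(
    pages: Sequence[int],
    fetch_page: Callable[[int], Optional[List[dict]]],
    provider_name: str = "plos",
) -> Generator[DemoSearchResult, None, None]:
    for page in pages:
        records = fetch_page(page)
        yield DemoSearchResult(page, provider_name, records)
        # Assumed halting rule: stop once a page fails or has no records.
        if not records:
            break


def fake_fetch(page: int) -> Optional[List[dict]]:
    # Pages 1 and 2 have data, page 3 is empty, later pages are never requested.
    data: Dict[int, List[dict]] = {1: [{"id": "a"}], 2: [{"id": "b"}], 3: []}
    return data.get(page)


results = list(iter_pages_sketch([1, 2, 3, 4, 5], fake_fetch))
print([result.page for result in results])
```

Because the generator is lazy, pages 4 and 5 are never fetched once the empty page has been yielded.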

robust_request(page: int, **api_specific_parameters) Response | ResponseProtocol | None[source]

Constructs and sends a request to the current API, then returns the response that was fetched.

Parameters:
  • page (int) – The page number to retrieve from the cache.

  • **api_specific_parameters – Optional additional parameters to pass to the SearchAPI.

Returns:

The response object if available, otherwise None.

Return type:

Optional[Response | ResponseProtocol]

search(page: int = 1, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) ProcessedResponse | ErrorResponse | None[source]

Public method for retrieving and processing records from the API, specifying the page and records per page. Note that the response object is saved under the last_response attribute when the response is retrieved and processed successfully, irrespective of whether the response was cached.

Parameters:
  • page (int) – The current page number. Used for process caching purposes even if not required by the API.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

A ProcessedResponse model containing the response (response), processed records (data), and article metadata (metadata) if the response was successful. Otherwise, returns an ErrorResponse that provides the reason behind the error (message), the exception type (error), and the response (response). Possible error responses also include a NonResponse (an ErrorResponse subclass) for cases where a response object is irretrievable. Like ErrorResponse, NonResponse is falsy (i.e., not NonResponse returns True).

Return type:

Optional[ProcessedResponse | ErrorResponse]
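Because error results are described as falsy and the method may also return None, a single truthiness check can cover every unsuccessful outcome. The sketch below illustrates that pattern with hypothetical stand-in classes (ProcessedSketch, ErrorSketch) and a hypothetical helper (records_or_empty); it does not reproduce the library's classes, and it assumes that a successful result is truthy.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class ProcessedSketch:
    """Hypothetical stand-in for a successful, processed response."""
    data: List[dict] = field(default_factory=list)

    def __bool__(self) -> bool:
        # Assumption: successful results are truthy.
        return True


@dataclass
class ErrorSketch:
    """Hypothetical stand-in for an error response; always falsy."""
    message: str
    error: Optional[str] = None

    def __bool__(self) -> bool:
        return False


def records_or_empty(result: Any) -> List[dict]:
    """Return processed records, or an empty list for None and error results."""
    if not result:  # covers None as well as falsy error objects
        return []
    return result.data


ok_records = records_or_empty(ProcessedSketch(data=[{"title": "example"}]))
failed_records = records_or_empty(ErrorSketch(message="timed out", error="Timeout"))
missing_records = records_or_empty(None)
```

When the reason for a failure matters, checking the result's type and reading its message is more informative than the truthiness check alone.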

search_data(page: int = 1, from_request_cache: bool = True, from_process_cache: bool = True) List[Dict] | None[source]

Public method to perform a search, specifying the page and records per page. Note that instead of returning a ProcessedResponse or ErrorResponse, this calls the search method and retrieves only the list of processed dictionary records from the ProcessedResponse.

Parameters:
  • page (int) – The current page number.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage held in SearchCoordinator.search_api.cache.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the processing cache held in SearchCoordinator.response_coordinator.cache.

Returns:

A list of records containing processed article data.

Return type:

Optional[List[Dict]]

search_pages(pages: Sequence[int] | PageListInput, from_request_cache: bool = True, from_process_cache: bool = True, use_workflow: bool | None = True, **api_specific_parameters) SearchResultList[source]

Public method for retrieving and processing records from the API, specifying the pages and records per page in sequence. Note that the response object is saved under the last_response attribute when the data is processed successfully, irrespective of whether responses are cached.

Parameters:
  • pages (Sequence[int] | PageListInput) – A sequence of page numbers to request from the API Provider.

  • from_request_cache (bool) – This parameter determines whether to try to retrieve the response from the requests-cache storage.

  • from_process_cache (bool) – This parameter determines whether to attempt to pull processed responses from the cache storage.

  • use_workflow (bool) – Indicates whether to use a workflow if available. Workflows are utilized by default.

  • **api_specific_parameters (SearchAPIConfig) – Fields to temporarily override when building the request.

Returns:

A list of search results, one per retrieved page, containing processed article data (data).

Note that processing stops if the response for a given page is None, is not retrievable, or contains fewer than the expected number of records, indicating that the next page may contain no more records.

Return type:

SearchResultList

classmethod update(search_coordinator: SearchCoordinator, search_api: SearchAPI | None = None, response_coordinator: ResponseCoordinator | None = None, retry_handler: RetryHandler | None = None, validator: ResponseValidator | None = None, workflow: SearchWorkflow | None = None) SearchCoordinator[source]

Helper factory method for creating a new coordinator based on an existing configuration while allowing previous components to be replaced. Note that this implementation does not directly copy the underlying components: a component that is not replaced is taken from the existing coordinator as-is.

Parameters:
  • search_coordinator (SearchCoordinator) – A previously created coordinator containing the components to use if a replacement is not provided.

  • search_api (Optional[SearchAPI]) – The search API to use for the retrieval of response records from APIs.

  • response_coordinator (Optional[ResponseCoordinator]) – Core class used to process and handle all responses from APIs.

  • retry_handler (Optional[RetryHandler]) – Class used to retry failed requests.

  • validator (Optional[ResponseValidator]) – Class used to verify and validate responses returned from APIs.

  • workflow (Optional[SearchWorkflow]) – An optional workflow used to customize how records are retrieved from APIs. When a workflow is not directly specified, the default workflow for the current provider is used; the previous workflow does not directly carry over when a new provider is chosen.

Returns:

A newly created coordinator that orchestrates record retrieval and processing

Return type:

SearchCoordinator
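The factory pattern described for update can be sketched as a classmethod that builds a new object and falls back to the existing object's components wherever no replacement is given. CoordinatorSketch and its two fields are hypothetical stand-ins chosen for brevity, and reusing unreplaced components by reference is an assumption drawn from the note that components are not copied.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CoordinatorSketch:
    """Hypothetical coordinator holding two interchangeable components."""
    search_api: object
    retry_handler: object

    @classmethod
    def update(
        cls,
        coordinator: "CoordinatorSketch",
        search_api: Optional[object] = None,
        retry_handler: Optional[object] = None,
    ) -> "CoordinatorSketch":
        """Build a new coordinator, reusing any component that is not replaced."""
        return cls(
            search_api=search_api if search_api is not None else coordinator.search_api,
            retry_handler=(
                retry_handler if retry_handler is not None else coordinator.retry_handler
            ),
        )


original = CoordinatorSketch(search_api=object(), retry_handler=object())
new_handler = object()
updated = CoordinatorSketch.update(original, retry_handler=new_handler)
```

In this sketch the unreplaced search_api is the same object in both coordinators, so a change made to it through one coordinator is visible through the other.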

class scholar_flux.api.ThreadedRateLimiter(min_interval: int | float | None = None)[source]

Bases: RateLimiter

Thread-safe version of RateLimiter that can be safely used across multiple threads.

Inherits all functionality from RateLimiter but adds thread synchronization to prevent race conditions when multiple threads access the same limiter instance.

__init__(min_interval: int | float | None = None)[source]

Initialize with thread safety.

rate(min_interval: float | int) Iterator[Self][source]

Thread-safe version of the rate() context manager.

Parameters:

min_interval – The minimum interval to temporarily use during the call

Yields:

ThreadedRateLimiter – The rate limiter with temporarily changed interval

wait(min_interval: int | float | None = None) None[source]

Thread-safe version of wait() that prevents race conditions.
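One way to obtain the thread safety described for wait() and rate() is to guard the limiter's shared state with a lock. The sketch below is an illustration under stated assumptions, not the library's implementation: ThreadedLimiterSketch, its attributes, and the choice of a re-entrant lock held for the whole rate() block are all hypothetical.

```python
import threading
import time
from contextlib import contextmanager
from typing import Iterator, Optional


class ThreadedLimiterSketch:
    """Hypothetical lock-guarded rate limiter; not the library's implementation."""

    def __init__(self, min_interval: float = 0.05) -> None:
        self.min_interval = min_interval
        self._last_call: Optional[float] = None
        self._lock = threading.RLock()  # re-entrant so wait() works inside rate()

    def wait(self, min_interval: Optional[float] = None) -> None:
        """Block until min_interval seconds have passed since the previous call."""
        with self._lock:
            interval = self.min_interval if min_interval is None else min_interval
            if self._last_call is not None:
                remaining = interval - (time.monotonic() - self._last_call)
                if remaining > 0:
                    time.sleep(remaining)
            self._last_call = time.monotonic()

    @contextmanager
    def rate(self, min_interval: float) -> Iterator["ThreadedLimiterSketch"]:
        """Temporarily use a different interval, restoring the old one on exit."""
        with self._lock:
            previous = self.min_interval
            self.min_interval = min_interval
            try:
                yield self
            finally:
                self.min_interval = previous


limiter = ThreadedLimiterSketch(min_interval=0.02)
started = time.monotonic()
threads = [threading.Thread(target=limiter.wait) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
elapsed = time.monotonic() - started
```

Holding the lock for the duration of rate() means other threads wait until the block exits, which keeps the temporary interval from leaking into their calls at the cost of some concurrency.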

scholar_flux.api.validate_email(email: str) bool[source]

Uses regex to determine whether the provided value is an email.

Parameters:

email (str) – The email string to validate

Returns:

True if the email is valid, and False otherwise
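The pattern that validate_email uses is not shown here, so the following sketch only illustrates the general approach of regex-based validation. EMAIL_PATTERN and validate_email_sketch are hypothetical, and the pattern is deliberately simple: it accepts some strings that are not deliverable addresses and makes no attempt to follow the full address grammar.

```python
import re

# Assumed, simplified pattern: a local part, "@", and a domain containing a dot.
EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def validate_email_sketch(email: str) -> bool:
    """Return True when the whole string matches the simple pattern above."""
    return isinstance(email, str) and EMAIL_PATTERN.fullmatch(email) is not None


looks_valid = validate_email_sketch("user@example.org")
missing_domain_dot = validate_email_sketch("user@example")
```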

scholar_flux.api.validate_url(url: str) bool[source]

Uses urlparse to determine whether the provided value is a URL.

Parameters:

url (str) – The URL string to validate

Returns:

True if the URL is valid, and False otherwise
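A urlparse-based check can be sketched with the standard library alone. The criteria used by validate_url are not documented here, so the rule below (an http or https scheme plus a non-empty network location) is an assumption, and validate_url_sketch is a hypothetical name.

```python
from urllib.parse import urlparse


def validate_url_sketch(url: str) -> bool:
    """Return True when the string parses with an http(s) scheme and a host."""
    try:
        parsed = urlparse(url)
    except (ValueError, AttributeError, TypeError):
        return False
    # Assumed rule: only http and https count, and a host must be present.
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


with_scheme = validate_url_sketch("https://api.plos.org/search")
without_scheme = validate_url_sketch("api.plos.org/search")
```

Without a scheme, urlparse places the whole string in the path component, which is why the second call is rejected under this rule.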