# /api/search_coordinator.py
"""Implements the SearchCoordinator for orchestrating single/multi-page API response retrieval and record processing."""
from __future__ import annotations
from typing import Any, Optional, Sequence, cast, Generator
from typing_extensions import Self
from requests import PreparedRequest, Response
from pydantic import ValidationError
import logging
from scholar_flux.api.rate_limiting.retry_handler import RetryHandler
from scholar_flux import DataCacheManager
from scholar_flux.api import (
SearchAPI,
ResponseCoordinator,
ResponseValidator,
ProcessedResponse,
APIResponse,
ErrorResponse,
NonResponse,
)
from scholar_flux.api.models import PageListInput, SearchResult, SearchResultList
from scholar_flux.api.models.response_metadata_map import ResponseMetadataMap
from scholar_flux.api.validators import normalize_url, validate_url
from scholar_flux.data.base_parser import BaseDataParser
from scholar_flux.data.base_extractor import BaseDataExtractor
from scholar_flux.data.abc_processor import ABCDataProcessor
from scholar_flux.utils.response_protocol import ResponseProtocol
from scholar_flux.utils.record_types import RecordList
from scholar_flux.utils.helpers import parse_iso_timestamp, try_call
from scholar_flux.api.providers import provider_registry
from scholar_flux.exceptions import (
RequestFailedException,
PageUnavailableFromCacheException,
RetryAfterDelayExceededException,
RequestCacheException,
StorageCacheException,
APIParameterException,
InvalidCoordinatorParameterException,
)
from scholar_flux.api import BaseCoordinator
from scholar_flux.api.workflows import WORKFLOW_DEFAULTS, SearchWorkflow
from time import time
from datetime import datetime
import re
from functools import partial
logger = logging.getLogger(__name__)
[docs]
class SearchCoordinator(BaseCoordinator):
"""High-level coordinator for requesting and retrieving records and metadata from APIs.
This class uses dependency injection to orchestrate the process of constructing requests,
validating responses, and processing scientific works and articles. This class is designed
to abstract away the complexity of using APIs while providing a consistent and
robust interface for retrieving record data and metadata from request and storage cache
if valid to help avoid exceeding limits in API requests.
If no search_api is provided, the coordinator will create a Search API that uses the default
provider if the environment variable, `SCHOLAR_FLUX_DEFAULT_PROVIDER`, is not provided.
Otherwise PLOS is used on the backend.
"""
[docs]
def __init__(
    self,
    search_api: Optional[SearchAPI] = None,
    response_coordinator: Optional[ResponseCoordinator] = None,
    *,
    parser: Optional[BaseDataParser] = None,
    extractor: Optional[BaseDataExtractor] = None,
    processor: Optional[ABCDataProcessor] = None,
    cache_manager: Optional[DataCacheManager] = None,
    query: Optional[str] = None,
    provider_name: Optional[str] = None,
    cache_requests: Optional[bool] = None,
    cache_results: Optional[bool] = None,
    annotate_records: Optional[bool] = None,
    retry_handler: Optional[RetryHandler] = None,
    validator: Optional[ResponseValidator] = None,
    workflow: Optional[SearchWorkflow] = None,
    **kwargs: Any,
) -> None:
    """Construct a `SearchCoordinator` from its core components or their building blocks.

    When `search_api` and `response_coordinator` are supplied they are used directly (the
    remaining keyword arguments may still update them — e.g. a `search_api` can receive a new
    `query` or SearchAPIConfig parameters via `**kwargs`). When they are absent, each component
    is assembled from its underlying dependencies:

    - `SearchAPI` requires at minimum a `query` (dependencies: `query`, `**kwargs`).
    - `ResponseCoordinator` falls back to default settings when no `parser`, `extractor`,
      `processor`, or `cache_manager` is provided.

    Other attributes configured here:
        retry_handler: Decides when and how failed requests are retried.
        workflow: Optional provider-specific search logic.
        validator: Validates responses (default checks for a 200 status).

    Note:
        The private `_initialize` helper performs the final attribute assignment; this
        initializer's job is only to create the components if they do not already exist.

    Args:
        search_api (Optional[SearchAPI]): The search API used to retrieve response records.
        response_coordinator (Optional[ResponseCoordinator]): Coordinates handling and
            processing of all responses received from APIs.
        parser (Optional[BaseDataParser]): First pipeline step - parses responses into a dictionary.
        extractor (Optional[BaseDataExtractor]): Extracts records and metadata from responses.
        processor (Optional[ABCDataProcessor]): Processes extracted records into filtered,
            optionally flattened lists of dictionaries.
        cache_manager (Optional[DataCacheManager]): Caches processed records for faster retrieval.
        query (Optional[str]): Query sent with requests; modifies an existing API's query.
        provider_name (Optional[str]): Target API provider. When both a `provider_name` and
            `base_url` are given, the SearchAPIConfig prioritizes the `base_url`.
        cache_requests (Optional[bool]): Whether to cache requests; the API is the ground truth
            when unspecified.
        cache_results (Optional[bool]): Whether to cache processed responses; on by default.
        annotate_records (Optional[bool]): Whether the DataExtractor adds unique,
            record-identifying fields to each extracted record for record linkage and
            duplicate detection.
        retry_handler (Optional[RetryHandler]): Handles retries of failed requests.
        validator (Optional[ResponseValidator]): Verifies and validates API responses.
        workflow (Optional[SearchWorkflow]): Customizes record retrieval; the provider's
            default workflow is used when unspecified.
        **kwargs: Forwarded to the SearchAPIConfig when a SearchAPI doesn't already exist.

    Raises:
        InvalidCoordinatorParameterException: If neither `query` nor `search_api` is provided.

    Examples:
        >>> from scholar_flux import SearchCoordinator
        >>> search_coordinator = SearchCoordinator(query="Intrinsic Motivation", cache_results=False)
        >>> response = search_coordinator.search(page=1)
    """
    # Without a query there is no way to build a SearchAPI from scratch.
    if search_api is None and not query:
        raise InvalidCoordinatorParameterException("Either 'query' or 'search_api' must be provided.")
    created_api = self._create_search_api(
        search_api,
        query=query,
        provider_name=provider_name,
        cache_requests=cache_requests,
        **kwargs,
    )
    created_response_coordinator = self._create_response_coordinator(
        response_coordinator, parser, extractor, processor, cache_manager, cache_results, annotate_records
    )
    self._initialize(created_api, created_response_coordinator, retry_handler, validator, workflow)
def _initialize(
    self,
    search_api: SearchAPI,
    response_coordinator: ResponseCoordinator,
    retry_handler: Optional[RetryHandler] = None,
    validator: Optional[ResponseValidator] = None,
    workflow: Optional[SearchWorkflow] = None,
) -> None:
    """Attach the coordinator's components once the two core objects exist.

    Invoked directly after the `SearchAPI` and `ResponseCoordinator` have been created to
    fully prepare the `SearchCoordinator` for response retrieval and processing.

    Args:
        search_api (SearchAPI): The SearchAPI used to retrieve response records from APIs.
        response_coordinator (ResponseCoordinator): Coordinates the handling and processing
            of all responses received from APIs.
        retry_handler (Optional[RetryHandler]): Handles retries of failed requests; a default
            derived from the API's request delay is built when omitted.
        validator (Optional[ResponseValidator]): Verifies and validates API responses.
        workflow (Optional[SearchWorkflow]): Customizes record retrieval; falls back to the
            provider's default workflow when unspecified.
    """
    super()._initialize(search_api, response_coordinator)
    if not retry_handler:
        # Derive retry timing from the API's configured delay between requests.
        delay = self.search_api.request_delay
        retry_handler = RetryHandler(
            min_retry_delay=delay,
            backoff_factor=min(delay * 0.25, 0.5),
        )
    self.retry_handler = retry_handler
    self.validator = validator or ResponseValidator()
    self.workflow = workflow or WORKFLOW_DEFAULTS.get(self.search_api.provider_name)
@classmethod
def _create_search_api(
    cls,
    search_api: Optional[SearchAPI] = None,
    provider_name: Optional[str] = None,
    query: Optional[str] = None,
    cache_requests: Optional[bool] = None,
    **kwargs: Any,
) -> SearchAPI:
    """Create a new `SearchAPI` from its components or from an existing `SearchAPI`.

    Building a fresh instance (rather than copying) side-steps constraints on copying
    session and cached-session objects held by an existing `SearchAPI`.

    Args:
        search_api (Optional[SearchAPI]): An existing API to update; a new one is built from
            defaults when omitted.
        provider_name (Optional[str]): Name of the target API provider. When both a
            `provider_name` and a `base_url` are given, the `SearchAPIConfig` prioritizes
            the `base_url` over the `provider_name`.
        query (Optional[str]): Query used when sending requests. Specifying a query when a
            `SearchAPI` already exists modifies that API's query.
        cache_requests (Optional[bool]): Whether to cache requests. The `SearchAPI` defaults
            are the ground truth when this is not specified.
        **kwargs: Keyword arguments forwarded to the SearchAPIConfig when a SearchAPI
            doesn't already exist.

    Returns:
        SearchAPI: A new search API based on the original with modified components, or
            created entirely anew.

    Raises:
        InvalidCoordinatorParameterException: If neither `query` nor `search_api` is given,
            or if the underlying creation fails with an `APIParameterException`.
    """
    if not query and search_api is None:
        raise InvalidCoordinatorParameterException("Either 'query' or 'search_api' must be provided.")
    # An explicit `cache_requests` takes precedence over any pre-existing `use_cache` kwarg.
    kwargs["use_cache"] = cache_requests if cache_requests is not None else kwargs.get("use_cache")
    try:
        api: SearchAPI = (
            SearchAPI.from_defaults(cast("str", query), provider_name=provider_name, **kwargs)
            if not search_api
            else SearchAPI.update(search_api, query=query, provider_name=provider_name, **kwargs)
        )
    except APIParameterException as e:
        logger.error("Could not initialize the SearchCoordinator due to an issue creating the SearchAPI.")
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise InvalidCoordinatorParameterException(
            f"Could not initialize the SearchCoordinator due to an API parameter exception. {e}"
        ) from e
    return api
@classmethod
def _create_response_coordinator(
    cls,
    response_coordinator: Optional[ResponseCoordinator] = None,
    parser: Optional[BaseDataParser] = None,
    extractor: Optional[BaseDataExtractor] = None,
    processor: Optional[ABCDataProcessor] = None,
    cache_manager: Optional[DataCacheManager] = None,
    cache_results: Optional[bool] = None,
    annotate_records: Optional[bool] = None,
) -> ResponseCoordinator:
    """Create a `ResponseCoordinator` from an existing one (with overrides) or from scratch.

    Args:
        response_coordinator (Optional[ResponseCoordinator]): Existing coordinator used as
            the base when provided; otherwise a new one is built from the components below.
        parser (Optional[BaseDataParser]): First pipeline step - parses response records
            into a dictionary.
        extractor (Optional[BaseDataExtractor]): Extracts records and metadata from
            responses separately.
        processor (Optional[ABCDataProcessor]): Processes extracted records into filtered,
            optionally flattened lists of dictionaries.
        cache_manager (Optional[DataCacheManager]): Caches processed records for faster retrieval.
        cache_results (Optional[bool]): Whether to cache processed responses. On by default
            unless specified otherwise.
        annotate_records (Optional[bool]): Whether the DataExtractor adds unique,
            record-identifying fields to each extracted record for record linkage and
            duplicate detection in later steps.

    Returns:
        ResponseCoordinator: A new response coordinator built from the original's base
            components or constructed directly from the provided ones.

    Raises:
        InvalidCoordinatorParameterException: If the coordinator cannot be built from the
            given components.
    """
    try:
        coordinator = (
            ResponseCoordinator.build(parser, extractor, processor, cache_manager, cache_results, annotate_records)
            if not response_coordinator
            else ResponseCoordinator.update(
                response_coordinator, parser, extractor, processor, cache_manager, cache_results, annotate_records
            )
        )
    except (APIParameterException, InvalidCoordinatorParameterException) as e:
        logger.error("Could not initialize the SearchCoordinator due to an issue creating the ResponseCoordinator.")
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise InvalidCoordinatorParameterException(
            "Could not initialize the SearchCoordinator due to an "
            f"exception creating the ResponseCoordinator. {e}"
        ) from e
    return coordinator
[docs]
@classmethod
def as_coordinator(
    cls, search_api: SearchAPI, response_coordinator: ResponseCoordinator, *args: Any, **kwargs: Any
) -> SearchCoordinator:
    """Build a `SearchCoordinator` directly from fully-constructed core components.

    Args:
        search_api (SearchAPI): The search API used to retrieve response records from APIs.
        response_coordinator (ResponseCoordinator): Coordinates the processing and handling
            of all responses received from APIs.
        *args: Positional arguments forwarded to `_initialize` (retry handler, validator, workflow).
        **kwargs: Keyword arguments forwarded to `_initialize`.

    Returns:
        SearchCoordinator: A newly created coordinator that orchestrates record retrieval
            and processing.
    """
    # Bypass __init__ entirely: the components already exist, so only attach them.
    instance = cls.__new__(cls)
    instance._initialize(search_api, response_coordinator, *args, **kwargs)
    return instance
[docs]
@classmethod
def update(
    cls,
    search_coordinator: Self,
    search_api: Optional[SearchAPI] = None,
    response_coordinator: Optional[ResponseCoordinator] = None,
    *,
    retry_handler: Optional[RetryHandler] = None,
    validator: Optional[ResponseValidator] = None,
    workflow: Optional[SearchWorkflow] = None,
    parser: Optional[BaseDataParser] = None,
    extractor: Optional[BaseDataExtractor] = None,
    processor: Optional[ABCDataProcessor] = None,
    cache_manager: Optional[DataCacheManager] = None,
    cache_results: Optional[bool] = None,
    annotate_records: Optional[bool] = None,
    **search_api_kwargs: Any,
) -> SearchCoordinator:
    """Create a new `SearchCoordinator` from both current and replacement components.

    Components of the existing coordinator serve as the base; any component supplied here
    replaces its counterpart. Note that components that are not replaced are NOT copied -
    the new coordinator shares them with the original.

    Args:
        search_coordinator (SearchCoordinator): Previously created coordinator whose
            components are used when an override is not provided.
        search_api (Optional[SearchAPI]): Replacement search API for record retrieval.
        response_coordinator (Optional[ResponseCoordinator]): Replacement coordinator for
            response handling and processing.
        retry_handler (Optional[RetryHandler]): Replacement handler for request retries.
        validator (Optional[ResponseValidator]): Replacement response validator.
        workflow (Optional[SearchWorkflow]): Replacement workflow for record retrieval.
            When omitted, the previous workflow carries over only if the provider is
            unchanged; otherwise the new provider's default workflow is used.
        parser (Optional[BaseDataParser]): First pipeline step - parses responses into a dictionary.
        extractor (Optional[BaseDataExtractor]): Extracts records and metadata from responses.
        processor (Optional[ABCDataProcessor]): Processes API responses into lists of dictionaries.
        cache_manager (Optional[DataCacheManager]): Caches processed records for faster retrieval.
        cache_results (Optional[bool]): Whether to cache processed responses; on by default
            unless specified or a cache manager is already provided.
        annotate_records (Optional[bool]): When True, adds record-identifying linkage fields
            (`_extraction_index`, `_record_id`) to each extracted record. Default is None
            (no annotation).
        **search_api_kwargs: Extra SearchAPI/SearchAPIConfig overrides (e.g. `query`,
            `provider_name`, `cache_requests`).

    Returns:
        SearchCoordinator: A newly created coordinator that orchestrates record retrieval
            and processing.

    Raises:
        InvalidCoordinatorParameterException: If `search_coordinator` is not a SearchCoordinator.
    """
    if not isinstance(search_coordinator, SearchCoordinator):
        raise InvalidCoordinatorParameterException(
            f"Expected a SearchCoordinator to perform parameter updates. Received type {type(search_coordinator)}"
        )
    # Keep or replace the existing SearchAPI; rebuild only when overrides were supplied.
    base_search_api = search_api or search_coordinator.search_api
    updated_search_api = (
        cls._create_search_api(base_search_api, **search_api_kwargs) if search_api_kwargs else base_search_api
    )
    # Keep or replace the existing ResponseCoordinator components.
    response_coordinator_components = (parser, extractor, processor, cache_manager, cache_results, annotate_records)
    response_coordinator = response_coordinator or search_coordinator.response_coordinator
    updated_response_coordinator = (
        cls._create_response_coordinator(response_coordinator, *response_coordinator_components)
        if any(arg is not None for arg in response_coordinator_components)
        else response_coordinator
    )
    if workflow is None:
        # Carry the previous workflow over only when the provider is unchanged.
        workflow = (
            search_coordinator.workflow
            if search_coordinator.search_api.provider_name == updated_search_api.provider_name
            else None
        )
    # as_coordinator validates the SearchAPI and ResponseCoordinator components on creation.
    return cls.as_coordinator(
        search_api=updated_search_api,
        response_coordinator=updated_response_coordinator,
        retry_handler=retry_handler or search_coordinator.retry_handler,
        validator=validator or search_coordinator.validator,
        workflow=workflow,
    )
# Search Execution
[docs]
def search(
    self,
    page: int = 1,
    from_request_cache: bool = True,
    from_process_cache: bool = True,
    use_workflow: Optional[bool] = True,
    normalize_records: Optional[bool] = None,
    **api_specific_parameters: Any,
) -> Optional[ProcessedResponse | ErrorResponse]:
    """Retrieve and process records from the API for a single page.

    On a successful retrieval the response object is saved under the `last_response`
    attribute, irrespective of whether the response came from cache.

    Args:
        page (int): The current page number. Used for process caching even when the API
            itself does not require it.
        from_request_cache (bool): Whether to attempt retrieval from the requests-cache storage.
        from_process_cache (bool): Whether to attempt to pull processed responses from
            the cache storage.
        use_workflow (Optional[bool]): Whether to use a workflow if one is available.
            Workflows are used by default.
        normalize_records (Optional[bool]): Whether records should be normalized after processing.
        **api_specific_parameters: Fields to temporarily override when building the request.

    Returns:
        Optional[ProcessedResponse | ErrorResponse]: A ProcessedResponse with the response,
            processed records (data), and metadata on success. Otherwise an ErrorResponse
            carrying the reason (message), exception type (error), and response. A
            `NonResponse` (a Falsy `ErrorResponse` subclass) is returned when no response
            object is retrievable at all.

    Note:
        A `cache_only=True` keyword argument is propagated to the `fetch` method, ensuring
        no fresh request is sent when a cached response is unavailable; a `NonResponse`
        recording the `PageUnavailableFromCacheException` is returned instead.
    """
    try:
        active_workflow = self.workflow if use_workflow else None
        if active_workflow:
            workflow_output = active_workflow(
                self,
                page=page,
                from_request_cache=from_request_cache,
                from_process_cache=from_process_cache,
                normalize_records=normalize_records,
                **api_specific_parameters,
            )
            return None if workflow_output is None else workflow_output.result
        return self._search(
            page,
            from_request_cache=from_request_cache,
            from_process_cache=from_process_cache,
            normalize_records=normalize_records,
            **api_specific_parameters,
        )
    except Exception as e:
        logger.error(f"An unexpected error occurred when processing the response: {e}")
        # `page` itself may be the invalid input, so only derive a cache key from a valid one.
        cache_key = None
        if isinstance(page, int) and page >= 0:
            cache_key = self._create_cache_key(page=page)
        return NonResponse.from_error(error=e, message=str(e), cache_key=cache_key)
[docs]
def parameter_search(
    self,
    from_request_cache: bool = True,
    from_process_cache: bool = True,
    normalize_records: Optional[bool] = None,
    **api_specific_parameters: Any,
) -> ProcessedResponse | ErrorResponse:
    """Retrieve and process records from the API using pre-specified parameters (no page).

    On a successful retrieval the response object is saved under the `last_response`
    attribute, irrespective of whether the response came from cache.

    Args:
        from_request_cache (bool): Whether to attempt retrieval from the requests-cache storage.
        from_process_cache (bool): Whether to attempt to pull processed responses from
            the cache storage.
        normalize_records (Optional[bool]): Whether records should be normalized after processing.
        **api_specific_parameters: Fields to temporarily override when building the request.

    Returns:
        ProcessedResponse | ErrorResponse: A ProcessedResponse with the response, processed
            records (data), and metadata on success. Otherwise an ErrorResponse carrying the
            reason (message), exception type (error), and response. A `NonResponse` (a Falsy
            `ErrorResponse` subclass) is returned when no response object is retrievable.
    """
    # Strip parameters that have no meaning for page-less parameter searches.
    for irrelevant in ("page", "use_workflow"):
        api_specific_parameters.pop(irrelevant, None)
    try:
        return self._search(
            page=None,
            from_request_cache=from_request_cache,
            from_process_cache=from_process_cache,
            normalize_records=normalize_records,
            **api_specific_parameters,
        )
    except Exception as e:
        logger.error(f"An unexpected error occurred when processing the response: {e}")
        # No page is involved here, so no cache key can be derived.
        return NonResponse.from_error(error=e, message=str(e), cache_key=None)
[docs]
def search_records(
    self,
    min_records: int,
    page_offset: int = 0,
    from_request_cache: bool = True,
    from_process_cache: bool = True,
    use_workflow: Optional[bool] = True,
    **api_specific_parameters: Any,
) -> SearchResultList:
    """Retrieve and process records by specifying a target number of records.

    The number of pages needed to satisfy `min_records` is computed from the API's
    `records_per_page` setting; those pages are then fetched via `iter_pages` and collected
    into a SearchResultList. Unlike `iter_pages`, which streams one result at a time, this
    method returns the full collection for cross-page analysis and batch operations
    (filtering failures, normalizing provider-specific records, selecting by
    query/provider/page, joining records for DataFrame creation).

    Args:
        min_records (int): The total number of records to retrieve sequentially.
        page_offset (int): Number of pages to skip before starting retrieval (0 by default).
        from_request_cache (bool): Whether to attempt retrieval from the requests-cache storage.
        from_process_cache (bool): Whether to attempt to pull processed responses from
            the cache storage.
        use_workflow (Optional[bool]): Whether to use a workflow if one is available.
            Workflows are used by default.
        **api_specific_parameters: Fields to temporarily override when building each request.

    Returns:
        SearchResultList: A specialized list of SearchResult instances, one per requested
            page, offering filter()/select()/normalize()/join()/process_metadata() helpers
            and a record_count property. Retrieval stops early when a page response is None,
            not retrievable, or contains fewer records than expected (subsequent pages are
            likely empty).
    """
    collected: SearchResultList = SearchResultList()
    try:
        pages = PageListInput.from_record_count(min_records, self.search_api.records_per_page, page_offset)
        if pages:
            result_iterator = self.iter_pages(
                pages=pages,
                from_request_cache=from_request_cache,
                from_process_cache=from_process_cache,
                use_workflow=use_workflow,
                **api_specific_parameters,
            )
            for search_result in result_iterator:
                collected.append(search_result)
        else:
            logger.warning(
                f"Cannot retrieve {min_records} records for records_per_page={self.search_api.records_per_page}. Halting "
                "retrieval..."
            )
    except Exception as e:
        logger.error(f"An unexpected error occurred when processing the response: {e}")
    return collected
[docs]
def search_pages(
    self,
    pages: Sequence[int] | PageListInput,
    from_request_cache: bool = True,
    from_process_cache: bool = True,
    use_workflow: Optional[bool] = True,
    **api_specific_parameters: Any,
) -> SearchResultList:
    """Retrieve and process records for an explicit sequence of pages.

    Collects results for every requested page into a SearchResultList. Unlike
    `iter_pages`, which streams one result at a time, this method returns the full
    collection for cross-page analysis and batch operations (filtering failures,
    normalizing provider-specific records, selecting by query/provider/page, joining
    records for DataFrame creation).

    Args:
        pages (Sequence[int] | PageListInput): Page numbers to request from the API
            provider; may be a list, range, or PageListInput instance.
        from_request_cache (bool): Whether to attempt retrieval from the requests-cache storage.
        from_process_cache (bool): Whether to attempt to pull processed responses from
            the cache storage.
        use_workflow (Optional[bool]): Whether to use a workflow if one is available.
            Workflows are used by default.
        **api_specific_parameters: Fields to temporarily override when building each request.

    Returns:
        SearchResultList: A specialized list of SearchResult instances, one per requested
            page, offering filter()/select()/normalize()/join()/process_metadata() helpers
            and a record_count property.

    Note:
        Retrieval stops early when a page response is None, not retrievable, or contains
        fewer records than expected (subsequent pages are likely empty).
        With `cache_only=True`, the `fetch` step only serves responses from cache; a
        NonResponse produced by a cache miss does not halt the search.
    """
    collected: SearchResultList = SearchResultList()
    try:
        result_iterator = self.iter_pages(
            pages=pages,
            from_request_cache=from_request_cache,
            from_process_cache=from_process_cache,
            use_workflow=use_workflow,
            **api_specific_parameters,
        )
        for search_result in result_iterator:
            collected.append(search_result)
    except Exception as e:
        logger.error(f"An unexpected error occurred when processing the response: {e}")
    return collected
[docs]
def iter_pages(
self,
pages: Sequence[int] | PageListInput,
from_request_cache: bool = True,
from_process_cache: bool = True,
use_workflow: Optional[bool] = True,
**api_specific_parameters: Any,
) -> Generator[SearchResult, None, None]:
"""Helper method that creates a generator function for retrieving and processing records from the API Provider
for a page range in sequence. This implementation dynamically examines the properties of the page search result
for each retrieved API response to determine whether or not iteration should halt early versus determining
whether iteration should continue.
This method is directly used by SearchCoordinator.search_pages to provide a clean interface that abstracts
the complexity of iterators and is also provided for convenience when iteration is more preferable.
Args:
pages (Sequence[int] | PageListInput): A sequence of page numbers to request from the API Provider.
from_request_cache (bool): This parameter determines whether to try to retrieve the response from the
requests-cache storage.
from_process_cache (bool): This parameter determines whether to attempt to pull processed responses from
the cache storage.
use_workflow (bool): Indicates whether to use a workflow if available Workflows are utilized by default.
**api_specific_parameters (SearchAPIConfig): Fields to temporarily override when building the request.
Yields:
SearchResult: Iteratively returns the SearchResult for each page using a generator expression.
Each result contains the requested page number (page), the name of the provider
(provider_name), and the result of the search containing a ProcessedResponse,
an ErrorResponse, or None (api response)
"""
# preprocesses the iterable or sequence of pages to reduce redundancy and validate beforehand
page_list_input = self._validate_page_list_input(pages)
for page in page_list_input.page_numbers:
search_result = self.search_page(
page=page,
from_request_cache=from_request_cache,
from_process_cache=from_process_cache,
use_workflow=use_workflow,
**api_specific_parameters,
)
halt = self._process_page_result(search_result.response_result, page)
yield search_result
if halt:
break
[docs]
def search_page(
self,
page: int,
from_request_cache: bool = True,
from_process_cache: bool = True,
use_workflow: Optional[bool] = True,
**api_specific_parameters: Any,
) -> SearchResult:
"""Retrieves a single-page `SearchResult`, returning the processed response with additional metadata.
This method is used to support the retrieval of a page range while wrapping each result in a
SearchResult class as a BaseModel that provides more structured information about the received API Response,
including the provider's name, the page number, and the response result.
The `SearchResult.response_result` attribute can hold three different types of responses:
1. ProcessedResponse - indicates the successful retrieval and processing of the data
2. ErrorResponse/Nonresponse - indicates that a response was successfully received, but that an error
occurred during request building, response retrieval or response processing
3. None - indicates an issue in the retrieval of the response or formatting/preparation of the request
The SearchResult wrapper enables:
- **Introspection**: Access provider, query, and page without unpacking the response
- **Aggregation**: Combine results across pages with consistent metadata
- **Normalization**: Apply field mapping to create provider-agnostic schemas
When a workflow is active, the provider name is determined from the last-queried URL to ensure correct labeling.
For non-workflow searches, the SearchAPI's provider name is used.
Args:
page (int): The current page number. Used for process caching purposes even if not required by the API
from_request_cache (bool):
This parameter determines whether to try to retrieve the response from the requests-cache storage.
from_process_cache (bool):
This parameter determines whether to attempt to pull processed responses from the cache storage.
use_workflow (bool): Indicates whether to use a workflow if available Workflows are utilized by default.
**api_specific_parameters (SearchAPIConfig): Fields to temporarily override when building the request.
Returns:
SearchResult:
A search result containing the requested page number (page), the name of the provider (provider_name),
and the result of the search (api_response) which contains a ProcessedResponse, an ErrorResponse,
or None.
Note:
When specifying `cache_only=True`, this keyword argument is propagated to `fetch` method, ensuring that a
fresh request is not sent to the current API when a previously cached response is unavailable from the
session cache. Instead, a SearchResult containing a `NonResponse` is returned, recording the
`PageUnavailableFromCacheException` and its corresponding error message.
"""
api_response = self.search(
page=page,
from_request_cache=from_request_cache,
from_process_cache=from_process_cache,
use_workflow=use_workflow,
**api_specific_parameters,
)
# for workflow resolution where needed
if self.workflow and use_workflow:
provider_url = api_response.url if api_response is not None else None
provider_config = provider_registry.resolve_config(
provider_url, self.search_api.provider_name, verbose=False
)
provider_name = provider_config.provider_name if provider_config else self.search_api.provider_name
else:
provider_name = self.search_api.provider_name
search_result = SearchResult(
response_result=api_response,
provider_name=provider_name,
query=self.search_api.query,
page=page,
)
return search_result
@classmethod
def _validate_page_list_input(cls, pages: Sequence[int] | PageListInput) -> PageListInput:
"""Helper method for validating the input to pages: Used to coerce a sequence of pages to PageListInput if
possible.
Args:
pages (Sequence[int] | PageListInput): The input to pass to search_pages containing
a sequence of pages to retrieve.
Returns:
PageListInput: If the conversion to a page_list_input object was successful.
Raises:
InvalidCoordinatorParameterException: If conversion to a page list is not possible.
"""
try:
page_list_input = pages if isinstance(pages, PageListInput) else PageListInput(pages)
return page_list_input
except ValidationError as e:
raise InvalidCoordinatorParameterException(
"Expected `pages` to be a list or other sequence of integer "
f"pages. Received an error on validation: {e}"
)
def _process_page_result(self, response_result: Optional[ErrorResponse | ProcessedResponse], page: int) -> bool:
"""Helper method for logging the result of each page search and determining whether to continue."""
halt = True
if isinstance(response_result, ProcessedResponse):
expected_page_count = self.search_api.config.records_per_page
total_hits = response_result.total_query_hits
# 0 and None signal that processing should halt or reference the expected_page_count, respectively
pages_remaining = (
ResponseMetadataMap._calculate_pages_remaining(page, total_hits, expected_page_count)
if page is not None and total_hits is not None and expected_page_count is not None
else None
)
if pages_remaining == 0 or (
len(response_result.extracted_records or []) < expected_page_count and pages_remaining is None
):
logger.warning(
f"The response from {self.display_name} for page, {page} contains less than the expected "
f"{expected_page_count} records. Received {repr(response_result)}. "
f"Halting multi-page retrieval..."
)
else:
halt = False
elif (response_result is None or isinstance(response_result, NonResponse)) and page == 0:
logger.warning("Skipping the page number, 0, as it is not a valid page number...")
halt = False
elif isinstance(response_result, NonResponse) and response_result.error == "PageUnavailableFromCacheException":
# Continue retrieval if needed: relevant when an intermediate page may or may not be available
halt = False
elif isinstance(response_result, ErrorResponse) and not isinstance(response_result, NonResponse):
status_code = response_result.status_code
status_description = (
f"(Status Code: {status_code}={response_result.status})" if status_code else "(Status Code: Missing)"
)
logger.warning(
f"Received an invalid response from {self.display_name} for page {page}. "
f"{status_description}. Halting multi-page retrieval..."
)
else:
logger.warning(
f"Could not retrieve a valid response code for page {page}. "
f"Received {repr(response_result)}. Halting multi-page retrieval..."
)
return halt
[docs]
def search_data(self, page: int = 1, *args: Any, **kwargs: Any) -> Optional[RecordList]:
"""Public convenience method to perform a search, specifying the page and records per page.
Note that instead of returning a ProcessedResponse or ErrorResponse, this calls the `search` method an
retrieves only the list of processed dictionary records from the ProcessedResponse.
Args:
page (int): The current page number.
*args: Positional arguments to pass directly to the `.search()` method
**kwargs: Keyword arguments to pass directly to the `.search()` method
Returns:
Optional[RecordList]:
A list of record dictionaries containing the processed article data when parsed successfully
and records exist. If no records exist, or an error occurs somewhere within the processes,
None is returned, instead.
"""
try:
response = self.search(page, *args, **kwargs)
if response:
return response.data
except Exception as e:
logger.error(f"An unexpected error occurred when attempting to retrieve the processed response data: {e}")
return None
# Search Execution
def _search(
self,
page: Optional[int] = 1,
from_request_cache: bool = True,
from_process_cache: bool = True,
normalize_records: Optional[bool] = None,
**api_specific_parameters: Any,
) -> ProcessedResponse | ErrorResponse:
"""Helper method for retrieving and processing records from the API specifying the page and records per page.
This method is called to perform all steps necessary to retrieve and process a response from the selected API.
Beyond catching basic exceptions related to raised error codes and processing response issues, further errors
are to be caught at a higher level such as in the public SearchCoordinator.search method.
Args:
page (int): The current page number. Used for process caching purposes even if not required by the API
from_request_cache (bool): Indicates whether to attempt to retrieve the response from the requests-cache
from_process_cache (bool): This parameter determines whether to attempt to pull processed responses from
the processing cache storage device (or memory)
normalize_records (Optional[bool]): Determines whether records should be normalized
**api_specific_parameters (SearchAPIConfig): Fields to temporarily override when building the request.
Returns:
ProcessedResponse | ErrorResponse:
A Processed API Response if successful, Otherwise, returns an ErrorResponse.
"""
# all missing response values are handled at this step and transformed into NonResponses
api_response = self._fetch_api_response(page, from_request_cache=from_request_cache, **api_specific_parameters)
self._log_response_source(api_response, page, api_response.cache_key)
# if there is no data to process within the response or if there is an existing ErrorResponse, return it as is
if isinstance(api_response, NonResponse):
return api_response
if isinstance(api_response, ErrorResponse):
self.last_response = api_response
return api_response
# otherwise process the data before returning it
processed_response = self._process_response(
response=cast("ResponseProtocol", api_response.response),
cache_key=cast("str", api_response.cache_key),
from_process_cache=from_process_cache,
normalize_records=normalize_records,
)
return processed_response
# Request Handling
[docs]
    def fetch(
        self,
        page: Optional[int],
        from_request_cache: bool = True,
        raise_on_error: bool = False,
        cache_only: bool = False,
        **api_specific_parameters: Any,
    ) -> Optional[Response | ResponseProtocol]:
        """Fetches the raw response from the current API or from cache if available.

        If `page` is None, `fetch` will default to a basic parameter search using the API
        base URL given the specified parameters.

        Args:
            page (Optional[int]): The page number to retrieve from the cache.
            from_request_cache (bool): This parameter determines whether to try to fetch a
                valid response from cache.
            raise_on_error (bool): Indicates whether an error should be raised when failing
                to fetch a valid response.
            cache_only (bool): Flag indicating whether the search should only attempt to
                retrieve the page from cache.
            **api_specific_parameters (SearchAPIConfig): Fields to temporarily override when
                building the request.

        Returns:
            Optional[Response]: The response object if available, otherwise None.

        Raises:
            RetryAfterDelayExceededException:
                If the server-requested delay until the next request exceeds the
                user-specified maximum wait time as configured through the `RetryHandler`.
            RequestFailedException:
                If an unexpected error occurs during the retrieval process as orchestrated
                via the `RetryHandler`.
        """
        # NOTE(review): when `page` is None this label embeds " for <base_url>", so the messages
        # below render as "Failed to fetch page  for <url>" (double space) — confirm intended.
        current_page = str(page) if page is not None else f" for {self.search_api.base_url}"
        response: Optional[Response | ResponseProtocol] = None
        try:
            # Prefer a previously cached session response when permitted and present.
            if from_request_cache and (response := self.get_cached_request(page, **api_specific_parameters)):
                return response
            elif from_request_cache and cache_only:
                # Cache-only mode: never hit the network; surface the miss as an exception.
                message = (
                    f"Failed to retrieve page {current_page} from the session cache for the provider, "
                    f"{self.display_name}"
                )
                message += ": The session cache has not been enabled." if not self.api.cache else "."
                raise PageUnavailableFromCacheException(message=message)
            else:
                # if the key does not exist, will log at the INFO level and continue
                self._delete_cached_request(page, **api_specific_parameters)
                # Honor any server-provided Retry-After delay before sending a fresh request.
                self._respect_retry_after()
                response = self.robust_request(page, **api_specific_parameters)
        except RetryAfterDelayExceededException as e:
            # Prepend the page context onto the exception message in place before re-raising/logging.
            msg = f"Failed to fetch page {current_page}"
            e.message = f"{msg}: {e}" if str(e) else msg
            if raise_on_error:
                raise
            logger.warning(e.message)
            # Fall back to whatever (possibly rate-limited) response the exception carried.
            response = e.response
        except PageUnavailableFromCacheException as e:
            if raise_on_error:
                raise
            logger.warning(e.message)
        except RequestFailedException as e:
            msg = f"Failed to fetch page {current_page}"
            err = f"{msg}: {e}" if str(e) else msg
            if raise_on_error:
                # Re-wrap with page context while preserving the original cause.
                raise RequestFailedException(err) from e
            logger.warning(err)
        return response
    def _respect_retry_after(self) -> None:
        """Helper method that respects the `retry_after` field before requests exceed dynamic
        API rate limits.

        Inspects the last response for a `Retry-After` header and, when the requested delay
        has not yet elapsed, either waits out the remainder via the rate limiter or raises
        (inside `delay_exceeds_max_backoff`) depending on the RetryHandler's configuration.
        """
        # If the current URL has not changed from the last request, attempt to extract a Retry-After parameter directly
        last_response = self.last_response
        if (
            last_response is not None
            and last_response.url
            and normalize_url(self.search_api.base_url) == normalize_url(last_response.url, remove_parameters=True)
        ):
            # parsed `retry-after` value as a float - this accounts the amount of time that has elapsed since last-call
            retry_after_value = self.retry_handler.extract_retry_after_from_response(last_response.response)
            delay = self.retry_handler.parse_retry_after(retry_after_value)
            # if no delay exists, skip a delay
            if not delay:
                return
            # attempts to coerce the unparsed value into a date (HTTP-date form of Retry-After);
            # failures are suppressed and logged at DEBUG level
            retry_after_date = try_call(
                self.retry_handler._parse_retry_after_date, (retry_after_value,), suppress=(ValueError,), log_level=10
            )
            # Indicates the time that the ErrorResponse/NonResponse was created:
            response_created_at = parse_iso_timestamp(last_response.created_at or "")
            # If a retry after date is N/A, attempt to extract the response creation date from the APIResponse.
            reference_time = None if retry_after_date else response_created_at
            # Refer to the delay calculated from a valid `retry_after_date` as the source of truth when possible;
            # otherwise anchor the numeric delay to the response creation time. None when neither is usable.
            retry_after_timestamp = (
                retry_after_date.timestamp()
                if isinstance(retry_after_date, datetime)
                else (delay + reference_time.timestamp() if isinstance(reference_time, datetime) and delay else None)
            )
            timestamp = time()
            # Only wait when the requested retry-after moment is still in the future
            if retry_after_timestamp and timestamp < retry_after_timestamp:
                delay_remaining = retry_after_timestamp - timestamp
                formatted_timestamp = (
                    response_created_at.strftime(" on %Y-%m-%d at %H:%M:%S") if response_created_at else ""
                )
                error_message = (
                    f"A rate limit of {delay_remaining:.2f}s still remains in effect before "
                    f"the next request can be sent to {self.display_name}."
                )
                warning_message = (
                    "RetryHandler.RAISE_ON_DELAY_EXCEEDED is disabled. The SearchCoordinator will wait for the "
                    f"remaining duration of {delay_remaining:.2f}s to send the next request as requested by "
                    f"{self.display_name}, even if it exceeds your configured maximum wait duration. This may result "
                    "in long waits."
                )
                # delay_exceeds_max_backoff either raises (error_message) or warns (warning_message)
                # depending on RetryHandler configuration; only wait when it reports False.
                if not self.retry_handler.delay_exceeds_max_backoff(
                    delay_remaining,
                    error_message=error_message,
                    warning_message=warning_message,
                ):
                    logger.info(
                        f"{self.display_name} sent a `Retry-After` field of {delay}s{formatted_timestamp}. Respecting "
                        f"the delay of ~{delay_remaining:.2f}s..."
                    )
                    # Delegate the actual sleep to the rate limiter, anchored at reference_time
                    self.search_api.rate_limiter.wait_since(
                        delay,
                        reference_time,
                        metadata=dict(
                            url=self.search_api.base_url,
                            caller="_respect_retry_after",
                            request_delay=delay,
                            query=self.search_api.query,
                        ),
                    )
[docs]
    def robust_request(
        self, page: Optional[int], **api_specific_parameters: Any
    ) -> Optional[Response | ResponseProtocol]:
        """Constructs and sends a request to the current API with retry handling.

        Delegates the actual request/validate/sleep cycle to the RetryHandler, using the
        SearchAPI's search callable and response validator.

        Args:
            page (Optional[int]): The page number to retrieve from the cache. If missing,
                this implementation relies on `api_specific_parameters` to retrieve data
                from an API.
            **api_specific_parameters: Optional additional parameters to pass to the SearchAPI.

        Returns:
            Optional[Response | ResponseProtocol]: The request/response-like object if
                available, otherwise None.

        Raises:
            RetryAfterDelayExceededException: Propagated (with added context) when the
                server-requested delay exceeds the configured maximum.
            RequestFailedException: Re-raised with context when the retry orchestration fails.
        """
        try:
            # Use the caller-supplied delay when given; otherwise the larger of the API's
            # configured delay and the retry handler's minimum floor.
            request_delay = api_specific_parameters.get("request_delay") or (
                self.search_api.request_delay
                if self.search_api.request_delay > self.retry_handler.min_retry_delay
                else self.retry_handler.min_retry_delay
            )
            # Fold recognized provider-specific fields into a nested `parameters` dict.
            # NOTE: this mutates the received keyword mapping in place before pass-through.
            if api_specific_parameter_fields := self.search_api.parameter_config.extract_parameters(
                api_specific_parameters
            ):
                api_specific_parameters["parameters"] = api_specific_parameter_fields
            response = self.retry_handler.execute_with_retry(
                request_func=self.search_api.search,
                validator_func=self.validator.validate_response,
                # Sleeping goes through the rate limiter so waits are tracked with metadata
                sleep_func=partial(
                    self.search_api.rate_limiter.sleep,
                    metadata=dict(
                        url=self.search_api.base_url,
                        caller="execute_with_retry",
                        page=page,
                        query=self.search_api.query,
                    ),
                ),
                page=page,
                min_retry_delay=request_delay,
                # Backoff scales with the request delay but is clamped to [handler default, 0.5]
                backoff_factor=max(min(request_delay * 0.25, 0.5), self.retry_handler.backoff_factor),
                **api_specific_parameters,
            )
        except RetryAfterDelayExceededException as e:
            # Add provider context to the exception message in place, then propagate unchanged
            msg = f"Failed to get a valid response from the {self.search_api.provider_name} API"
            e.message = f"{msg}: {e}" if str(e) else msg
            logger.error(e.message)
            raise
        except RequestFailedException as e:
            msg = f"Failed to get a valid response from the {self.search_api.provider_name} API"
            err = f"{msg}: {e}" if str(e) else msg
            logger.error(err)
            # Re-wrap with provider context while preserving the original cause
            raise RequestFailedException(err) from e
        if getattr(response, "from_cache", False):
            logger.info(f"Retrieved cached response for query: {self.search_api.query} and page: {page}")
        return response
def _fetch_api_response(
self, page: Optional[int], from_request_cache: bool = True, **api_specific_parameters: Any
) -> APIResponse:
"""Helper method for fetching the response and retrieving the cache key.
Args:
page (Optional[int]): The page number to retrieve from the cache.
from_request_cache (bool): This parameter determines whether to try to fetch a valid response from cache.
**api_specific_parameters (SearchAPIConfig): Fields to temporarily override when building the request.
Returns:
APIResponse | NonResponse: A data class containing the response and cache key when successfully retrieved,
independent of status code, and a NonResponse otherwise when retrieval is
unsuccessful due to an error.
"""
cache_key = self._create_cache_key(page) if page is not None else None
try:
response = self.fetch(
page, from_request_cache=from_request_cache, raise_on_error=True, **api_specific_parameters
)
if not cache_key and response and response.url:
cache_key = self._create_cache_key(page=None, url=response.url)
except RetryAfterDelayExceededException as e:
# Note: NonResponse classes won't be recorded in `last_response`, this allows retrieval of the last 429
error_type = ErrorResponse if e.response is not None else NonResponse
error_response = error_type.from_error(response=e.response, cache_key=cache_key, message=e.message, error=e)
return error_response
except PageUnavailableFromCacheException as e:
# Note: NonResponse classes won't be recorded in `last_response`, this allows retrieval of the last 429
non_response = NonResponse.from_error(cache_key=cache_key, message=e.message, error=e)
return non_response
except RequestFailedException as e:
return NonResponse.from_error(error=e, message=str(e), cache_key=cache_key)
if response is None:
message = f"Response retrieval for cache key {cache_key} was unsuccessful"
logger.info(f"{message}.")
detailed_message = f"{message}: Check the logs for more information."
return NonResponse(cache_key=cache_key, message=detailed_message)
return APIResponse(response=response, cache_key=cache_key)
def _log_response_source(
self,
response: Optional[APIResponse | Response | ResponseProtocol],
page: Optional[int],
cache_key: Optional[str],
) -> None:
"""Logs and indicates whether the received response is fresh or retrieved from session cache.
The response structure is checked to determine whether a response is a `requests_cache.CachedResponse` or
whether it was retrieved directly from the current API. This method also indicates whether we're using the
response processing cache from the `ResponseCoordinator` to attempt to pull from cache if available.
Args:
response (Response): Response retrieved from a request.
page (int): The current page number.
cache_key (Optional[str]): An optional cache key associated with the current request.
"""
current_page = str(page) if page is not None else getattr(response, "url", None)
current_page = f"page {current_page}" if current_page else "the current page"
message = response.message if isinstance(response, ErrorResponse) else ""
if response is None or isinstance(response, NonResponse):
message = f"Response retrieval from {self.display_name} for {current_page} was unsuccessful"
message += f": {response.message}" if response is not None and response.message else "."
logger.warning(message)
return
elif isinstance(response, ErrorResponse):
message = (
f"Response retrieval and processing from {self.display_name} for {current_page} was unsuccessful: "
f"{response.message}"
)
logger.warning(message)
return
if isinstance(response, APIResponse) or getattr(response, "from_cache", False):
logger.info(f"Retrieved a cached response for cache key: {cache_key}")
if self.response_coordinator.cache_manager:
logger.info(f"Handling response (cache key: {cache_key})")
else:
logger.info("Handling response")
def _process_response(
self,
response: Response | ResponseProtocol,
cache_key: str,
from_process_cache: bool = True,
normalize_records: Optional[bool] = None,
) -> ProcessedResponse | ErrorResponse:
"""Helper method for processing records from the API and, upon success, saving records to cache if
from_process_cache = True and caching is enabled.
Args:
response (Optional[Response]): The response retrieved from an API
cache_key (Optional[str]): The key used for caching responses, data processing, and metadata when enabled
from_process_cache (bool): Indicates whether or not to pull from cache when available.
This option is only relevant when a caching backend is enabled.
normalize_records (Optional[bool]): Determines whether records should be normalized after processing
Returns:
ProcessedResponse | ErrorResponse:
A Processed API Response if successful, Otherwise, returns an ErrorResponse
"""
# assume that the entered value is a response protocol to be further validated when handled
processed_response = self.response_coordinator.handle_response(
response,
cache_key,
from_cache=from_process_cache,
normalize_records=normalize_records,
)
if isinstance(processed_response, (ErrorResponse, ProcessedResponse)):
self.last_response = processed_response
return processed_response
def _prepare_request(self, page: Optional[int], **kwargs: Any) -> PreparedRequest:
"""Prepares the request after constructing the request parameters for the API call.
If neither a page nor extra keyword arguments are prepared, the request URL defaults
to the base URL.
Supports two parameter styles that are functionally equivalent:
- Nested: `_prepare_request(page=None, parameters={'filter': 'value'})`
- Flat: `_prepare_request(page=None, filter='value')`
When both are provided, flat kwargs take precedence over nested parameters.
Args:
page (Optional[int]): The page number to request.
**kwargs:
Additional parameters for the request. Can include a 'parameters' dict that will be merged with other
kwargs. Note that the `endpoint` parameter is directly extracted from the parameter list and formatted
as a valid endpoint, separate from the parameter list. No preprocessing for `endpoint` is required.
Returns:
PreparedRequest: The prepared request object to send to the api
"""
parameters = self.search_api._validate_parameters((kwargs.pop("parameters", {}))) | kwargs
endpoint = parameters.pop("endpoint", None)
request = self.search_api.prepare_search(page, parameters, endpoint=endpoint)
return request
# Cache Management
def _create_cache_key(self, page: Optional[int], url: Optional[str] = None) -> str:
"""Creates a processing cache identifier from the current search query, page, and URL if provided.
The cache key is generated using the current page argument, as well as the provider_name, query, and
records_per_page, all of which originate from the SearchAPIConfig (accessible as properties). If a page
parameter is not provided and a valid URL is given, a cache key can instead be calculated by hashing the URL
with hashlib's sha256 implementation (via `DataCacheManager._cache_key_from_url`) when possible. As a result,
consistency in cache key formation is guaranteed for the same input.
Args:
page (Optional[int]): The current page number. None for parameter-based searches.
url (Optional[str]): The request URL for parameter-based cache keys. Used when page is None.
Returns:
str: A unique cache key based on the provided parameters.
"""
if not page and url is not None and validate_url(url, verbose=False):
return DataCacheManager._cache_key_from_url(url)
return (
f"{self.search_api.provider_name}_{self.search_api.query}_{page}_{self.search_api.records_per_page}".lower()
)
[docs]
def get_cached_request(self, page: Optional[int], **kwargs: Any) -> Optional[Response | ResponseProtocol]:
"""Retrieves the cached request for a given page number if available.
Args:
page (Optional[int]): The page number to retrieve from the cache.
Returns:
Optional[Response]: The cached request object if available, otherwise None.
"""
try:
if not self.search_api.cache:
return None
request_key = self._get_request_cache_key(page, **kwargs)
if not request_key:
return None
return self.search_api.cache.get_response(request_key)
except RequestCacheException as e:
logger.error(f"Error retrieving cached request: {e}")
return None
[docs]
def get_cached_response(
self, page: int, url: Optional[str] = None, **kwargs: Any
) -> Optional[ProcessedResponse | ErrorResponse]:
"""Retrieves the cached response for a given page number if available.
This method attempts to retrieve processed cache information when available, preferring the retrieval of
processed cached data when available, despite whether the underlying request was cached.
If the cached request does not exist, and the processed response data does exist, this method creates a
ProcessedResponse with ReconstructedResponse when possible.
If the cached request exists or is newer, this method returns the ProcessedResponse after handling the raw
cached response object.
Args:
page (int): The page number to retrieve from the cache.
url (Optional[str]): The request URL for parameter-based cache keys. Used when page is None.
**kwargs: Additional arguments to pass to `get_cached_requests` for the reconstruction of a cached response
Returns:
Optional[ProcessedResponse | ErrorResponse]: The cached/reconstructed response if available.
"""
try:
if not self.response_coordinator.cache_manager:
return None
cache_key = self._create_cache_key(page, url)
response = cast("ResponseProtocol", self.get_cached_request(page, **kwargs))
is_valid_cache = self.response_coordinator.cache_manager.cache_is_valid(cache_key)
cached = (
self.response_coordinator.handle_response(response, cache_key) if response or is_valid_cache else None
)
if cached:
logger.info(f"Cache hit for key: {cache_key}")
return cached
logger.info(f"Cache miss for key: {cache_key}")
return cached
except StorageCacheException as e:
logger.error(f"Error retrieving cached response: {e}")
return None
[docs]
def get_cached_search_result(self, page: int, url: Optional[str] = None, **kwargs: Any) -> Optional[SearchResult]:
"""Retrieves a SearchResult containing a ProcessedResponse for a given page number if available.
This is convenience method that uses `get_cached_response` under the hood to retrieve and format a response as a
SearchResult instance.
If the cached response does not exist, this method will return `None` instead.
Args:
page (int): The page number to retrieve from the cache.
url (Optional[str]): The request URL for parameter-based cache keys. Used when page is None.
**kwargs: Additional arguments to pass to `get_cached_response` for the reconstruction of a cached response
Returns:
Optional[SearchResult]: The search result containing the reconstructed response result if available.
"""
if not isinstance(page, int):
logger.warning("A valid page number is required to create a SearchResult from a cached response.")
return None
cached_response = self.get_cached_response(page=page, url=url, **kwargs)
if cached_response is None:
return None
return SearchResult(
response_result=cached_response,
query=self.search_api.query,
provider_name=self.provider_name,
page=page,
)
[docs]
def get_cached_response_keys(self) -> list[str]:
"""Finds all cache keys from cached, paginated requests made with the current query."""
if not self.response_coordinator.cache_manager:
return []
provider_name = self.provider_name.lower()
query = self.search_api.query.lower()
search_structure = f"{provider_name}_{re.escape(query)}_([0-9]+)_[0-9]+$"
cached_keys = self.response_coordinator.cache_manager.cache_storage.retrieve_keys() or []
return [
cache_key
for cache_key in cached_keys
if isinstance(cache_key, str) and re.search(search_structure, cache_key) is not None
]
def _get_request_cache_key(self, page: Optional[int], **kwargs: Any) -> Optional[str]:
"""Creates a stable cache key for a request using `SearchAPI.cache.create_key` if the session cache is enabled.
If a page is not supplied (is NA), then keyword arguments are instead
used to generate a cache key from the prepared request.
Args:
page (Optional[int]): The page number associated with the request key.
**kwargs: Additional parameters for the request.
Returns:
str: The prepared request key to be associated with the request
"""
try:
if self.search_api.cache:
request = self._prepare_request(page, **kwargs)
request_key = self.search_api.cache.create_key(request)
return request_key
except (APIParameterException, AttributeError, ValueError) as e:
logger.error("Error retrieving requests-cache key")
raise RequestCacheException(
f"Error retrieving requests-cache key from session: {self.search_api.session}: {e}"
)
return None
def _delete_cached_request(self, page: Optional[int], **kwargs: Any) -> None:
"""Deletes the cached request for a given page number if available.
Args:
page (Optional[int]): The page number to delete from the cache.
"""
if self.search_api.cache:
try:
request_key = self._get_request_cache_key(page, **kwargs)
logger.debug(f"Attempting to delete requests cache key: {request_key}")
if not request_key:
raise KeyError("Request key is None or empty")
if not self.search_api.cache.contains(request_key):
raise KeyError(f"Key {request_key} not found in the API session request cache")
self.search_api.cache.delete(request_key)
except KeyError as e:
logger.info(f"A cached response for the current request does not exist: {e}")
except Exception as e:
logger.error(f"Error deleting cached request: {e}")
def _delete_cached_response(self, page: Optional[int], url: Optional[str] = None) -> None:
"""Deletes the cached response for a given page number if available.
Args:
page (int): The page number to delete from the cache.
url (Optional[str]): The request URL for parameter-based cache keys. Used when page is None.
"""
if self.response_coordinator.cache_manager:
try:
cache_key = self._create_cache_key(page, url)
logger.debug(f"Attempting to delete processing cache key: {cache_key}")
self.response_coordinator.cache_manager.delete(cache_key)
except Exception as e:
logger.error(f"Error in deleting from processing cache: {e}")
__all__ = ["SearchCoordinator"]