scholar_flux.api.rate_limiting package
Submodules
scholar_flux.api.rate_limiting.rate_limiter module
The scholar_flux.api.rate_limiting.rate_limiter module implements a simple, general RateLimiter.
ScholarFlux uses and builds upon this RateLimiter implementation to ensure that the number of requests to an API provider does not exceed the limit within the specified time interval.
- class scholar_flux.api.rate_limiting.rate_limiter.RateLimiter(min_interval: int | float | None = None)[source]
Bases:
object
A basic rate limiter used to ensure that function calls (such as API requests) do not exceed a specified rate.
The RateLimiter is used within ScholarFlux to throttle the total number of requests that can be made within a defined time interval (measured in seconds).
This class ensures that calls to RateLimiter.wait() (or any decorated function) are spaced by at least min_interval seconds.
For multithreading applications, the RateLimiter is not thread-safe. Instead, the ThreadedRateLimiter subclass can provide a thread-safe implementation when required.
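The spacing behavior described above can be illustrated with a minimal, self-contained sketch. This is not the ScholarFlux implementation; the class name and attributes below are illustrative assumptions, showing only the core idea of spacing calls by at least a minimum interval:

```python
import time
from typing import Optional


class MinimalRateLimiter:
    """Illustrative sketch: space successive calls by at least `min_interval` seconds."""

    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        now = time.monotonic()
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                time.sleep(remaining)  # block until the interval has elapsed
        self._last_call = time.monotonic()  # record the time of this call


limiter = MinimalRateLimiter(min_interval=0.2)
start = time.monotonic()
limiter.wait()  # first call: no prior call recorded, returns immediately
limiter.wait()  # second call: sleeps the remaining portion of the interval
elapsed = time.monotonic() - start
```

The real RateLimiter adds context-manager and decorator support on top of this basic check-and-sleep pattern.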
- Parameters:
min_interval (Optional[float | int]) – The minimum number of seconds that must elapse before another request is sent or a call is performed. If min_interval is not specified, the class attribute RateLimiter.DEFAULT_MIN_INTERVAL is assigned to RateLimiter.min_interval instead.
Examples
>>> import requests
>>> from scholar_flux.api import RateLimiter
>>> rate_limiter = RateLimiter(min_interval = 5)
>>> # The first call won't sleep, because a prior call using the rate limiter doesn't yet exist
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # Will sleep if 5 seconds since the last call haven't elapsed
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # Or simply call the `wait` method directly:
>>> rate_limiter.wait()
>>> response = requests.get("http://httpbin.org/get")
Note
The class-level history deque is a design choice: this attribute allows class-level monitoring and introspection into how request delays are computed. The HistoryDeque is thread-safe (it relies on CPython's atomic deque operations) and allows global observability, which is helpful for debugging, especially in cases where you need to adjust the total number of requests sent within a given interval to avoid 429 errors.
- DEFAULT_MIN_INTERVAL: float | int = 6.1
- __init__(min_interval: int | float | None = None)[source]
Initializes the rate limiter with the min_interval argument.
- Parameters:
min_interval (Optional[float | int]) – Minimum number of seconds to wait before the next call is performed or request sent.
- default_min_interval() float | int[source]
Returns the default minimum interval for the current rate limiter.
- history: HistoryDeque[RateLimitEvent] = HistoryDeque([])
- property min_interval: float | int
The minimum number of seconds that must elapse before another request is sent or action is taken.
- rate(min_interval: float | int, metadata: Dict[str, Any] | None = None) Iterator[Self][source]
Temporarily adjusts the minimum interval between function calls or requests when used with a context manager.
After the context manager exits, the minimum interval is restored to its previous value, and the time of the last call is recorded.
- Parameters:
min_interval (float | int) – Indicates the minimum interval to be temporarily used during the call
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Yields:
RateLimiter – The original rate limiter with a temporarily changed minimum interval
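The restore-on-exit behavior of rate() can be sketched with a plain contextlib-based context manager. This is a hedged sketch, not the ScholarFlux implementation; SketchLimiter and its attributes are illustrative:

```python
import contextlib
import time
from typing import Iterator, Optional


class SketchLimiter:
    """Illustrative stand-in for a rate limiter with a temporarily adjustable interval."""

    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self.last_call: Optional[float] = None

    @contextlib.contextmanager
    def rate(self, min_interval: float) -> Iterator["SketchLimiter"]:
        """Temporarily swap in a new minimum interval, restoring the old one on exit."""
        previous = self.min_interval
        self.min_interval = min_interval
        try:
            yield self
        finally:
            self.min_interval = previous       # restore the original interval
            self.last_call = time.monotonic()  # record the time of the last call


limiter = SketchLimiter(min_interval=6.1)
with limiter.rate(1.0) as fast:
    interval_inside = fast.min_interval  # temporarily 1.0 inside the context
interval_after = limiter.min_interval    # restored to 6.1 after exit
```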
- classmethod resize_history(maxlen: int) None[source]
Resize the global history deque, preserving existing records up to the new limit.
- sleep(interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
A simple instance-level implementation of sleep that can be overridden when needed.
- Parameters:
interval (Optional[float | int]) – The time interval to sleep, in seconds. If None, the default minimum interval for the current rate limiter is used.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Raises:
APIParameterException – If the value provided is not an integer/float or is less than 0.
- wait(min_interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Block (time.sleep) until at least min_interval has passed since last call.
This method can be used with the min_interval attribute to determine when a search was last sent and throttle requests to make sure rate limits aren't exceeded. If not enough time has passed, the rate limiter will wait before the next request is sent.
- Parameters:
min_interval (Optional[float | int]) – The minimum time to wait before another call is sent. If both this argument and the min_interval attribute are None, the default min_interval value is used.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Raises:
APIParameterException – If the value provided is not an integer/float or is less than 0.
- wait_since(min_interval: int | float | None = None, timestamp: float | int | datetime | None = None, metadata: Dict[str, Any] | None = None) None[source]
Wait based on a reference timestamp or datetime.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
timestamp (Optional[float | int | datetime]) – Reference time such as a Unix timestamp or datetime. If None, sleeps for min_interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
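The core arithmetic behind wait_since — computing how much of the interval is still outstanding relative to a reference time — can be sketched independently. This is an illustrative helper, not the ScholarFlux implementation; the function name is an assumption:

```python
import time
from datetime import datetime
from typing import Union


def remaining_wait(min_interval: float, timestamp: Union[float, int, datetime, None]) -> float:
    """Sketch: seconds still to wait so that `min_interval` elapses after `timestamp`."""
    if timestamp is None:
        return min_interval  # no reference point: wait the full interval
    if isinstance(timestamp, datetime):
        timestamp = timestamp.timestamp()  # normalize datetimes to Unix time
    elapsed = time.time() - timestamp
    return max(0.0, min_interval - elapsed)  # never return a negative wait


# A call made 2 seconds ago with a 5-second interval leaves roughly 3 seconds to wait
delay = remaining_wait(5.0, time.time() - 2.0)
```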
scholar_flux.api.rate_limiting.retry_handler module
The scholar_flux.api.rate_limiting.retry_handler module implements a basic RetryHandler for dynamic request throttling.
This implementation defines a variable period of time to wait between successive unsuccessful requests to the same provider.
This class is used by default within the SearchCoordinator class to verify and retry each request until it succeeds or the maximum retry limit has been reached.
- class scholar_flux.api.rate_limiting.retry_handler.RetryHandler(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int | float = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None, min_retry_delay: int | float | None = None)[source]
Bases:
object
Core class used to send and dynamically retry failed requests with exponential backoff.
The RetryHandler automatically handles HTTP errors (429, 500, 501, 502, 503, 504) by retrying failed requests with increasing delays between attempts. Additional status codes can be added to the retry set via RetryHandler.DEFAULT_RETRY_STATUSES.add(<status_code>).
- Features:
Exponential backoff with configurable parameters
Respects Retry-After headers when provided
Thread-safe history tracking of all retry attempts
Configurable maximum retries and timeout limits
Example
>>> from scholar_flux import SearchCoordinator
>>> coordinator = SearchCoordinator(query="nutrition", provider_name="plos")
>>> # Configure retry behavior
>>> coordinator.retry_handler.max_retries = 5
>>> coordinator.retry_handler.backoff_factor = 1.0
>>> # The history is stored at the class level
>>> coordinator.retry_handler.history.clear_history()
>>> # Execute search with automatic retries
>>> result = coordinator.search_page(page=1)
>>> # Access retry statistics
>>> print(f"Retry attempts: {len(coordinator.retry_handler.history)}")
- max_retries
Maximum number of retry attempts (default: 3)
- Type:
int
- backoff_factor
The multiplier used for exponential backoff (default: 0.5)
- Type:
float
- max_backoff
Maximum delay between retries in seconds (default: 120). Also enforced as a hard ceiling for server-requested delays via Retry-After headers.
- Type:
float
- retry_statuses
HTTP status codes that trigger retries (default: {429, 500, 501, 502, 503, 504})
- Type:
set
- history
Thread-safe storage of all retry attempts
- Type:
HistoryDeque
Note
The retry handler is automatically used by SearchCoordinator for all requests. Each parameter is adjusted dynamically based on the provider. No manual intervention is required for basic usage.
If too many requests are sent to a single server within a specific time interval, it may return a 429 Too Many Requests error and indicate the delay that should be respected before sending another request. If the class attribute RAISE_ON_DELAY_EXCEEDED is True (the default), a RetryAfterDelayExceededException is raised when that delay exceeds max_backoff. To turn this feature off, either adjust the max_backoff parameter directly or set RetryHandler.RAISE_ON_DELAY_EXCEEDED = False to wait the full interval upon receiving a Retry-After header.
For observability, request information, delays, and response statuses are recorded in the RetryHandler.history class attribute for later inspection and can be referenced to help modify the rate limiting configuration when needed.
- DEFAULT_RAISE_ON_ERROR = False
- DEFAULT_RETRY_AFTER_HEADERS = ('retry-after', 'x-ratelimit-retry-after')
- DEFAULT_RETRY_STATUSES = {429, 500, 501, 502, 503, 504}
- DEFAULT_VALID_STATUSES = {200}
- RAISE_ON_DELAY_EXCEEDED: bool = True
- __init__(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int | float = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None, min_retry_delay: int | float | None = None) None[source]
Initializes the RetryHandler with configurable parameters for dynamically throttling successive requests.
- Parameters:
max_retries (int) – Indicates how many attempts should be performed before halting retries at retrieving a valid response.
backoff_factor (float) – Indicates the factor used to adjust when the next request should be attempted based on past unsuccessful attempts.
max_backoff (int | float) – Describes the maximum number of seconds to wait before submitting the next request.
retry_statuses (Optional[set[int]]) – Indicates the full list of status codes that should be retried if encountered.
raise_on_error (Optional[bool]) – A flag that indicates whether or not to raise an error upon encountering an invalid status_code or exception.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
Note
The class-level history deque is a design choice. While RetryHandler instances are designed to be stateless, this attribute enables class-level monitoring and allows introspection into how request delays are computed. The HistoryDeque is thread-safe (it relies on CPython's atomic deque operations) and allows global observability, which is helpful for debugging, especially in cases where you need to adjust the total number of requests sent within a given interval to avoid 429 errors.
- calculate_retry_delay(attempt_count: int, response: Response | ResponseProtocol | None = None, min_retry_delay: int | float | None = None, backoff_factor: int | float | None = None, max_backoff: int | float | None = None) int | float[source]
Calculates the delay in seconds to wait before the next retry attempt.
- Parameters:
attempt_count (int) – The number of attempts made so far.
response (Optional[requests.Response | ResponseProtocol]) – The response object from the last attempt.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
backoff_factor (Optional[int | float]) – The factor used to adjust the delay.
max_backoff (Optional[int | float]) – The maximum delay in seconds between requests.
- Returns:
The delay in seconds for the next retry attempt.
- Return type:
int | float
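The exact formula used by calculate_retry_delay is not reproduced in this page, but a conventional exponential-backoff calculation using the documented parameters (backoff_factor, max_backoff, min_retry_delay, and a server-provided Retry-After value) might look like the following sketch. The function name and formula are illustrative assumptions, not the library's implementation:

```python
from typing import Optional


def backoff_delay(attempt_count: int,
                  backoff_factor: float = 0.5,
                  max_backoff: float = 120.0,
                  min_retry_delay: float = 0.0,
                  retry_after: Optional[float] = None) -> float:
    """Sketch of a conventional exponential-backoff delay calculation."""
    if retry_after is not None:
        # A server-provided Retry-After takes precedence over the computed backoff
        return retry_after
    delay = backoff_factor * (2 ** attempt_count)  # grow the delay exponentially per attempt
    # Clamp between the configured floor and ceiling
    return min(max_backoff, max(min_retry_delay, delay))


delays = [backoff_delay(n) for n in range(5)]  # 0.5, 1.0, 2.0, 4.0, 8.0
```

With the defaults above, the delay doubles each attempt until it hits the max_backoff ceiling.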
- delay_exceeds_max_backoff(delay: int | float | None, max_backoff: int | float | None = None, *, error_message: str | None = None, warning_message: str | None = None, response: Response | ResponseProtocol | None = None, verbose: bool = True) bool[source]
Helper method for identifying and handling scenarios where an API-requested delay exceeds max_backoff.
This method centralizes the logic for identifying and handling delays that exceed the user-defined maximum duration to wait between requests. The RetryHandler is structured to cap calculated wait times using the max_backoff attribute, but Retry-After fields are the one scenario where API-mandated request delays can exceed max_backoff.
This helper method is designed to:
Raise an exception when delay > max_backoff and RetryHandler.RAISE_ON_DELAY_EXCEEDED is True
Log a warning message and return True (indicating an excessive delay) when delay > max_backoff and RAISE_ON_DELAY_EXCEEDED is False
Return False when delay is None or delay <= max_backoff
- Parameters:
delay (Optional[int | float]) – The delay in seconds to verify against the max_backoff.
max_backoff (Optional[int |float]) – The maximum allowable delay. Defaults to self.max_backoff if not provided.
error_message (Optional[str]) – A custom message to provide to the RetryAfterDelayExceededException. If None, this method raises the default error message indicating the server-requested delay.
warning_message (Optional[str]) – A custom message logged when RAISE_ON_DELAY_EXCEEDED is False. If None, this method logs a default warning to indicate that the RetryHandler will otherwise wait the full duration.
response (Optional[requests.Response | ResponseProtocol]) – The response object to add as additional context to the raised exception.
verbose (bool) – A flag for logging a default/custom warning when the delay exceeds the maximum duration and the RAISE_ON_DELAY_EXCEEDED flag is False.
- Returns:
True when the delay exceeds the maximum allowable delay and False otherwise.
- Return type:
bool
- Raises:
RetryAfterDelayExceededException – When the API-requested delay exceeds the max_backoff and the RAISE_ON_DELAY_EXCEEDED flag is True.
- execute_with_retry(request_func: Callable[[...], ResponseLike], validator_func: Callable | None = None, sleep_func: Callable[[float], None] | None = None, *args: Any, backoff_factor: int | float | None = None, max_backoff: int | float | None = None, min_retry_delay: int | float | None = None, **kwargs: Any) ResponseLike | None[source]
Sends a request and retries on failure based on predefined criteria and validation function.
- Parameters:
request_func (Callable) – The function to send the request.
validator_func (Optional[Callable]) – A function that takes a response and returns True if valid.
sleep_func (Optional[Callable[[float], None]]) – An optional function used for blocking the next request until a specified duration has passed.
*args – Positional arguments for the request function.
backoff_factor (Optional[int | float]) – Indicates the factor used to adjust when the next request should be attempted based on past unsuccessful attempts.
max_backoff (Optional[int | float]) – Describes the maximum number of seconds to wait before submitting the next request.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
**kwargs – Arbitrary keyword arguments for the request function.
- Returns:
The returned response-like object, when successful, or None if no valid response was obtained.
- Return type:
Optional[requests.Response | ResponseProtocol]
- Raises:
RequestFailedException – When a request raises an exception for any reason.
TimeoutError – When a request times out during response retrieval.
InvalidResponseException – When the number of retries has been exceeded and self.raise_on_error is True.
RetryAfterDelayExceededException – When the Retry-After delay requested from the server exceeds max_backoff
Note
If a Retry-After header exceeds the max_backoff and RetryHandler.RAISE_ON_DELAY_EXCEEDED=True, the exception will be raised immediately and halt the series of retry attempts.
Also note that response objects can be recovered from handled InvalidResponseException or RetryAfterDelayExceededException instances: to extract the raw response, handle the exception with a try/except block and read its response attribute:
Example
>>> from scholar_flux.api.rate_limiting.retry_handler import RetryHandler
>>> from scholar_flux.exceptions import InvalidResponseException, RetryAfterDelayExceededException
>>> import requests
>>> retry_handler = RetryHandler(raise_on_error=True)
>>> try:
...     response = retry_handler.execute_with_retry(requests.get, url="https://httpbin.org/status/200")
... except (RetryAfterDelayExceededException, InvalidResponseException) as e:
...     response = e.response
>>> print(response)
- classmethod extract_retry_after(headers: Mapping[str, Any] | None, keys: tuple | None = None) str | None[source]
Extracts the retry-after field from dictionary headers if the field exists.
- Parameters:
headers (Optional[Mapping[str, Any]]) – A headers dictionary or mapping to extract the retry-after field from
keys (Optional[tuple]) – The keys to look for in the headers. (case insensitive)
- Returns:
The retry-after field value, or None if not present.
- Return type:
Optional[str]
- classmethod extract_retry_after_from_response(response: Response | ResponseProtocol | None) str | None[source]
Extracts and parses retry-after delay from any response type.
This method handles both raw responses (Response/ResponseProtocol) and processed responses (ProcessedResponse/ErrorResponse), making it the single entry point for retry-after extraction.
- Parameters:
response (Optional[requests.Response | ResponseProtocol]) – Any response object with headers
- Returns:
The raw retry-after header value, or None if not present
- Return type:
Optional[str]
- classmethod get_retry_after(response: Response | ResponseProtocol | None) int | float | None[source]
Calculates the time that must elapse before the next request is sent according to the headers.
- Parameters:
response (requests.Response | ResponseProtocol) – The response object from the last attempt.
- Returns:
Indicates the number of seconds that must elapse before the next request is sent.
- Return type:
Optional[float]
- history: HistoryDeque[RetryAttempt] = HistoryDeque([])
- log_retry_attempt(delay: float, status_code: int | None = None) None[source]
Log an attempt to retry a request.
- Parameters:
delay (float) – The delay in seconds before the next retry attempt.
status_code (Optional[int]) – The status code of the response that triggered the retry.
- static log_retry_warning(message: str) None[source]
Log a warning when retries are exhausted or an error occurs.
- Parameters:
message (str) – The warning message to log.
- classmethod parse_retry_after(retry_after: str | None) int | float | None[source]
Parse the ‘Retry-After’ header to calculate delay.
- Parameters:
retry_after (str) – The value of ‘Retry-After’ header.
- Returns:
The total delay in seconds parsed from the response field if available.
- Return type:
Optional[int | float]
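Per RFC 7231, a Retry-After header carries either delta-seconds (e.g. "120") or an HTTP-date. The library's parsing logic is not shown on this page, but a standard-library sketch of the idea follows; the function name and the use of email.utils are illustrative assumptions:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after_value(retry_after: Optional[str]) -> Optional[float]:
    """Sketch: interpret Retry-After as delta-seconds or an HTTP-date."""
    if retry_after is None:
        return None
    try:
        return float(retry_after)  # delta-seconds form, e.g. "120"
    except ValueError:
        pass
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2025 07:28:00 GMT":
        # the delay is the time remaining until that date
        target = parsedate_to_datetime(retry_after)
        delta = (target - datetime.now(timezone.utc)).total_seconds()
        return max(0.0, delta)  # a date in the past means no further wait
    except (TypeError, ValueError):
        return None


seconds = parse_retry_after_value("120")
```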
- classmethod resize_history(maxlen: int) None[source]
Resize the global history deque, preserving existing records up to the new limit.
- Parameters:
maxlen (int) – The new maximum length of the history deque.
- should_retry(response: Response | ResponseProtocol) bool[source]
Determine whether the request should be retried.
scholar_flux.api.rate_limiting.threaded_rate_limiter module
The scholar_flux.api.rate_limiting.threaded_rate_limiter module implements ThreadedRateLimiter for thread safety.
The ThreadedRateLimiter extends the basic functionality of the original RateLimiter class and can be used in multithreaded scenarios to ensure that provider rate limits are not exceeded within a constant time interval.
This implementation provides thread-safe access to rate limiting functionality through the use of reentrant locks, making it suitable for use in concurrent environments where multiple threads may access the same rate limiter instance.
- class scholar_flux.api.rate_limiting.threaded_rate_limiter.ThreadedRateLimiter(min_interval: int | float | None = None)[source]
Bases:
RateLimiter
Thread-safe version of RateLimiter that can be safely used across multiple threads.
Inherits all functionality from RateLimiter but adds thread synchronization to prevent race conditions when multiple threads access the same limiter instance.
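The synchronization pattern can be sketched with a reentrant lock guarding the check-and-sleep step so it executes atomically per thread. This is an illustrative sketch, not the ScholarFlux implementation; the class name is an assumption:

```python
import threading
import time
from typing import Optional


class ThreadSafeSketchLimiter:
    """Illustrative sketch: serialize interval checks with a reentrant lock."""

    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self._last_call: Optional[float] = None
        self._lock = threading.RLock()  # reentrant, so nested calls don't deadlock

    def wait(self) -> None:
        with self._lock:  # the check-and-sleep runs atomically per thread
            now = time.monotonic()
            if self._last_call is not None:
                remaining = self.min_interval - (now - self._last_call)
                if remaining > 0:
                    time.sleep(remaining)
            self._last_call = time.monotonic()


limiter = ThreadSafeSketchLimiter(min_interval=0.1)
start = time.monotonic()
threads = [threading.Thread(target=limiter.wait) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Three concurrent calls: the first runs immediately, the rest are spaced by >= 0.1 s
elapsed = time.monotonic() - start
```

Holding the lock across the sleep is a deliberate trade-off: it guarantees spacing at the cost of queueing concurrent callers behind one another.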
- __init__(min_interval: int | float | None = None) None[source]
Initializes a new ThreadedRateLimiter with thread safety.
- Parameters:
min_interval (Optional[float | int]) – The default minimum interval to wait. Uses default if None
- rate(min_interval: float | int, metadata: Dict[str, Any] | None = None) Iterator[Self][source]
Thread-safe version of .rate context manager.
- Parameters:
min_interval (float | int) – The minimum interval to temporarily use during the call
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Yields:
Self – The rate limiter with temporarily changed interval
- sleep(interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe version of .sleep that prevents race conditions.
This method provides thread-safe access to the sleep functionality by acquiring the internal lock before performing the sleep operation. This ensures that the sleep duration is calculated and executed atomically.
- Parameters:
interval (Optional[float | int]) – Optional interval to sleep for. If None, uses the default interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- wait(min_interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe version of the .wait method that prevents race conditions.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- wait_since(min_interval: float | int | None = None, timestamp: float | int | datetime | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe method for waiting until an interval from a reference timestamp or datetime has passed.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
timestamp (Optional[float | int | datetime]) – Reference time formatted as a Unix timestamp or datetime. If None, sleeps for min_interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
Module contents
The scholar_flux.api.rate_limiting module defines the rate-limiting behavior for all providers. The rate limiting module is designed to be straightforward to apply in a variety of contexts and extensible to the varying scenarios where rate limiting is required.
- Modules:
- rate_limiter:
Implements a basic rate limiter that enforces rate limiting for a specified interval of time. Rate limiting can be applied directly using RateLimiter.wait or through a context manager that records the time of execution and computes the amount of time to wait.
- threaded_rate_limiter:
Inherits from the basic RateLimiter class to account for multithreading scenarios that require the same resource. The usage is the same, but it is thread-safe.
- retry_handler:
Basic implementation that defines a period of time to wait in between requests that are unsuccessful. This class is used to automatically retry failed requests until successful or the maximum retry limit has been exceeded. The end-user can decide whether to retry specific status codes or whether to halt early.
- Classes:
- RateLimiter:
The most basic rate limiter used for throttling requests using a constant interval
- ThreadedRateLimiter:
A thread-safe implementation that inherits from the RateLimiter to apply in multithreading
- RetryHandler:
Used to define the period of time to wait before re-sending a failed request, applying max_backoff and backoff_factor to assist in dynamically timing requests on successive request failures.
In addition, a rate_limiter_registry and threaded_rate_limiter_registry are implemented to aid in the normalization of responses to the same provider across multiple search APIs. This is particularly relevant when using the scholar_flux.api.MultiSearchCoordinator for multi-threaded requests across queries and configurations, where the threaded_rate_limiter_registry is implemented under the hood for throttling across APIs.
Example usage:
>>> import requests
>>> # Both the RateLimiter and ThreadedRateLimiter are implemented similarly:
>>> from scholar_flux.api.rate_limiting import ThreadedRateLimiter
>>> rate_limiter = ThreadedRateLimiter(min_interval = 3)
>>> # Defines a simple decorated function that does the equivalent of calling `rate_limiter.wait()` between requests
>>> @rate_limiter
... def rate_limited_request(url = 'https://httpbin.org/get'):
...     return requests.get(url)
>>> # The first call won't be throttled
>>> rate_limited_request()
>>> # The second call will wait the minimum duration from the time `rate_limited_request` was last called
>>> rate_limited_request()
- class scholar_flux.api.rate_limiting.RateLimiter(min_interval: int | float | None = None)[source]
Bases:
object
A basic rate limiter used to ensure that function calls (such as API requests) do not exceed a specified rate.
The RateLimiter is used within ScholarFlux to throttle the total number of requests that can be made within a defined time interval (measured in seconds).
This class ensures that calls to RateLimiter.wait() (or any decorated function) are spaced by at least min_interval seconds.
For multithreading applications, the RateLimiter is not thread-safe. Instead, the ThreadedRateLimiter subclass can provide a thread-safe implementation when required.
- Parameters:
min_interval (Optional[float | int]) – The minimum number of seconds that must elapse before another request is sent or a call is performed. If min_interval is not specified, the class attribute RateLimiter.DEFAULT_MIN_INTERVAL is assigned to RateLimiter.min_interval instead.
Examples
>>> import requests
>>> from scholar_flux.api import RateLimiter
>>> rate_limiter = RateLimiter(min_interval = 5)
>>> # The first call won't sleep, because a prior call using the rate limiter doesn't yet exist
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # Will sleep if 5 seconds since the last call haven't elapsed
>>> with rate_limiter:
...     response = requests.get("http://httpbin.org/get")
>>> # Or simply call the `wait` method directly:
>>> rate_limiter.wait()
>>> response = requests.get("http://httpbin.org/get")
Note
The class-level history deque is a design choice: this attribute allows class-level monitoring and introspection into how request delays are computed. The HistoryDeque is thread-safe (it relies on CPython's atomic deque operations) and allows global observability, which is helpful for debugging, especially in cases where you need to adjust the total number of requests sent within a given interval to avoid 429 errors.
- DEFAULT_MIN_INTERVAL: float | int = 6.1
- __init__(min_interval: int | float | None = None)[source]
Initializes the rate limiter with the min_interval argument.
- Parameters:
min_interval (Optional[float | int]) – Minimum number of seconds to wait before the next call is performed or request sent.
- default_min_interval() float | int[source]
Returns the default minimum interval for the current rate limiter.
- history: HistoryDeque[RateLimitEvent] = HistoryDeque([])
- property min_interval: float | int
The minimum number of seconds that must elapse before another request is sent or action is taken.
- rate(min_interval: float | int, metadata: Dict[str, Any] | None = None) Iterator[Self][source]
Temporarily adjusts the minimum interval between function calls or requests when used with a context manager.
After the context manager exits, the minimum interval is restored to its previous value, and the time of the last call is recorded.
- Parameters:
min_interval (float | int) – Indicates the minimum interval to be temporarily used during the call
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Yields:
RateLimiter – The original rate limiter with a temporarily changed minimum interval
- classmethod resize_history(maxlen: int) None[source]
Resize the global history deque, preserving existing records up to the new limit.
- sleep(interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
A simple instance-level implementation of sleep that can be overridden when needed.
- Parameters:
interval (Optional[float | int]) – The time interval to sleep, in seconds. If None, the default minimum interval for the current rate limiter is used.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Raises:
APIParameterException – If the value provided is not an integer/float or is less than 0.
- wait(min_interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Block (time.sleep) until at least min_interval has passed since last call.
This method can be used with the min_interval attribute to determine when a search was last sent and throttle requests to make sure rate limits aren't exceeded. If not enough time has passed, the rate limiter will wait before the next request is sent.
- Parameters:
min_interval (Optional[float | int]) – The minimum time to wait before another call is sent. If both this argument and the min_interval attribute are None, the default min_interval value is used.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Raises:
APIParameterException – If the value provided is not an integer/float or is less than 0.
- wait_since(min_interval: int | float | None = None, timestamp: float | int | datetime | None = None, metadata: Dict[str, Any] | None = None) None[source]
Wait based on a reference timestamp or datetime.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
timestamp (Optional[float | int | datetime]) – Reference time such as a Unix timestamp or datetime. If None, sleeps for min_interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- class scholar_flux.api.rate_limiting.RetryHandler(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int | float = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None, min_retry_delay: int | float | None = None)[source]
Bases:
object
Core class used to send and dynamically retry failed requests with exponential backoff.
The RetryHandler automatically handles HTTP errors (429, 500, 501, 502, 503, 504) by retrying failed requests with increasing delays between attempts. Additional status codes can be added to the retry set via RetryHandler.DEFAULT_RETRY_STATUSES.add(<status_code>).
- Features:
Exponential backoff with configurable parameters
Respects Retry-After headers when provided
Thread-safe history tracking of all retry attempts
Configurable maximum retries and timeout limits
Example
>>> from scholar_flux import SearchCoordinator
>>> coordinator = SearchCoordinator(query="nutrition", provider_name="plos")
>>> # Configure retry behavior
>>> coordinator.retry_handler.max_retries = 5
>>> coordinator.retry_handler.backoff_factor = 1.0
>>> # The history is stored at the class level
>>> coordinator.retry_handler.history.clear_history()
>>> # Execute search with automatic retries
>>> result = coordinator.search_page(page=1)
>>> # Access retry statistics
>>> print(f"Retry attempts: {len(coordinator.retry_handler.history)}")
- max_retries
Maximum number of retry attempts (default: 3)
- Type:
int
- backoff_factor
The multiplier used for exponential backoff (default: 0.5)
- Type:
float
- max_backoff
Maximum delay between retries in seconds (default: 120). Also enforced as a hard ceiling for server-requested delays via Retry-After headers.
- Type:
float
- retry_statuses
HTTP status codes that trigger retries (default: {429, 500, 501, 502, 503, 504})
- Type:
set
- history
Thread-safe storage of all retry attempts
- Type:
HistoryDeque
Note
The retry handler is automatically used by SearchCoordinator for all requests. Each parameter is adjusted dynamically based on the provider. No manual intervention is required for basic usage.
If too many requests are sent to a single server within a specific time interval, it may return a 429 Too Many Requests error along with a Retry-After header indicating the delay that should be respected before sending another request. If the class attribute RAISE_ON_DELAY_EXCEEDED is True (the default) and that delay exceeds max_backoff, a RetryAfterDelayExceededException is raised. To turn this behavior off, either increase the max_backoff parameter directly or set RetryHandler.RAISE_ON_DELAY_EXCEEDED=False to wait the full interval upon receiving a Retry-After header.
For observability, request information, delays, and response statuses are recorded in the RetryHandler.history class attribute for later inspection and can be referenced to help modify the rate limiting configuration when needed.
- DEFAULT_RAISE_ON_ERROR = False
- DEFAULT_RETRY_AFTER_HEADERS = ('retry-after', 'x-ratelimit-retry-after')
- DEFAULT_RETRY_STATUSES = {429, 500, 501, 502, 503, 504}
- DEFAULT_VALID_STATUSES = {200}
- RAISE_ON_DELAY_EXCEEDED: bool = True
- __init__(max_retries: int = 3, backoff_factor: float = 0.5, max_backoff: int | float = 120, retry_statuses: set[int] | list[int] | None = None, raise_on_error: bool | None = None, min_retry_delay: int | float | None = None) None[source]
Initializes the RetryHandler with configurable parameters for dynamically throttling successive requests.
- Parameters:
max_retries (int) – Indicates how many attempts should be performed before giving up on retrieving a valid response.
backoff_factor (float) – The factor used to determine when the next request should be attempted, based on past unsuccessful attempts.
max_backoff (int | float) – Describes the maximum number of seconds to wait before submitting the next request.
retry_statuses (Optional[set[int]]) – Indicates the full list of status codes that should be retried if encountered.
raise_on_error (Optional[bool]) – A flag that indicates whether or not to raise an error upon encountering an invalid status_code or exception.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
Note
The class-level history deque is a design choice: while RateLimiter instances are designed to be stateless, this attribute enables class-level monitoring and introspection into how request delays are computed. The HistoryDeque is thread-safe (its operations rely on CPython's atomic deque primitives) and provides global observability, which is helpful for debugging, especially when you need to adjust the total number of requests sent within a given interval to avoid 429 errors.
- calculate_retry_delay(attempt_count: int, response: Response | ResponseProtocol | None = None, min_retry_delay: int | float | None = None, backoff_factor: int | float | None = None, max_backoff: int | float | None = None) int | float[source]
Calculates the delay in seconds to wait before the next retry attempt.
- Parameters:
attempt_count (int) – The number of attempts made so far.
response (Optional[requests.Response | ResponseProtocol]) – The response object from the last attempt.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
backoff_factor (Optional[int | float]) – The factor used to adjust the delay.
max_backoff (Optional[int | float]) – The maximum delay in seconds between requests.
- Returns:
The delay in seconds for the next retry attempt.
- Return type:
int | float
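While the exact formula is internal to ScholarFlux, the interaction between these parameters can be illustrated with a typical exponential backoff calculation. This is a sketch under assumptions: the `2 ** attempt_count` growth and the clamping order are not confirmed by the source, and `sketch_retry_delay` is an illustrative name, not the library's API.

```python
def sketch_retry_delay(attempt_count: int,
                       backoff_factor: float = 0.5,
                       min_retry_delay: float = 0.0,
                       max_backoff: float = 120.0) -> float:
    """Illustrative exponential backoff, not the library's exact formula."""
    # Exponential growth: 0.5, 1.0, 2.0, 4.0, ... for backoff_factor=0.5
    delay = backoff_factor * (2 ** attempt_count)
    # Clamp between the minimum retry delay and the maximum backoff
    return min(max(delay, min_retry_delay), max_backoff)
```

Note how max_backoff caps the computed delay: after enough attempts, the exponential term would otherwise grow without bound.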
- delay_exceeds_max_backoff(delay: int | float | None, max_backoff: int | float | None = None, *, error_message: str | None = None, warning_message: str | None = None, response: Response | ResponseProtocol | None = None, verbose: bool = True) bool[source]
Helper method for identifying and handling scenarios where an API-requested delay exceeds max_backoff.
This method centralizes the logic for identifying and handling delays that exceed the user-defined maximum duration to wait in between requests. The RetryHandler is structured to cap calculated wait times using the max_backoff attribute, but Retry-After fields are the one scenario where API-mandated request delays can exceed max_backoff.
This helper method is designed to:
Raise an exception when delay > max_backoff and RetryHandler.RAISE_ON_DELAY_EXCEEDED is True
Log a warning and return True when delay > max_backoff and RetryHandler.RAISE_ON_DELAY_EXCEEDED is False (indicating an excessive delay)
Return False when delay is None or delay does not exceed max_backoff
- Parameters:
delay (Optional[int | float]) – The delay in seconds to verify against the max_backoff.
max_backoff (Optional[int | float]) – The maximum allowable delay. Defaults to self.max_backoff if not provided.
error_message (Optional[str]) – A custom message to provide to the RetryAfterDelayExceededException. If None, a default error message indicating the server-requested delay is used.
warning_message (Optional[str]) – A custom message logged when RAISE_ON_DELAY_EXCEEDED is False. If None, a default warning is logged indicating that the RetryHandler will otherwise wait the full duration.
response (Optional[requests.Response | ResponseProtocol]) – The response object to add as additional context to the raised exception.
verbose (bool) – A flag for logging a default/custom warning when the delay exceeds the maximum duration and the RAISE_ON_DELAY_EXCEEDED flag is False.
- Returns:
True when the delay exceeds the maximum allowable delay and False otherwise.
- Return type:
bool
- Raises:
RetryAfterDelayExceededException – When the API-requested delay exceeds the max_backoff and the RAISE_ON_DELAY_EXCEEDED flag is True.
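The raise/warn/return behavior described above can be sketched in plain Python. The exception class and function below are illustrative stand-ins, not the library's implementation:

```python
class RetryAfterDelayExceeded(Exception):
    """Stand-in for scholar_flux's RetryAfterDelayExceededException."""

def delay_exceeds_max_backoff(delay, max_backoff, raise_on_exceeded=True, verbose=True):
    """Illustrative check: raise, warn, or report whether a delay is excessive."""
    if delay is None or delay <= max_backoff:
        return False  # no delay, or within the allowed ceiling
    if raise_on_exceeded:
        raise RetryAfterDelayExceeded(
            f"Server requested a {delay}s delay, exceeding max_backoff={max_backoff}s"
        )
    if verbose:
        print(f"Warning: waiting the full {delay}s requested by the server")
    return True  # excessive, but tolerated
```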
- execute_with_retry(request_func: Callable[[...], ResponseLike], validator_func: Callable | None = None, sleep_func: Callable[[float], None] | None = None, *args: Any, backoff_factor: int | float | None = None, max_backoff: int | float | None = None, min_retry_delay: int | float | None = None, **kwargs: Any) ResponseLike | None[source]
Sends a request and retries on failure based on predefined criteria and validation function.
- Parameters:
request_func (Callable) – The function to send the request.
validator_func (Optional[Callable]) – A function that takes a response and returns True if valid.
sleep_func (Optional[Callable[[float], None]]) – An optional function used for blocking the next request until a specified duration has passed.
*args – Positional arguments for the request function.
backoff_factor (Optional[int | float]) – The factor used to determine when the next request should be attempted, based on past unsuccessful attempts.
max_backoff (Optional[int | float]) – Describes the maximum number of seconds to wait before submitting the next request.
min_retry_delay (Optional[int | float]) – The minimum delay in seconds between requests.
**kwargs – Arbitrary keyword arguments for the request function.
- Returns:
The response-like object when successful, or None if no valid response was obtained.
- Return type:
Optional[requests.Response | ResponseProtocol]
- Raises:
RequestFailedException – When a request raises an exception for any reason.
TimeoutError – When a request times out during response retrieval.
InvalidResponseException – When the number of retries has been exceeded and self.raise_on_error is True.
RetryAfterDelayExceededException – When the Retry-After delay requested from the server exceeds max_backoff
Note
If a Retry-After header exceeds the max_backoff and RetryHandler.RAISE_ON_DELAY_EXCEEDED=True, the exception will be raised immediately and halt the series of retry attempts.
Also note that raw response objects can be recovered from handled InvalidResponseException or RetryAfterDelayExceededException instances: catch the exception in a try/except block and read its response attribute:
Example
>>> from scholar_flux.api.rate_limiting.retry_handler import RetryHandler
>>> from scholar_flux.exceptions import InvalidResponseException, RetryAfterDelayExceededException
>>> import requests
>>> retry_handler = RetryHandler(raise_on_error=True)
>>> try:
...     response = retry_handler.execute_with_retry(requests.get, url="https://httpbin.org/status/200")
... except (RetryAfterDelayExceededException, InvalidResponseException) as e:
...     response = e.response
>>> print(response)
- classmethod extract_retry_after(headers: Mapping[str, Any] | None, keys: tuple | None = None) str | None[source]
Extracts the retry-after field from dictionary headers if the field exists.
- Parameters:
headers (Optional[Mapping[str, Any]]) – A headers dictionary or mapping to extract the retry-after field from.
keys (Optional[tuple]) – The keys to look for in the headers. (case insensitive)
- Returns:
The retry-after field value, or None if not present.
- Return type:
Optional[str]
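A case-insensitive header lookup of this kind can be sketched as follows. The default key tuple mirrors DEFAULT_RETRY_AFTER_HEADERS; the function name is illustrative, not the library's API:

```python
from typing import Any, Mapping, Optional

def find_retry_after(headers: Optional[Mapping[str, Any]],
                     keys: tuple = ("retry-after", "x-ratelimit-retry-after")) -> Optional[str]:
    """Return the first matching header value, comparing keys case-insensitively."""
    if not headers:
        return None
    # Normalize header names to lowercase for case-insensitive matching
    lowered = {str(k).lower(): v for k, v in headers.items()}
    for key in keys:
        if key.lower() in lowered:
            return str(lowered[key.lower()])
    return None
```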
- classmethod extract_retry_after_from_response(response: Response | ResponseProtocol | None) str | None[source]
Extracts and parses retry-after delay from any response type.
This method handles both raw responses (Response/ResponseProtocol) and processed responses (ProcessedResponse/ErrorResponse), making it the single entry point for retry-after extraction.
- Parameters:
response (Optional[requests.Response | ResponseProtocol]) – Any response object with headers
- Returns:
The unparsed retry-after header in seconds, or None if not present
- Return type:
Optional[str]
- classmethod get_retry_after(response: Response | ResponseProtocol | None) int | float | None[source]
Calculates the time that must elapse before the next request is sent according to the headers.
- Parameters:
response (requests.Response | ResponseProtocol) – The response object from the last attempt.
- Returns:
Indicates the number of seconds that must elapse before the next request is sent.
- Return type:
Optional[float]
- history: HistoryDeque[RetryAttempt] = HistoryDeque([])
- log_retry_attempt(delay: float, status_code: int | None = None) None[source]
Log an attempt to retry a request.
- Parameters:
delay (float) – The delay in seconds before the next retry attempt.
status_code (Optional[int]) – The status code of the response that triggered the retry.
- static log_retry_warning(message: str) None[source]
Log a warning when retries are exhausted or an error occurs.
- Parameters:
message (str) – The warning message to log.
- classmethod parse_retry_after(retry_after: str | None) int | float | None[source]
Parse the ‘Retry-After’ header to calculate delay.
- Parameters:
retry_after (str) – The value of ‘Retry-After’ header.
- Returns:
The total delay in seconds parsed from the response field if available.
- Return type:
Optional[int | float]
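Per RFC 9110, a Retry-After value is either delay-seconds or an HTTP-date, so a parser along these lines is plausible. This is a sketch, not ScholarFlux's implementation, and the function name is illustrative:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional, Union

def parse_retry_after_value(retry_after: Optional[str]) -> Optional[Union[int, float]]:
    """Interpret a Retry-After value as delay-seconds or an HTTP-date."""
    if retry_after is None:
        return None
    value = retry_after.strip()
    if value.isdigit():  # e.g. "120" -> wait 120 seconds
        return int(value)
    try:  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None  # neither seconds nor a parseable HTTP-date
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    # The delay is the time remaining until the target date (never negative)
    return max(0.0, (target - datetime.now(timezone.utc)).total_seconds())
```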
- classmethod resize_history(maxlen: int) None[source]
Resize the global history deque, preserving existing records up to the new limit.
- Parameters:
maxlen (int) – The new maximum length of the history deque.
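The preservation semantics follow directly from collections.deque: constructing a deque with a smaller maxlen keeps only the newest items. A minimal sketch (the helper name is illustrative):

```python
from collections import deque

def resize_preserving(history: deque, maxlen: int) -> deque:
    """Rebuild a deque with a new maxlen, keeping the most recent records."""
    # deque(iterable, maxlen) retains only the LAST maxlen items, so
    # shrinking discards the oldest records first.
    return deque(history, maxlen=maxlen)
```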
- should_retry(response: Response | ResponseProtocol) bool[source]
Determine whether the request should be retried.
- class scholar_flux.api.rate_limiting.ThreadedRateLimiter(min_interval: int | float | None = None)[source]
Bases:
RateLimiter
Thread-safe version of RateLimiter that can be safely used across multiple threads.
Inherits all functionality from RateLimiter but adds thread synchronization to prevent race conditions when multiple threads access the same limiter instance.
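The core synchronization idea can be sketched with a minimal limiter: a single lock makes the read-sleep-record cycle atomic. This is an illustrative class, not ScholarFlux's implementation:

```python
import threading
import time
from typing import Optional

class MiniThreadedRateLimiter:
    """Illustrative thread-safe limiter: one lock serializes each wait cycle."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        # Holding the lock makes "compute remaining time, sleep, record the
        # timestamp" atomic, so two threads never race on _last_call.
        with self._lock:
            if self._last_call is not None:
                remaining = self.min_interval - (time.monotonic() - self._last_call)
                if remaining > 0:
                    time.sleep(remaining)
            self._last_call = time.monotonic()
```

As in the real RateLimiter, the first call does not sleep; subsequent calls block until min_interval has elapsed since the previous one.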
- __init__(min_interval: int | float | None = None) None[source]
Initializes a new ThreadedRateLimiter with thread safety.
- Parameters:
min_interval (Optional[float | int]) – The default minimum interval to wait. Uses default if None.
- rate(min_interval: float | int, metadata: Dict[str, Any] | None = None) Iterator[Self][source]
Thread-safe version of .rate context manager.
- Parameters:
min_interval (float | int) – The minimum interval to temporarily use during the call
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- Yields:
Self – The rate limiter with temporarily changed interval
- sleep(interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe version of .sleep that prevents race conditions.
This method provides thread-safe access to the sleep functionality by acquiring the internal lock before performing the sleep operation. This ensures that the sleep duration is calculated and executed atomically.
- Parameters:
interval (Optional[float | int]) – Optional interval to sleep for. If None, uses the default interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- wait(min_interval: int | float | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe version of the .wait method that prevents race conditions.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).
- wait_since(min_interval: float | int | None = None, timestamp: float | int | datetime | None = None, metadata: Dict[str, Any] | None = None) None[source]
Thread-safe method for waiting until an interval from a reference timestamp or datetime has passed.
- Parameters:
min_interval (Optional[float | int]) – Minimum interval to wait. Uses default if None.
timestamp (Optional[float | int | datetime]) – Reference time formatted as a Unix timestamp or datetime. If None, sleeps for min_interval.
metadata (Optional[Dict[str, Any]]) – Optional metadata for observability (e.g., url, caller, reason).