Multi-Provider Search
ScholarFlux enables concurrent searches across multiple academic databases with automatic rate limiting, shared thread management, and unified result handling. This guide demonstrates how to query multiple providers simultaneously and work with aggregated results.
Prerequisites
Complete Getting Started to understand SearchCoordinator basics
Understand Response Handling and Error Patterns for SearchResultList error handling
Basic familiarity with concurrent programming concepts
Overview
Why Multi-Provider Search?
Comprehensive literature reviews require querying multiple databases. The traditional sequential approach is time-consuming:
from scholar_flux import SearchCoordinator
# Create coordinators
plos = SearchCoordinator(query="machine learning", provider_name='plos')
arxiv = SearchCoordinator(query="machine learning", provider_name='arxiv')
crossref = SearchCoordinator(query="machine learning", provider_name='crossref')
# Sequential: query each provider one at a time
# 6.1 second delay × 8 waits = 48.8 seconds
plos_results = plos.search_pages(range(1, 10))
# 4 second delay × 8 waits = 32 seconds
arxiv_results = arxiv.search_pages(range(1, 10))
# 1 second delay × 8 waits = 8 seconds
crossref_results = crossref.search_pages(range(1, 10))
# Total time: ~89 seconds
ScholarFlux’s concurrent approach:
from scholar_flux import MultiSearchCoordinator
# Add all coordinators to multi-search
multi_search_coordinator = MultiSearchCoordinator()
multi_search_coordinator.add_coordinators([plos, arxiv, crossref])
# Concurrent: all providers query simultaneously
results = multi_search_coordinator.search_pages(pages=range(1, 10))
# Total time: ~49 seconds (limited by most rate-limited provider: PLOS)
For 3 providers × 9 pages, ScholarFlux is ~1.8x faster through concurrent execution with automatic rate limit coordination.
Key Features
Thread-per-provider execution: Each provider runs in its own thread
Shared rate limiters: Multiple queries to the same provider coordinate automatically
Unified result handling: SearchResultList provides filtering, aggregation, and normalization
Graceful error handling: Individual provider failures don’t stop the entire search
Quick Start
Basic Example: Four Providers
Query four providers concurrently and retrieve results:
from scholar_flux import SearchCoordinator, MultiSearchCoordinator
# Create multi-coordinator instance (calls .add_coordinators() under the hood)
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
SearchCoordinator(query="machine learning", provider_name='plos'),
SearchCoordinator(query="machine learning", provider_name='arxiv'),
SearchCoordinator(query="machine learning", provider_name='openalex'),
SearchCoordinator(query="machine learning", provider_name='crossref')
])
# Execute concurrent search across 10 pages per provider
results = multi_search_coordinator.search_pages(pages=range(1, 11))
# Check results
print(f"Total results: {len(results)}") # 40 (4 providers × 10 pages)
print(f"Successful: {len(results.filter())}/{len(results)}")
Expected output:
Total results: 40
Successful: 40/40
What Just Happened?
Created coordinators: Each SearchCoordinator configures a provider with query settings
Concurrent execution: search_pages spawned 4 threads (one per provider)
Rate limiting: Each thread respected its provider’s rate limits automatically
Result collection: All 40 responses (4 providers × 10 pages) returned as SearchResultList
Complete Example: Normalized Data
Retrieve records from multiple providers and convert to a pandas DataFrame:
import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator
# Create and configure multi-coordinator
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
SearchCoordinator(query="machine learning", provider_name=provider, use_cache=True)
for provider in ['plos', 'arxiv', 'openalex', 'crossref']
])
# Retrieve 10 pages from each provider
results = multi_search_coordinator.search_pages(pages=range(1, 11))
# Filter successful responses and normalize to universal schema
normalized_records = results.filter().normalize()
# Convert to DataFrame for analysis
df = pd.DataFrame(normalized_records)
print(f"Total records: {df.shape[0]}")
print(f"Columns: {list(df.columns[:5])}...") # First 5 columns
Expected output:
Total records: 1250
Columns: ['provider_name', 'doi', 'url', 'record_id', 'title']...
Record counts by provider:
PLOS: 50 records/page × 10 pages = 500 records
arXiv: 25 records/page × 10 pages = 250 records
OpenAlex: 25 records/page × 10 pages = 250 records
Crossref: 25 records/page × 10 pages = 250 records
Total: 1,250 records
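The per-provider arithmetic above can be checked directly. A quick sanity check (the page sizes here are the defaults quoted in this guide, not values read from the live APIs):

```python
# Default records-per-page for each provider, as listed above
page_sizes = {"plos": 50, "arxiv": 25, "openalex": 25, "crossref": 25}
pages = 10  # pages retrieved per provider

# Expected record count per provider and overall total
totals = {provider: size * pages for provider, size in page_sizes.items()}
print(totals["plos"])        # 500
print(sum(totals.values()))  # 1250
```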
Record Count-Based Retrieval:
To instead retrieve a minimum of 250 records per provider:
import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator
# Create and configure multi-coordinator
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
SearchCoordinator(query="machine learning", provider_name=provider, use_cache=True)
for provider in ['plos', 'arxiv', 'openalex', 'crossref']
])
# Retrieve a minimum of 250 records per provider
results = multi_search_coordinator.search_records(min_records=250)
# Filter successful responses and normalize using the same steps as before:
normalized_records = results.filter().normalize()
df = pd.DataFrame(normalized_records)
# Preview a sample of all retrieved records
print(f"Preview:")
print(df[['provider_name', 'url', 'title', 'abstract']].sample(10))
print(f"Total records: {df.shape[0]}")
Understanding Multi-Provider Architecture
Thread-Per-Provider Model
ScholarFlux dedicates one worker thread to each provider:
MultiSearchCoordinator
├── Thread 1: PLOS
│ ├── Page 1 request → wait 6.1s
│ ├── Page 2 request → wait 6.1s
│ └── Page 3 request
├── Thread 2: arXiv
│ ├── Page 1 request → wait 4.0s
│ ├── Page 2 request → wait 4.0s
│ └── Page 3 request
├── Thread 3: OpenAlex (similar pattern)
└── Thread 4: Crossref (similar pattern)
Key characteristics:
Each provider runs in its own thread for true parallelism
Results stream back as they complete (no waiting for all)
Memory-efficient generator-based design
Provider failures are isolated
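The model can be sketched with the standard library alone. This is a toy simulation of the execution pattern, not ScholarFlux internals, and the delays are scaled down for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Toy delays standing in for per-provider rate limits (scaled down)
PROVIDER_DELAYS = {"plos": 0.03, "arxiv": 0.02, "openalex": 0.01, "crossref": 0.01}

def fetch_pages(provider, delay, pages):
    """Simulate one provider's thread: sequential, rate-limited page requests."""
    results = []
    for i, page in enumerate(pages):
        if i > 0:
            time.sleep(delay)  # per-provider wait between requests
        results.append((provider, page))
    return results

# One thread per provider; results are consumed as each thread finishes,
# so fast providers don't wait for slow ones.
all_results = []
with ThreadPoolExecutor(max_workers=len(PROVIDER_DELAYS)) as pool:
    futures = [pool.submit(fetch_pages, p, d, [1, 2, 3])
               for p, d in PROVIDER_DELAYS.items()]
    for future in as_completed(futures):
        all_results.extend(future.result())

print(len(all_results))  # 12 = 4 providers × 3 pages
```

Because each provider waits only on its own delay, total wall time tracks the slowest provider rather than the sum of all providers.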
Working with Results
SearchResultList Basics
The SearchResultList class provides methods for filtering, aggregating, and normalizing multi-provider results:
# After executing a multi-provider search
results = multi_search_coordinator.search_pages(pages=range(1, 6))
# Check total results
print(f"Total results: {len(results)}")
# Access individual SearchResult
first_result = results[0]
print(f"Provider: {first_result.provider_name}")
print(f"Page: {first_result.page}")
print(f"Record count: {first_result.record_count}")
# Check if result is successful
if first_result: # ProcessedResponse is truthy
print(f"Success! Retrieved {len(first_result.data)} records")
else:
print(f"Failed: {first_result.error} - {first_result.message}")
Filtering Successful Responses
Remove failed requests to work only with successful data:
# Filter keeps only ProcessedResponse (successful) results
successful_results = results.filter()
print(f"Success rate: {len(successful_results)}/{len(results)}")
# Invert filter to get only failures
failed_results = results.filter(invert=True)
for failure in failed_results:
print(f"Failed: {failure.provider_name} page {failure.page}")
print(f"Error: {failure.error} - {failure.message}")
Aggregating Records
Combine all records from multiple providers into a single list:
# Method 1: Use .join() to get all processed records
all_records = results.filter().join()
print(f"Total records: {len(all_records)}")
# Method 2: Include metadata fields (provider_name, page, query)
records_with_metadata = results.filter().join(
include={'provider_name', 'page'}
)
# Each record now has provider_name and page
print(records_with_metadata[0].keys())
# dict_keys(['title', 'abstract', 'doi', ..., 'provider_name', 'page'])
Normalizing Fields
ScholarFlux normalizes provider-specific fields to a universal schema. For detailed information on field mapping, see Schema Normalization.
Quick normalization:
# Normalize all records to universal field names
normalized_records = results.filter().normalize()
# Each record now has standardized field names
for record in normalized_records[:3]:
print(f"Title: {record.get('title')}")
print(f"DOI: {record.get('doi')}")
print(f"Authors: {record.get('authors')}")
print(f"Provider: {record.get('provider_name')}")
print("---")
Include metadata during normalization:
# Add provider_name, page, and query to each normalized record
normalized = results.filter().normalize(
include={'provider_name', 'page', 'query'}
)
Alternative: Normalize during search:
# Normalize records automatically during retrieval
results = multi_search_coordinator.search_pages(pages=range(1, 3), normalize_records=True)
# Access normalized records directly
for result in results.filter():
if result.normalized_records:
for record in result.normalized_records:
print(record['title'])
See also
For detailed information on field normalization, provider-specific mappings, and custom field maps, see Schema Normalization.
Rate Limiting
Default Rate Limits
ScholarFlux implements conservative rate limits for each provider:
| Provider | Delay Between Requests |
|---|---|
| PLOS | 6.1 seconds |
| arXiv | 4.0 seconds |
| OpenAlex | 1.0 seconds |
| PubMed | 2.0 seconds |
| Crossref | 1.0 seconds |
| CORE | 10.0 seconds |
| Springer Nature | 2.0 seconds |
Rate limiting happens automatically. You don’t need to configure anything for standard usage.
Inspecting Rate Limits
View current rate limiter settings:
from scholar_flux.api.rate_limiting import threaded_rate_limiter_registry
# View all provider rate limiters
for provider, limiter in threaded_rate_limiter_registry.items():
print(f"{provider}: {limiter.min_interval}s between requests")
Warning
Only modify rate limits if you have explicit permission from the provider, institutional access, or documentation confirming higher limits. Violating rate limits may result in IP bans.
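For intuition, a minimal thread-safe rate limiter can be sketched as follows. This is an illustrative stand-in, not ScholarFlux's actual implementation; only the min_interval idea carries over, and the interval is scaled down for the demo:

```python
import threading
import time

class MinIntervalRateLimiter:
    """Sketch: enforce a minimum interval between requests across threads."""
    def __init__(self, min_interval):
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_request = float("-inf")  # no prior request yet

    def wait(self):
        # Serialize access so concurrent threads can't both skip the delay
        with self._lock:
            now = time.monotonic()
            remaining = self.min_interval - (now - self._last_request)
            if remaining > 0:
                time.sleep(remaining)
            self._last_request = time.monotonic()

limiter = MinIntervalRateLimiter(min_interval=0.05)
start = time.monotonic()
for _ in range(3):
    limiter.wait()  # two enforced waits between three calls
elapsed = time.monotonic() - start
print(elapsed >= 0.10)  # True: at least 2 × 0.05s of enforced delay
```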
Real-World Example: Systematic Literature Review
This example demonstrates a comprehensive search across six providers for a systematic review:
import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator
# Configure search across all major providers
providers = ['pubmed', 'plos', 'arxiv', 'crossref', 'openalex', 'core']
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
SearchCoordinator(
query="cancer immunotherapy clinical trials",
provider_name=provider
)
for provider in providers
])
# Retrieve 20 pages per provider (120 total requests)
print("Starting systematic search...")
results = multi_search_coordinator.search_pages(pages=range(1, 21))
# Check success rate
successful = results.filter()
print(f"Retrieved: {len(successful)}/{len(results)} pages successfully")
# Normalize and deduplicate by DOI
normalized_records = successful.normalize(include={'provider_name'})
df = pd.DataFrame(normalized_records)
# Deduplicate by DOI (keep first occurrence)
df_dedup = df.drop_duplicates(subset=['doi'], keep='first')
# Analysis
print(f"\nResults Summary:")
print(f"Total records: {len(df)}")
print(f"Unique records (after deduplication): {len(df_dedup)}")
print(f"\nCoverage by provider:")
print(df.groupby('provider_name').size())
# Export for analysis
df_dedup.to_csv('systematic_review_results.csv', index=False)
print("\nExported to systematic_review_results.csv")
Expected output:
Starting systematic search...
Retrieved: 118/120 pages successfully
Results Summary:
Total records: 2,450
Unique records (after deduplication): 1,823
Coverage by provider:
provider_name
arxiv 250
core 400
crossref 500
openalex 500
plos 400
pubmed 400
Exported to systematic_review_results.csv
Best Practices
Query Design
DO:
Use specific, targeted queries
Test queries on a single provider first before scaling
Use Boolean operators when supported (AND, OR, NOT)
Start with small page ranges for testing
DON’T:
Use overly broad queries (e.g., “science” or “health”)
Request excessive pages (>100 per provider) without caching
Ignore error responses without investigation
Skip validation of results
Caching Strategy
Enable caching to minimize redundant API calls:
from scholar_flux import SearchCoordinator, DataCacheManager
from scholar_flux.sessions import CachedSessionManager
# HTTP response caching (session-level)
session_manager = CachedSessionManager(backend='redis')
# Result caching (processed data)
cache_manager = DataCacheManager.with_storage('redis', 'localhost:6379')
# Apply to coordinators
coordinators = [
SearchCoordinator(
query="machine learning",
provider_name=provider,
session=session_manager.configure_session(),
cache_manager=cache_manager
)
for provider in ['plos', 'arxiv', 'crossref']
]
Resource Management
Memory:
Process large result sets in batches
Use .filter().join() to aggregate efficiently
Clear large result lists when no longer needed
Network:
Enable caching to reduce API calls
Respect rate limits (don’t customize without permission)
Handle provider failures gracefully with error checking
Time estimates:
4 providers × 10 pages: ~2-3 minutes
4 providers × 50 pages: ~10-15 minutes
6 providers × 100 pages: ~30-40 minutes
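These estimates follow from the rate-limit table above: with one thread per provider, a lower bound on wall time is the slowest provider's delay times its number of waits (real runs add request latency and any retries on top). A quick sanity check, using the default delays quoted earlier:

```python
# Default inter-request delays (seconds) from the rate-limit table above
DELAYS = {"plos": 6.1, "arxiv": 4.0, "openalex": 1.0, "crossref": 1.0}

def lower_bound_seconds(pages, delays=DELAYS):
    """Slowest provider dominates: (pages - 1) waits at its delay."""
    return max((pages - 1) * delay for delay in delays.values())

print(round(lower_bound_seconds(9), 1))  # 48.8 — matches the ~49s figure above
```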
Troubleshooting
“No coordinators registered” Warning
This occurs when searching before adding coordinators:
multi_search_coordinator = MultiSearchCoordinator()
results = multi_search_coordinator.search(page=1) # Warning: returns empty list
# Fix: add 1 or more coordinators first
multi_search_coordinator.add(SearchCoordinator(query="AI", provider_name='plos'))
results = multi_search_coordinator.search(page=1) # Now works
Memory Issues with Large Searches
Process results in batches instead of all at once:
# Instead of: pages=range(1, 200)
batch_size = 20
all_data = []
for batch_start in range(1, 200, batch_size):
batch_end = min(batch_start + batch_size, 200)
batch_pages = range(batch_start, batch_end)
results = multi_search_coordinator.search_pages(pages=batch_pages)
batch_data = results.filter().join()
all_data.extend(batch_data)
# Clear memory
del results
print(f"Processed pages {batch_start}-{batch_end-1}")
Provider-Specific Failures
Investigate individual provider failures:
results = multi_search_coordinator.search_pages(pages=range(1, 11))
# Separate successes and failures
successful = results.filter()
failed = results.filter(invert=True)
# Analyze failures
if failed:
print(f"{len(failed)} failures:")
for failure in failed:
print(f" {failure.provider_name} page {failure.page}")
print(f" Error: {failure.error}")
print(f" Message: {failure.message}")
Common failure causes:
API temporary downtime (retry later)
Rate limit exceeded (check if limits were customized)
Network connectivity issues
Invalid API keys (for providers requiring authentication)
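A common recovery pattern is to group failed pages by provider and retry only those. The grouping step is plain Python; the failure tuples below are hypothetical stand-ins for the (provider_name, page) pairs you would pull from results.filter(invert=True):

```python
from collections import defaultdict

# Hypothetical failures collected from results.filter(invert=True)
failures = [("plos", 3), ("plos", 7), ("core", 2)]

# Group failed pages by provider so each provider can be retried
# with a single search_pages() call
retry_pages = defaultdict(list)
for provider, page in failures:
    retry_pages[provider].append(page)

print(dict(retry_pages))  # {'plos': [3, 7], 'core': [2]}
```

Each provider's coordinator could then re-run only its failed pages (e.g. something like plos.search_pages(retry_pages['plos'])) once the underlying cause, such as downtime or connectivity, is resolved.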
Next Steps
Related Guides:
Schema Normalization - Detailed guide on field normalization and custom mappings
Custom Providers - Add new providers to ScholarFlux
Workflows - Multi-step retrieval for complex APIs (e.g., PubMed)
Advanced Topics:
Caching Strategies - Production caching patterns with Redis, MongoDB, SQLAlchemy
Production Deployment - Deploy ScholarFlux at scale with Docker and Kubernetes
API Reference:
scholar_flux.api.MultiSearchCoordinator - Complete API documentation
scholar_flux.api.SearchCoordinator - Single-provider coordinator reference
scholar_flux.api.models.SearchResultList - Result list methods and properties
scholar_flux.api.models.SearchResult - Individual result structure