Overview

Key Features

  • Thread-per-provider execution: Each provider runs in its own thread

  • Shared rate limiters: Multiple queries to the same provider coordinate automatically

  • Unified result handling: SearchResultList provides filtering, aggregation, and normalization

  • Graceful error handling: Individual provider failures don’t stop the entire search

Quick Start

Basic Example: Four Providers

Query four providers concurrently and retrieve results:

from scholar_flux import SearchCoordinator, MultiSearchCoordinator

# Create multi-coordinator instance (calls .add_coordinators() under the hood)
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
    SearchCoordinator(query="machine learning", provider_name='plos'),
    SearchCoordinator(query="machine learning", provider_name='arxiv'),
    SearchCoordinator(query="machine learning", provider_name='openalex'),
    SearchCoordinator(query="machine learning", provider_name='crossref')
])

# Execute concurrent search across 10 pages per provider
results = multi_search_coordinator.search_pages(pages=range(1, 11))

# Check results
print(f"Total results: {len(results)}")  # 40 (4 providers × 10 pages)
print(f"Successful: {len(results.filter())}/{len(results)}")

Expected output:

Total results: 40
Successful: 40/40

What Just Happened?

  1. Created coordinators: Each SearchCoordinator configures a provider with query settings

  2. Concurrent execution: search_pages spawned 4 threads (one per provider)

  3. Rate limiting: Each thread respected its provider’s rate limits automatically

  4. Result collection: All 40 responses (4 providers × 10 pages) returned as SearchResultList
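
To confirm the per-provider breakdown, you can tally the returned results by provider; each SearchResult exposes a provider_name attribute (shown later in this guide):

from collections import Counter

# Count how many page results came back from each provider
pages_per_provider = Counter(result.provider_name for result in results)
for provider, count in sorted(pages_per_provider.items()):
    print(f"{provider}: {count} pages")
# Expected: 10 pages from each of the four providers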

Complete Example: Normalized Data

Retrieve records from multiple providers and convert to a pandas DataFrame:

import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator

# Create and configure multi-coordinator
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
    SearchCoordinator(query="machine learning", provider_name=provider, use_cache=True)
    for provider in ['plos', 'arxiv', 'openalex', 'crossref']
])

# Retrieve 10 pages from each provider
results = multi_search_coordinator.search_pages(pages=range(1, 11))

# Filter successful responses and normalize to universal schema
normalized_records = results.filter().normalize()

# Convert to DataFrame for analysis
df = pd.DataFrame(normalized_records)
print(f"Total records: {df.shape[0]}")
print(f"Columns: {list(df.columns[:5])}...")  # First 5 columns

Expected output:

Total records: 1250
Columns: ['provider_name', 'doi', 'url', 'record_id', 'title']...

Record counts by provider:

  • PLOS: 50 records/page × 10 pages = 500 records

  • arXiv: 25 records/page × 10 pages = 250 records

  • OpenAlex: 25 records/page × 10 pages = 250 records

  • Crossref: 25 records/page × 10 pages = 250 records

  • Total: 1,250 records
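
You can verify this breakdown directly from the DataFrame built above:

# Count normalized records per provider (provider_name is part of the universal schema)
print(df.groupby('provider_name').size())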

Record-Count-Based Retrieval:

To retrieve a minimum of 250 records per provider instead of a fixed page range:

import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator

# Create and configure multi-coordinator
multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
    SearchCoordinator(query="machine learning", provider_name=provider, use_cache=True)
    for provider in ['plos', 'arxiv', 'openalex', 'crossref']
])

# Retrieve a minimum of 250 records per provider
results = multi_search_coordinator.search_records(min_records=250)

# Filter successful responses and normalize using the same steps as before:
normalized_records = results.filter().normalize()
df = pd.DataFrame(normalized_records)

# Preview a sample of all retrieved records
print(f"Preview:")
print(df[['provider_name', 'url', 'title', 'abstract']].sample(10))
print(f"Total records: {df.shape[0]}")

Understanding Multi-Provider Architecture

Thread-Per-Provider Model

ScholarFlux uses a thread-per-provider architecture:

MultiSearchCoordinator
├── Thread 1: PLOS
│   ├── Page 1 request → wait 6.1s
│   ├── Page 2 request → wait 6.1s
│   └── Page 3 request
├── Thread 2: arXiv
│   ├── Page 1 request → wait 4.0s
│   ├── Page 2 request → wait 4.0s
│   └── Page 3 request
├── Thread 3: OpenAlex (similar pattern)
└── Thread 4: Crossref (similar pattern)

Key characteristics:

  • Each provider runs in its own thread for true parallelism

  • Results stream back as they complete (no waiting for all)

  • Memory-efficient generator-based design

  • Provider failures are isolated
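
The following is a simplified conceptual sketch of this model, not ScholarFlux's internal implementation. Pages are fetched sequentially within each provider's thread, while the threads themselves run in parallel and results stream back as each provider finishes:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_provider_pages(provider, pages, delay):
    """Simulate fetching a provider's pages sequentially with its request delay."""
    fetched = []
    for page in pages:
        fetched.append(f"{provider}: page {page}")  # stand-in for a real API request
        time.sleep(delay)  # honor this provider's rate limit
    return fetched

delays = {'plos': 6.1, 'arxiv': 4.0, 'openalex': 1.0, 'crossref': 1.0}

# One worker thread per provider: sequential within a thread, parallel across threads
with ThreadPoolExecutor(max_workers=len(delays)) as executor:
    futures = {
        executor.submit(fetch_provider_pages, name, range(1, 4), delay): name
        for name, delay in delays.items()
    }
    # Results stream back as each provider's thread completes
    for future in as_completed(futures):
        print(f"{futures[future]} finished with {len(future.result())} pages")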

Shared Rate Limiting

When multiple queries target the same provider, they automatically share a rate limiter:

from scholar_flux import SearchCoordinator, MultiSearchCoordinator

multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
    SearchCoordinator(query="gene therapy", provider_name='plos'),
    SearchCoordinator(query="CRISPR", provider_name='plos'),
    SearchCoordinator(query="immunotherapy", provider_name='plos')
])

# All three queries share PLOS's rate limiter (6.1s between requests)
# Requests are automatically coordinated:
#   Query 1, Page 1 at t=0
#   Query 2, Page 1 at t=6.1
#   Query 3, Page 1 at t=12.2
results = multi_search_coordinator.search_pages(pages=range(1, 5))

Without shared rate limiting: Multiple queries could violate provider rate limits and trigger IP bans.

With shared rate limiting: ScholarFlux coordinates all requests to the same provider, ensuring compliance.
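
A minimal sketch of the shared-limiter idea (illustrative only; ScholarFlux's actual limiters live in scholar_flux.api.rate_limiting): every thread targeting the same provider serializes through a single lock-protected timestamp.

import time
import threading

class SharedRateLimiter:
    """One instance per provider, shared by all threads querying that provider."""

    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_request = 0.0

    def wait(self):
        # Only one thread may check and update the timestamp at a time
        with self._lock:
            elapsed = time.monotonic() - self._last_request
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self._last_request = time.monotonic()

# Every query thread targeting PLOS calls plos_limiter.wait() before each request,
# so requests are spaced at least 6.1 seconds apart regardless of which query sends them
plos_limiter = SharedRateLimiter(min_interval=6.1)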

Working with Results

SearchResultList Basics

The SearchResultList class provides methods for filtering, aggregating, and normalizing multi-provider results:

# After executing a multi-provider search
results = multi_search_coordinator.search_pages(pages=range(1, 6))

# Check total results
print(f"Total results: {len(results)}")

# Access individual SearchResult
first_result = results[0]
print(f"Provider: {first_result.provider_name}")
print(f"Page: {first_result.page}")
print(f"Record count: {first_result.record_count}")

# Check if result is successful
if first_result:  # ProcessedResponse is truthy
    print(f"Success! Retrieved {len(first_result.data)} records")
else:
    print(f"Failed: {first_result.error} - {first_result.message}")

Filtering Successful Responses

Remove failed requests to work only with successful data:

# Filter keeps only ProcessedResponse (successful) results
successful_results = results.filter()
print(f"Success rate: {len(successful_results)}/{len(results)}")

# Invert filter to get only failures
failed_results = results.filter(invert=True)
for failure in failed_results:
    print(f"Failed: {failure.provider_name} page {failure.page}")
    print(f"Error: {failure.error} - {failure.message}")

Aggregating Records

Combine all records from multiple providers into a single list:

# Method 1: Use .join() to get all processed records
all_records = results.filter().join()
print(f"Total records: {len(all_records)}")

# Method 2: Include metadata fields (provider_name, page, query)
records_with_metadata = results.filter().join(
    include={'provider_name', 'page'}
)

# Each record now has provider_name and page
print(records_with_metadata[0].keys())
# dict_keys(['title', 'abstract', 'doi', ..., 'provider_name', 'page'])

Normalizing Fields

ScholarFlux normalizes provider-specific fields to a universal schema. For detailed information on field mapping, see Schema Normalization.

Quick normalization:

# Normalize all records to universal field names
normalized_records = results.filter().normalize()

# Each record now has standardized field names
for record in normalized_records[:3]:
    print(f"Title: {record.get('title')}")
    print(f"DOI: {record.get('doi')}")
    print(f"Authors: {record.get('authors')}")
    print(f"Provider: {record.get('provider_name')}")
    print("---")

Include metadata during normalization:

# Add provider_name, page, and query to each normalized record
normalized = results.filter().normalize(
    include={'provider_name', 'page', 'query'}
)

Alternative: Normalize during search:

# Normalize records automatically during retrieval
results = multi_search_coordinator.search_pages(pages=range(1, 3), normalize_records=True)

# Access normalized records directly
for result in results.filter():
    if result.normalized_records:
        for record in result.normalized_records:
            print(record['title'])

See also

For detailed information on field normalization, provider-specific mappings, and custom field maps, see Schema Normalization.

Rate Limiting

Default Rate Limits

ScholarFlux implements conservative rate limits for each provider:

Provider           Delay Between Requests
PLOS               6.1 seconds
arXiv              4.0 seconds
OpenAlex           1.0 seconds
PubMed             2.0 seconds
Crossref           1.0 seconds
CORE               10.0 seconds
Springer Nature    2.0 seconds

Rate limiting happens automatically. You don’t need to configure anything for standard usage.

Inspecting Rate Limits

View current rate limiter settings:

from scholar_flux.api.rate_limiting import threaded_rate_limiter_registry

# View all provider rate limiters
for provider, limiter in threaded_rate_limiter_registry.items():
    print(f"{provider}: {limiter.min_interval}s between requests")

Warning

Only modify rate limits if you have explicit permission from the provider, institutional access, or documentation confirming higher limits. Violating rate limits may result in IP bans.
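
If a provider has granted you higher limits, the registry shown above exposes each limiter's min_interval. Assuming the attribute is writable in your installed version (verify before relying on this), an adjustment might look like:

from scholar_flux.api.rate_limiting import threaded_rate_limiter_registry

# Hypothetical adjustment: only do this with explicit provider permission
for provider, limiter in threaded_rate_limiter_registry.items():
    if provider == 'crossref':
        limiter.min_interval = 0.5  # seconds between requests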

Real-World Example: Systematic Literature Review

This example demonstrates a comprehensive search across six providers for a systematic review:

import pandas as pd
from scholar_flux import SearchCoordinator, MultiSearchCoordinator

# Configure search across all major providers
providers = ['pubmed', 'plos', 'arxiv', 'crossref', 'openalex', 'core']

multi_search_coordinator = MultiSearchCoordinator.from_coordinators([
    SearchCoordinator(
        query="cancer immunotherapy clinical trials",
        provider_name=provider
    )
    for provider in providers
])

# Retrieve 20 pages per provider (120 total requests)
print("Starting systematic search...")
results = multi_search_coordinator.search_pages(pages=range(1, 21))

# Check success rate
successful = results.filter()
print(f"Retrieved: {len(successful)}/{len(results)} pages successfully")

# Normalize and deduplicate by DOI
normalized_records = successful.normalize(include={'provider_name'})
df = pd.DataFrame(normalized_records)

# Deduplicate by DOI (keep first occurrence)
df_dedup = df.drop_duplicates(subset=['doi'], keep='first')

# Analysis
print(f"\nResults Summary:")
print(f"Total records: {len(df)}")
print(f"Unique records (after deduplication): {len(df_dedup)}")
print(f"\nCoverage by provider:")
print(df.groupby('provider_name').size())

# Export for analysis
df_dedup.to_csv('systematic_review_results.csv', index=False)
print("\nExported to systematic_review_results.csv")

Expected output:

Starting systematic search...
Retrieved: 118/120 pages successfully

Results Summary:
Total records: 2450
Unique records (after deduplication): 1823

Coverage by provider:
provider_name
arxiv         250
core          400
crossref      500
openalex      500
plos          400
pubmed        400

Exported to systematic_review_results.csv

Best Practices

Query Design

DO:

  • Use specific, targeted queries

  • Test queries on a single provider first before scaling

  • Use Boolean operators when supported (AND, OR, NOT; see the example after these lists)

  • Start with small page ranges for testing

DON’T:

  • Use overly broad queries (e.g., “science” or “health”)

  • Request excessive pages (>100 per provider) without caching

  • Ignore error responses without investigation

  • Skip validation of results
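
As an example of a targeted Boolean query (operator support and syntax vary by provider, so consult each provider's documentation first):

from scholar_flux import SearchCoordinator

# Illustrative Boolean query; syntax support differs across providers
coordinator = SearchCoordinator(
    query='("machine learning" AND healthcare) NOT survey',
    provider_name='plos'
)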

Caching Strategy

Enable caching to minimize redundant API calls:

from scholar_flux import SearchCoordinator, DataCacheManager
from scholar_flux.sessions import CachedSessionManager

# HTTP response caching (session-level)
session_manager = CachedSessionManager(backend='redis')

# Result caching (processed data)
cache_manager = DataCacheManager.with_storage('redis', 'localhost:6379')

# Apply to coordinators
coordinators = [
    SearchCoordinator(
        query="machine learning",
        provider_name=provider,
        session=session_manager.configure_session(),
        cache_manager=cache_manager
    )
    for provider in ['plos', 'arxiv', 'crossref']
]

Resource Management

Memory:

  • Process large result sets in batches

  • Use .filter().join() to aggregate efficiently

  • Clear large result lists when no longer needed

Network:

  • Enable caching to reduce API calls

  • Respect rate limits (don’t customize without permission)

  • Handle provider failures gracefully with error checking

Time estimates:

  • 4 providers × 10 pages: ~2-3 minutes

  • 4 providers × 50 pages: ~10-15 minutes

  • 6 providers × 100 pages: ~30-40 minutes
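
These estimates follow from the rate limits: because providers run in parallel, the slowest provider bounds the total runtime. A rough lower bound:

# Lower bound: pages × the slowest provider's delay (latency and processing add overhead)
delays = {'plos': 6.1, 'arxiv': 4.0, 'openalex': 1.0, 'crossref': 1.0}
pages = 10
lower_bound = pages * max(delays.values())
print(f"Lower bound: {lower_bound:.0f}s (~{lower_bound / 60:.1f} minutes)")
# Lower bound: 61s (~1.0 minutes); observed runtimes run longer in practice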

Troubleshooting

“No coordinators registered” Warning

This occurs when searching before adding coordinators:

multi_search_coordinator = MultiSearchCoordinator()
results = multi_search_coordinator.search(page=1)  # Warning: returns empty list

# Fix: add one or more coordinators first
multi_search_coordinator.add_coordinators([SearchCoordinator(query="AI", provider_name='plos')])
results = multi_search_coordinator.search(page=1)  # Now works

Memory Issues with Large Searches

Process results in batches instead of all at once:

# Instead of: pages=range(1, 200)
batch_size = 20
all_data = []

for batch_start in range(1, 200, batch_size):
    batch_end = min(batch_start + batch_size, 200)
    batch_pages = range(batch_start, batch_end)

    results = multi_search_coordinator.search_pages(pages=batch_pages)
    batch_data = results.filter().join()
    all_data.extend(batch_data)

    # Clear memory
    del results
    print(f"Processed pages {batch_start}-{batch_end-1}")

Provider-Specific Failures

Investigate individual provider failures:

results = multi_search_coordinator.search_pages(pages=range(1, 11))

# Separate successes and failures
successful = results.filter()
failed = results.filter(invert=True)

# Analyze failures
if failed:
    print(f"{len(failed)} failures:")
    for failure in failed:
        print(f"  {failure.provider_name} page {failure.page}")
        print(f"  Error: {failure.error}")
        print(f"  Message: {failure.message}")

Common failure causes:

  • API temporary downtime (retry later)

  • Rate limit exceeded (check if limits were customized)

  • Network connectivity issues

  • Invalid API keys (for providers requiring authentication)
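
For transient causes, re-running the same search is often enough. With use_cache=True, previously successful pages should be served from cache, so only the failed requests hit the network again (a sketch, assuming cached pages are reused):

# Re-run the same page range; cached successes are not re-fetched
retry_results = multi_search_coordinator.search_pages(pages=range(1, 11))

still_failed = retry_results.filter(invert=True)
print(f"Remaining failures after retry: {len(still_failed)}")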
