Workflows

This tutorial explains how ScholarFlux workflows enable multi-step data retrieval from academic APIs, with real examples using PubMed, Crossref, and OpenAlex.

Reading Guide

New to ScholarFlux? Complete Getting Started first, then return here.

Quick start (5 min): Read “Overview” and “Built-in Workflows” to understand PubMed’s automatic workflow, then start querying.

Building custom workflows (20 min): Read through “Creating Custom Workflows” and one real-world example to learn the workflow pattern.

Advanced customization (15 min): Jump to “Best Practices” or “Advanced Customization” for extension points and production patterns.

Debugging workflow issues: See “Workflow Error Handling” and “Troubleshooting” sections.

Production deployment: After reading, see Caching Strategies for workflow caching patterns and Production Deployment for deployment configuration.

Prerequisites

Overview

What are Workflows?

Some academic APIs require multiple requests to retrieve complete article data. For example:

  • PubMed: Search for IDs → Fetch full records using those IDs

  • Crossref: Search for articles → Fetch detailed metadata using DOIs

  • OpenAlex: Search for seed papers → Fetch citation networks

ScholarFlux workflows automate these multi-step processes through the SearchWorkflow class.

Note

Most users never interact with workflows directly. PubMed’s workflow is configured automatically—just use SearchCoordinator(provider_name="pubmed") and everything works transparently. Custom workflows are only needed for specialized multi-step retrieval patterns.

Why Use Workflows?

Without workflows:

# Manual two-step process (tedious and error-prone)
# Step 1: Search PubMed for IDs
search_response = requests.get(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi',
    params={'term': 'neuroscience', 'retmax': 20}
)
ids = parse_xml(search_response)['IdList']['Id']

# Step 2: Fetch records using IDs
fetch_response = requests.get(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi',
    params={'id': ','.join(ids), 'retmode': 'xml'}
)
records = parse_xml(fetch_response)

With workflows:

# Automatic two-step process
coordinator = SearchCoordinator(
    query="neuroscience",
    provider_name="pubmed"  # Workflow configured automatically
)
result = coordinator.search(page=1)
# Returns complete records with abstracts and metadata

Workflow Architecture

Understanding the Data Flow

ScholarFlux workflows coordinate multiple API calls with intelligent data passing between steps:

SearchCoordinator.search_page(page=1)
├─> Creates SearchResult container (holds query, provider, page)
│
└─> Calls SearchCoordinator.search()
    ├─> Workflow enabled? → Execute workflow
    │   │
    │   ├─> Step 1: WorkflowStep._run()
    │   │   ├─> Calls coordinator._search() internally
    │   │   ├─> Returns ProcessedResponse (or ErrorResponse)
    │   │   ├─> Cached by DataCacheManager ✓
    │   │   └─> Wrapped in StepContext
    │   │
    │   ├─> Step 2: WorkflowStep.pre_transform(ctx=Step1Context)
    │   │   ├─> Extracts data from Step 1's result
    │   │   ├─> Configures parameters for Step 2
    │   │   └─> Returns modified WorkflowStep
    │   │
    │   ├─> Step 2: WorkflowStep._run()
    │   │   ├─> Calls coordinator._search() with Step 1's data
    │   │   ├─> Returns ProcessedResponse (or ErrorResponse)
    │   │   ├─> Cached by DataCacheManager ✓
    │   │   └─> Wrapped in StepContext
    │   │
    │   └─> Returns final ProcessedResponse
    │
    └─> No workflow? → Call coordinator._search() directly

Key Points:

  • Each step is cached independently by DataCacheManager

  • Errors at any step return ErrorResponse with details

  • StepContext passes results between steps without modifying coordinator

  • SearchResult (from search_page) adds query/provider/page context for user convenience

Components Explained

WorkflowStep: Defines behavior for a single step

class WorkflowStep(BaseModel):
    provider_name: str = "my_provider"
    search_parameters: dict = {}   # kwargs for _search()
    config_parameters: dict = {}   # overrides for SearchAPIConfig

    def pre_transform(self, ctx: StepContext) -> "WorkflowStep":
        """Modify step based on previous results (optional)"""

    def _run(self, step_number: int, coordinator, ctx: StepContext) -> StepContext:
        """Execute the step (required)"""

    def post_transform(self, ctx: StepContext) -> StepContext:
        """Modify results after execution (optional)"""

StepContext: Carries results between steps

@dataclass
class StepContext:
    step_number: int              # Which step this is (0, 1, 2...)
    step: WorkflowStep            # The WorkflowStep instance
    result: ProcessedResponse     # The API response (or ErrorResponse)

SearchWorkflow: Orchestrates all steps

class SearchWorkflow(BaseModel):
    steps: List[WorkflowStep]     # Steps to execute in order
    stop_on_error: bool = True    # Halt workflow on step failure?

    def _run(self, coordinator) -> WorkflowResult:
        """Execute all steps sequentially"""

Parameter Types

When customizing workflow steps, two parameter types control behavior:

1. search_parameters - Arguments passed to coordinator._search()

# passed to SearchCoordinator._search() in the current workflow on execution
search_parameters = {
    'page': 1,                    # Which page to retrieve
    'normalize_records': True,    # Apply field normalization?
    'from_cache': True            # Use cached results?
}

2. config_parameters - Overrides for SearchAPIConfig

# used to update the `SearchAPIConfig` temporarily when required
config_parameters = {
    'request_delay': 0.1,         # Rate limiting delay
    'records_per_page': 50,       # Results per page
    'id': '12345,67890',          # API-specific parameter (e.g., PubMed IDs)
    'endpoint': 'works/doi'       # API endpoint path
}

Tip

Use search_parameters to control ScholarFlux behavior. Use config_parameters to customize the API request itself.

Built-in Workflows

PubMed Search→Fetch (Automatic)

PubMed requires two API calls: eSearch (get IDs) → eFetch (get records).

from scholar_flux import SearchCoordinator

# PubMed workflow is automatically configured
coordinator = SearchCoordinator(
    query="gene therapy",
    provider_name="pubmed"
)

# Single call executes two-step workflow transparently
result = coordinator.search(page=1)

if result:
    print(f"Retrieved {len(result.data)} records")
    for record in result.data[:3]:
        print(f"Title: {record.get('MedlineCitation.Article.ArticleTitle')}")
        print(f"Abstract: {record.get('MedlineCitation.Article.Abstract.AbstractText')}")
        print()

What happens behind the scenes:

  1. Step 1 (eSearch): - Provider: pubmed - Retrieves list of PubMed IDs matching query - Returns ProcessedResponse with IDs in metadata['IdList']['Id'] - Cached

  2. Step 2 (eFetch): - Provider: pubmedefetch (different endpoint!) - Extracts IDs from Step 1’s metadata - Fetches complete records for those IDs - Returns ProcessedResponse with full articles - Cached

  3. Result: Full article metadata including abstracts, preserved search metadata

Note

Provider Switching: Workflows can use different providers for different steps. PubMed uses pubmed for eSearch and pubmedefetch for eFetch because they have different base URLs and configurations. Rate limiting is handled independently for each provider.

Tip

ScholarFlux preserves metadata from Step 1 (total results, query info) in the final response, so you get both complete article data and search context.

Custom Workflows

When to Create Custom Workflows

You need a custom workflow when:

  • ✅ An API requires multiple steps to get complete data

  • ✅ You need to filter/transform data between steps

  • ✅ You want to aggregate results from multiple searches

  • ✅ You need to use results from one search to guide the next

You DON’T need a custom workflow when:

Creating Custom Workflows

Build custom workflows by subclassing WorkflowStep:

from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext
from scholar_flux.api.models import ProcessedResponse
from scholar_flux.utils import generate_iso_timestamp
from typing import Optional

class MySearchStep(WorkflowStep):
    """Step 1: Search for records"""

    provider_name: Optional[str] = "my_provider"

class MyFetchStep(WorkflowStep):
    """Step 2: Fetch detailed data"""

    provider_name: Optional[str] = "my_provider"

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "MyFetchStep":
        """Extract data from previous step to configure this step"""

        # Validate context type
        self._verify_context(ctx)

        # Extract data from Step 1
        if ctx and ctx.result:
            previous_results = ctx.result.data or []

            # Extract IDs for Step 2
            ids = [r['id'] for r in previous_results if r.get('id')]

            # Configure this step to use those IDs
            self.config_parameters = {'ids': ids}

        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Execute Step 2 using IDs from Step 1"""

        # Get IDs configured in pre_transform
        ids = self.config_parameters.get('ids', [])

        # Fetch records using IDs
        responses = [
            search_coordinator.parameter_search(id=id_value)
            for id_value in ids
        ]

        # Combine results
        combined_data = [
            record
            for response in responses if response
            for record in (response.data or [])
        ]

        # Create final response
        final_response = ProcessedResponse(
            response=responses[0].response if responses else None,
            processed_records=combined_data,
            metadata={'count': len(combined_data)},
            created_at=generate_iso_timestamp()
        )

        # Return as StepContext
        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )

# Create and use workflow
workflow = SearchWorkflow(steps=[MySearchStep(), MyFetchStep()])
coordinator = SearchCoordinator(
    query="test",
    provider_name="my_provider",
    workflow=workflow
)

result = coordinator.search(page=1)

Key Customization Points:

  1. pre_transform(): Modify step configuration based on previous results

  2. _run(): Execute the step’s main logic

  3. post_transform(): Transform results after execution (optional)

  4. with_context(): Temporarily modify coordinator for step scope (advanced)

Real-World Example 1: Crossref DOI Enrichment

This workflow searches Crossref, then fetches detailed metadata for each DOI.

Complete Implementation

from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext
from scholar_flux.api import SearchCoordinator, ProcessedResponse
from scholar_flux.api.models import SearchAPIConfig
from scholar_flux.utils.helpers import generate_iso_timestamp
from typing import Optional
from pydantic import Field


class PreprocessStep(WorkflowStep):
    """Step 1: Search Crossref for articles"""

    provider_name: Optional[str] = Field(
        default="crossref",
        description="Provider for initial search"
    )


class EnrichmentStep(WorkflowStep):
    """Step 2: Fetch detailed metadata using DOIs from Step 1"""

    provider_name: Optional[str] = Field(
        default="crossref",
        description="Provider for DOI-based retrieval"
    )

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        provider_name: Optional[str] = None,
        search_parameters: Optional[dict] = None,
        config_parameters: Optional[dict] = None,
        *args,
        **kwargs
    ) -> "EnrichmentStep":
        """Extract DOIs from Step 1 results"""

        # Validate we have context
        self._verify_context(ctx)

        # Get results from preprocessing step
        preprocessed_result = ctx.result.data or [] if ctx and ctx.result else []

        if not preprocessed_result:
            raise TypeError(
                f"Step 1 produced no records. Cannot continue."
            )

        # Extract DOIs
        dois = [r['DOI'] for r in preprocessed_result if r.get('DOI')]

        # Configure for DOI-based retrieval
        # Note: Using config_parameters to override SearchAPIConfig
        self.search_parameters = {"parameters": {"dois": dois}}

        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        verbose: Optional[bool] = True,
        *args,
        **kwargs
    ) -> StepContext:
        """Fetch detailed metadata for each DOI"""

        # Get DOI list from pre_transform
        doi_list = self.search_parameters["parameters"]["dois"]

        # Fetch each DOI individually
        # Note: parameter_search() allows custom endpoint paths
        processed_responses = [
            search_coordinator.parameter_search(
                endpoint="works/" + doi,
                **kwargs
            )
            for doi in doi_list if doi
        ]

        # Combine all records
        combined_records = [
            record
            for response in processed_responses
            if response and response.data
            for record in response.data
        ]

        # Create final response
        final_response = ProcessedResponse(
            response=processed_responses[0].response if processed_responses else None,
            processed_records=combined_records,
            metadata={
                'total_dois': len(doi_list),
                'enriched_records': len(combined_records)
            },
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )

Usage Example

from scholar_flux import SearchCoordinator
from scholar_flux.sessions import CachedSessionManager

# Create enrichment workflow
enrichment_workflow = SearchWorkflow(
    steps=[PreprocessStep(), EnrichmentStep()]
)

# Configure coordinator
session_manager = CachedSessionManager(
    backend="redis",
    user_agent="Research/1.0 (mailto:user@institution.edu)"
)

coordinator = SearchCoordinator(
    query="machine learning healthcare",
    provider_name="crossref",
    workflow=enrichment_workflow,
    session=session_manager()
)

# Execute workflow
result = coordinator.search(page=1)

if result:
    print(f"✅ Enriched {len(result.data)} records")
    print(f"Metadata: {result.metadata}")
else:
    print(f"❌ Error: {result.error} - {result.message}")

Note

Caching Behavior: Both Step 1 (search) and Step 2 (enrichment) results are cached independently. If you run the same query again, both steps will be retrieved from cache instantly.

Real-World Example 2: OpenAlex Citation Network

This workflow builds citation networks by: 1. Finding seed papers 2. Retrieving papers that cite them 3. Retrieving papers they cite

Complete Implementation

from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext
from scholar_flux.api import SearchCoordinator, ProcessedResponse
from scholar_flux.utils.helpers import generate_iso_timestamp
from typing import Optional
from pydantic import Field


class SeedPaperStep(WorkflowStep):
    """Step 1: Find seed papers matching the query"""

    provider_name: Optional[str] = "openalex"


class CitationStep(WorkflowStep):
    """Step 2 (and 3): Fetch citation network data"""

    provider_name: Optional[str] = "openalex"
    citation_parameter: str = Field(
        default="cited_by",
        description="'cited_by' or 'cites' for citation direction"
    )
    record_limit: Optional[int] = Field(
        default=None,
        description="Max records to fetch per seed paper"
    )

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "CitationStep":
        """Extract OpenAlex IDs from previous results"""

        self._verify_context(ctx)

        # Get seed papers from previous step
        if ctx and ctx.result:
            seed_papers = ctx.result.data or []

            # Extract OpenAlex IDs
            openalex_ids = [
                p.get('id') for p in seed_papers
                if p.get('id')
            ]

            # Store for _run()
            self.search_parameters = {'openalex_ids': openalex_ids}

        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Fetch citations for each seed paper"""

        openalex_ids = self.search_parameters.get('openalex_ids', [])

        all_citations = []

        for oa_id in openalex_ids:
            # Build filter for this paper's citations
            filter_param = f"{self.citation_parameter}:{oa_id}"

            # Fetch citations
            response = search_coordinator.parameter_search(
                filter=filter_param,
                per_page=self.record_limit or 25,
                **kwargs
            )

            if response and response.data:
                all_citations.extend(response.data[:self.record_limit] if self.record_limit else response.data)

        # Deduplicate by OpenAlex ID
        seen_ids = set()
        unique_citations = []
        for citation in all_citations:
            citation_id = citation.get('id')
            if citation_id and citation_id not in seen_ids:
                seen_ids.add(citation_id)
                unique_citations.append(citation)

        # Create final response
        final_response = ProcessedResponse(
            response=None,  # Multiple API calls, no single response
            processed_records=unique_citations,
            metadata={
                'seed_papers': len(openalex_ids),
                'total_citations': len(all_citations),
                'unique_citations': len(unique_citations),
                'citation_direction': self.citation_parameter
            },
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )


class OpenAlexCitationWorkflow(SearchWorkflow):
    """Three-step workflow for building citation networks"""

    def _create_workflow_result(
        self,
        result: Optional[ProcessedResponse] = None
    ) -> WorkflowResult:
        """Merge results from all citation steps"""

        # Use base implementation
        workflow_result = super()._create_workflow_result(result)

        # Optionally merge data from all steps
        if len(self._history) >= 3:
            seed_step = self._history[0]
            cited_by_step = self._history[1]
            cites_step = self._history[2]

            # Combine all unique papers
            all_papers = []
            seen_ids = set()

            for step_ctx in [seed_step, cited_by_step, cites_step]:
                if step_ctx.result and step_ctx.result.data:
                    for paper in step_ctx.result.data:
                        paper_id = paper.get('id')
                        if paper_id and paper_id not in seen_ids:
                            seen_ids.add(paper_id)
                            all_papers.append(paper)

            # Create merged response
            merged_response = ProcessedResponse(
                response=None,
                processed_records=all_papers,
                metadata={
                    'total_papers': len(all_papers),
                    'seed_papers': len(seed_step.result.data) if seed_step.result else 0,
                    'citing_papers': len(cited_by_step.result.data) if cited_by_step.result else 0,
                    'cited_papers': len(cites_step.result.data) if cites_step.result else 0
                },
                created_at=generate_iso_timestamp()
            )

            workflow_result.result = merged_response

        return workflow_result

Usage Example

from scholar_flux import CachedSessionManager

# Create citation network workflow
citation_workflow = OpenAlexCitationWorkflow(
    steps=[
        SeedPaperStep(),
        CitationStep(citation_parameter="cited_by"),  # Papers citing seeds
        CitationStep(citation_parameter="cites", record_limit=5)  # Papers cited by seeds
    ]
)

# Configure coordinator
session_manager = CachedSessionManager(
    backend="redis",
    user_agent="Research/1.0 (mailto:user@institution.edu)"
)

coordinator = SearchCoordinator(
    query="machine learning",
    provider_name="openalex",
    workflow=citation_workflow,
    session=session_manager(),
    records_per_page=10
)

# Execute workflow
result = coordinator.search(page=1)

if result and result.data:
    print(f"Built citation network with {len(result.data)} unique papers")
    print(f"Metadata: {result.metadata}")

    # Analyze citation network
    import pandas as pd
    df = pd.DataFrame(result.data)
    print(f"\nPapers by type: {df.get('type', pd.Series()).value_counts()}")
else:
    print(f"Error: {result.error}")

Note

Advanced Pattern: This workflow overrides _create_workflow_result() to merge results from all three steps into a single unified response. This is an advanced customization technique for workflows that aggregate data from multiple steps.

Workflow Error Handling

How Errors Propagate

When a workflow step fails, ScholarFlux returns an error response immediately:

from scholar_flux import SearchCoordinator
from scholar_flux.api import ErrorResponse, NonResponse

coordinator = SearchCoordinator(
    query="test",
    provider_name="pubmed"
)

result = coordinator.search(page=1)

# Check if workflow succeeded
if result:
    print(f"✅ Workflow succeeded: {len(result.data)} records")
elif isinstance(result, ErrorResponse):
    print(f"❌ API error: {result.message}")
    print(f"Status code: {result.status_code}")
elif isinstance(result, NonResponse):
    print(f"❌ Network/config error: {result.message}")

What happens on failure:

  • If Step 1 fails: Returns ErrorResponse immediately, no subsequent steps run

  • If Step 2 fails: Returns ErrorResponse with details about Step 2’s failure

  • Partial results are cached: Step 1’s successful result remains in cache

Accessing Workflow History

For debugging, you can inspect what steps succeeded before failure:

coordinator = SearchCoordinator(
    query="test",
    provider_name="pubmed"
)

result = coordinator.search(page=1)

# Check workflow history
if coordinator.workflow and coordinator.workflow._history:
    print(f"Executed {len(coordinator.workflow._history)} steps:")

    for step_ctx in coordinator.workflow._history:
        step_name = step_ctx.step.__class__.__name__
        success = bool(step_ctx.result)
        record_count = len(step_ctx.result.data or []) if step_ctx.result else 0

        print(f"  Step {step_ctx.step_number} ({step_name}):")
        print(f"    Success: {success}")
        print(f"    Records: {record_count}")

        if isinstance(step_ctx.result, ErrorResponse):
            print(f"    Error: {step_ctx.result.message}")

Example output:

Executed 2 steps:
  Step 0 (PubMedSearchStep):
    Success: True
    Records: 0  # eSearch returns IDs in metadata, not data
  Step 1 (PubMedFetchStep):
    Success: False
    Records: 0
    Error: HTTP 500: Internal server error

Tip

The _history attribute is useful for debugging workflow failures, especially in complex multi-step workflows where you need to see exactly where the process failed.

Stop on Error Configuration

Control whether workflows halt on first error:

from scholar_flux.api.workflows import SearchWorkflow

# Stop on first error (default)
workflow = SearchWorkflow(
    steps=[Step1(), Step2(), Step3()],
    stop_on_error=True
)

# Continue through all steps even if some fail
workflow = SearchWorkflow(
    steps=[Step1(), Step2(), Step3()],
    stop_on_error=False  # All steps execute, history shows failures
)

Warning

With stop_on_error=False, later steps may fail due to missing data from earlier steps. Use this carefully and check _history to see which steps succeeded.

Creating ErrorResponse for Testing

When testing workflows, you may need to create ErrorResponse objects. ScholarFlux requires a response object:

from scholar_flux.api.models import ErrorResponse, ReconstructedResponse

# Create a mock response for testing
test_response = ReconstructedResponse.build(
    url="https://api.example.com/test",
    status_code=500,
    json={"status": "failure", "message": "Test error"}
)

# Create ErrorResponse with the mock response
error = ErrorResponse(
    response=test_response,
    error="TestError",
    message="Intentional failure for testing"
)

This pattern is useful when creating test workflows with simulated failures.

Best Practices

Workflow Design

DO:

  • Keep steps focused on single responsibilities (one step = one API call or transformation)

  • Use pre_transform() to pass data between steps cleanly

  • Handle errors gracefully in _run() and provide informative error messages

  • Validate context in pre_transform() with self._verify_context(ctx)

  • Use descriptive step class names that explain what the step does

DON’T:

  • Make steps depend on external state outside the workflow

  • Skip error handling (always check if previous results exist)

  • Modify coordinator state directly in steps (use with_context() if needed)

  • Create workflows for simple single-step retrieval (just use SearchCoordinator directly)

Type Safety

Use type annotations for better code quality and IDE support:

from typing import Optional
from scholar_flux.api.workflows import WorkflowStep, StepContext
from scholar_flux.api.models import ProcessedResponse

class TypedStep(WorkflowStep):
    provider_name: Optional[str] = "plos"

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "TypedStep":
        """Validate context with type checking"""
        self._verify_context(ctx)

        # Type-safe context checking
        if not isinstance(ctx, StepContext):
            raise RuntimeError(f"Expected StepContext, got {type(ctx)}")

        # Type-safe data checking
        if not (ctx.result and ctx.result.data):
            raise RuntimeError("No data from previous step")

        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Execute with explicit return type"""

        # Use type annotations for collections
        records: list[dict] = []

        # ... step logic

        response = ProcessedResponse(
            response=None,
            processed_records=records,
            metadata={},
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=response
        )

Benefits of type annotations:

  • Catch errors early with mypy type checking

  • Better IDE autocomplete and navigation

  • Self-documenting code

  • Easier refactoring

Parameter Types Reference

search_parameters - Control ScholarFlux behavior:

search_parameters = {
    'page': 1,                     # Which page to retrieve
    'normalize_records': True,     # Apply field normalization?
    'from_request_cache': True,    # Use HTTP cache?
    'from_process_cache': True     # Use processing cache?
}

config_parameters - Override SearchAPIConfig:

config_parameters = {
    'request_delay': 0.1,          # Rate limiting
    'records_per_page': 50,        # Results per page
    'id': '12345,67890',           # API-specific params
    'endpoint': 'works/',          # API endpoint path
    'filter': 'type:journal'       # API filters
}

Caching Strategy

Enable caching for expensive workflows to avoid re-executing all steps:

from scholar_flux import CachedSessionManager, DataCacheManager

# HTTP caching (request-level)
session = CachedSessionManager(backend='redis')

# Result caching (processing-level)
cache = DataCacheManager.with_storage('redis', namespace='my_workflow')

coordinator = SearchCoordinator(
    query="test",
    workflow=my_workflow,
    session=session(),       # Each step's HTTP request cached
    cache_manager=cache      # Each step's processed result cached
)

# First run: executes all workflow steps
result = coordinator.search(page=1)

# Second run: all steps retrieved from cache (instant)
result = coordinator.search(page=1)

How workflow caching works:

  1. Each step calls coordinator._search() internally

  2. Each _search() call checks DataCacheManager for cached results

  3. If cached, the step returns immediately without making an API request

  4. If not cached, the step executes and caches its result

Tip

Best practice: Use Redis or MongoDB caching in production for persistent workflow results across sessions.

Testing Workflows

Test each step independently before testing the full workflow:

from scholar_flux.api.workflows import StepContext
from scholar_flux.api import SearchCoordinator, ProcessedResponse

# Test Step 1 independently
step1 = MySearchStep()
mock_coordinator = SearchCoordinator(
    provider_name='my_provider',
    query='test'
)

# Execute Step 1
result = step1._run(
    step_number=1,
    search_coordinator=mock_coordinator,
    ctx=None
)

# Verify Step 1 output
assert isinstance(result, StepContext)
assert result.result
assert len(result.result.data or []) > 0

# Test Step 2 with Step 1's output
step2 = MyFetchStep()
step2 = step2.pre_transform(ctx=result)

# Use with_context for scoped execution (if step implements it)
with step2.with_context(mock_coordinator):
    step2_result = step2._run(
        step_number=2,
        search_coordinator=mock_coordinator,
        ctx=result
    )

# Verify Step 2 output
assert isinstance(step2_result, StepContext)
assert step2_result.result
assert len(step2_result.result.data or []) > 0

Tip

Test steps individually first, then test the complete workflow. This makes debugging much easier.

Advanced Customization

The with_context() Pattern

Use with_context() to temporarily modify coordinator settings for a step’s scope:

from contextlib import contextmanager
from typing import Generator
from scholar_flux.api.workflows import WorkflowStep

class CustomStep(WorkflowStep):
    """Step with temporary configuration changes"""

    provider_name: str = "plos"
    temp_delay: float = 0.1

    @contextmanager
    def with_context(
        self,
        search_coordinator,
        *args,
        **kwargs
    ) -> Generator["CustomStep", None, None]:
        """Temporarily reduce rate limiting for this step"""

        # Save original delay
        original_delay = search_coordinator.api.config.request_delay

        try:
            # Apply temporary delay
            search_coordinator.api.config.request_delay = self.temp_delay
            yield self
        finally:
            # Restore original delay
            search_coordinator.api.config.request_delay = original_delay

When to use with_context():

  • Temporarily modify rate limiting for specific steps

  • Apply step-specific API configurations

  • Scope expensive operations (logging, metrics)

  • Test workflows with mock configurations

Note

with_context() is an advanced pattern. Most workflows don’t need it—use pre_transform() and config_parameters instead for simpler customization.

Overriding _create_workflow_result()

Override this method to customize how final results are assembled:

from scholar_flux.api.workflows import SearchWorkflow, WorkflowResult
from scholar_flux.api.models import ProcessedResponse

class MergedWorkflow(SearchWorkflow):
    """Workflow that merges results from all steps"""

    def _create_workflow_result(
        self,
        result: Optional[ProcessedResponse] = None
    ) -> WorkflowResult:
        """Merge data from all steps into final result"""

        # Collect all records from all steps
        all_records: list[dict] = []
        for step_ctx in self._history:
            if step_ctx.result and step_ctx.result.data:
                all_records.extend(step_ctx.result.data)

        # Deduplicate
        seen_ids = set()
        unique_records = []
        for record in all_records:
            record_id = record.get('id')
            if record_id and record_id not in seen_ids:
                seen_ids.add(record_id)
                unique_records.append(record)

        # Create merged response
        merged_response = ProcessedResponse(
            response=None,
            processed_records=unique_records,
            metadata={
                'total_steps': len(self._history),
                'unique_records': len(unique_records)
            },
            created_at=generate_iso_timestamp()
        )

        return WorkflowResult(
            history=self._history,
            result=merged_response
        )

class Step1(WorkflowStep):
    provider_name: str = "plos"

    def _run(self, step_number, search_coordinator, ctx=None, *args, **kwargs):
        response = ProcessedResponse(
            response=None,
            processed_records=[{'id': '1', 'title': 'Paper A'}],
            metadata={},
            created_at=generate_iso_timestamp()
        )
        return StepContext(step_number=step_number, step=self.model_copy(), result=response)

class Step2(WorkflowStep):
    provider_name: str = "plos"

    def _run(self, step_number, search_coordinator, ctx=None, *args, **kwargs):
        response = ProcessedResponse(
            response=None,
            processed_records=[{'id': '2', 'title': 'Paper B'}],
            metadata={},
            created_at=generate_iso_timestamp()
        )
        return StepContext(step_number=step_number, step=self.model_copy(), result=response)

Common use cases:

  • Merging results from multiple steps (like OpenAlex citation network)

  • Copying metadata from early steps to final result (like PubMed)

  • Aggregating statistics across all steps

  • Deduplicating records from multiple sources

Troubleshooting

Common Issues

Issue: “Step X produced no records. Cannot continue.”

Cause: A step returned empty results, and the next step expected data.

Solution:

def pre_transform(self, ctx: StepContext) -> "MyStep":
    self._verify_context(ctx)

    if not isinstance(ctx, StepContext):
        raise RuntimeError(f"Expected `ctx` to be a StepContext, received {type(ctx)}")

    # Check if previous step has data
    if not (ctx.result and ctx.result.data):
        raise RuntimeError(
            f"Step {ctx.step_number} returned no results. "
            f"Check query parameters or API availability."
        )

    # ... rest of pre_transform

Issue: Workflow runs but returns wrong data

Cause: Parameter passing between steps is incorrect.

Solution: Debug with _history:

result = coordinator.search(page=1)

# Inspect what each step received and returned
for i, step_ctx in enumerate(coordinator.workflow._history):
    print(f"\n=== Step {i} ===")
    print(f"Step type: {step_ctx.step.__class__.__name__}")
    print(f"Config: {step_ctx.step.config_parameters}")
    print(f"Search params: {step_ctx.step.search_parameters}")
    if step_ctx.result:
        print(f"Records returned: {len(step_ctx.result.data or [])}")
        print(f"Metadata: {step_ctx.result.metadata}")

Issue: Workflow caching not working

Cause: Cache keys include all parameters, so slight differences prevent cache hits.

Solution: Use consistent parameters and namespaces:

# Use namespace to isolate workflow cache
cache = DataCacheManager.with_storage(
    'redis',
    namespace='my_workflow_v1'  # Version namespace for cache isolation
)

Next Steps

Related Guides:

Advanced Topics:

API Reference