Workflows
This tutorial explains how ScholarFlux workflows enable multi-step data retrieval from academic APIs, with real examples using PubMed, Crossref, and OpenAlex.
Reading Guide
New to ScholarFlux? Complete Getting Started first, then return here.
Quick start (5 min): Read “Overview” and “Built-in Workflows” to understand PubMed’s automatic workflow, then start querying.
Building custom workflows (20 min): Read through “Creating Custom Workflows” and one real-world example to learn the workflow pattern.
Advanced customization (15 min): Jump to “Best Practices” or “Advanced Customization” for extension points and production patterns.
Debugging workflow issues: See “Workflow Error Handling” and “Troubleshooting” sections.
Production deployment: After reading, see Caching Strategies for workflow caching patterns and Production Deployment for deployment configuration.
Prerequisites
Complete Getting Started for basic search patterns
Understand Response Handling and Error Patterns for workflow error handling
(Optional) Familiarity with Multi-Provider Search for concurrent workflows
Overview
What are Workflows?
Some academic APIs require multiple requests to retrieve complete article data. For example:
PubMed: Search for IDs → Fetch full records using those IDs
Crossref: Search for articles → Fetch detailed metadata using DOIs
OpenAlex: Search for seed papers → Fetch citation networks
ScholarFlux workflows automate these multi-step processes through the SearchWorkflow class.
Note
Most users never interact with workflows directly. PubMed’s workflow is configured automatically—just use SearchCoordinator(provider_name="pubmed") and everything works transparently. Custom workflows are only needed for specialized multi-step retrieval patterns.
Why Use Workflows?
Without workflows:
# Manual two-step process (tedious and error-prone)
import requests

# parse_xml stands in for whatever XML parsing helper you use
# Step 1: Search PubMed for IDs
search_response = requests.get(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi',
    params={'term': 'neuroscience', 'retmax': 20}
)
ids = parse_xml(search_response)['IdList']['Id']

# Step 2: Fetch records using IDs
fetch_response = requests.get(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi',
    params={'id': ','.join(ids), 'retmode': 'xml'}
)
records = parse_xml(fetch_response)
With workflows:
# Automatic two-step process
coordinator = SearchCoordinator(
    query="neuroscience",
    provider_name="pubmed"  # Workflow configured automatically
)
result = coordinator.search(page=1)
# Returns complete records with abstracts and metadata
Workflow Architecture
Understanding the Data Flow
ScholarFlux workflows coordinate multiple API calls with intelligent data passing between steps:
SearchCoordinator.search_page(page=1)
├─> Creates SearchResult container (holds query, provider, page)
│
└─> Calls SearchCoordinator.search()
├─> Workflow enabled? → Execute workflow
│ │
│ ├─> Step 1: WorkflowStep._run()
│ │ ├─> Calls coordinator._search() internally
│ │ ├─> Returns ProcessedResponse (or ErrorResponse)
│ │ ├─> Cached by DataCacheManager ✓
│ │ └─> Wrapped in StepContext
│ │
│ ├─> Step 2: WorkflowStep.pre_transform(ctx=Step1Context)
│ │ ├─> Extracts data from Step 1's result
│ │ ├─> Configures parameters for Step 2
│ │ └─> Returns modified WorkflowStep
│ │
│ ├─> Step 2: WorkflowStep._run()
│ │ ├─> Calls coordinator._search() with Step 1's data
│ │ ├─> Returns ProcessedResponse (or ErrorResponse)
│ │ ├─> Cached by DataCacheManager ✓
│ │ └─> Wrapped in StepContext
│ │
│ └─> Returns final ProcessedResponse
│
└─> No workflow? → Call coordinator._search() directly
Key Points:
Each step is cached independently by DataCacheManager
Errors at any step return ErrorResponse with details
StepContext passes results between steps without modifying coordinator
SearchResult (from search_page) adds query/provider/page context for user convenience
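The step-chaining loop above can be sketched in plain Python. This is a conceptual illustration only, with toy stand-ins for the real `WorkflowStep` and `StepContext` classes; the names `Context`, `Step`, and `run_workflow` are hypothetical, not ScholarFlux APIs.

```python
from dataclasses import dataclass

# Toy stand-in for StepContext: carries a result between steps
@dataclass
class Context:
    step_number: int
    result: list

# Toy stand-in for WorkflowStep
class Step:
    def pre_transform(self, ctx):
        # Receive the previous step's context (None for the first step)
        self.previous = ctx.result if ctx else []
        return self

    def _run(self, step_number, coordinator, ctx):
        # "Execute" the step: here we just append a marker record
        return Context(step_number, self.previous + [f"step{step_number}"])

def run_workflow(steps, coordinator=None):
    """Sequentially execute steps, threading each result into the next."""
    ctx = None
    for i, step in enumerate(steps):
        step = step.pre_transform(ctx)   # configure from previous result
        ctx = step._run(i, coordinator, ctx)
    return ctx

final = run_workflow([Step(), Step()])
print(final.result)  # ['step0', 'step1']
```

The key idea mirrors the diagram: each step sees only the previous step's `StepContext`, never the coordinator's internal state.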
Components Explained
WorkflowStep: Defines behavior for a single step
class WorkflowStep(BaseModel):
    provider_name: str = "my_provider"
    search_parameters: dict = {}  # kwargs for _search()
    config_parameters: dict = {}  # overrides for SearchAPIConfig

    def pre_transform(self, ctx: StepContext) -> "WorkflowStep":
        """Modify step based on previous results (optional)"""

    def _run(self, step_number: int, coordinator, ctx: StepContext) -> StepContext:
        """Execute the step (required)"""

    def post_transform(self, ctx: StepContext) -> StepContext:
        """Modify results after execution (optional)"""
StepContext: Carries results between steps
@dataclass
class StepContext:
    step_number: int           # Which step this is (0, 1, 2...)
    step: WorkflowStep         # The WorkflowStep instance
    result: ProcessedResponse  # The API response (or ErrorResponse)
SearchWorkflow: Orchestrates all steps
class SearchWorkflow(BaseModel):
    steps: List[WorkflowStep]   # Steps to execute in order
    stop_on_error: bool = True  # Halt workflow on step failure?

    def _run(self, coordinator) -> WorkflowResult:
        """Execute all steps sequentially"""
Parameter Types
When customizing workflow steps, two parameter types control behavior:
1. search_parameters - Arguments passed to coordinator._search()
# Passed to SearchCoordinator._search() when the workflow executes
search_parameters = {
    'page': 1,                  # Which page to retrieve
    'normalize_records': True,  # Apply field normalization?
    'from_cache': True          # Use cached results?
}
2. config_parameters - Overrides for SearchAPIConfig
# Used to temporarily override the `SearchAPIConfig` when required
config_parameters = {
    'request_delay': 0.1,     # Rate limiting delay
    'records_per_page': 50,   # Results per page
    'id': '12345,67890',      # API-specific parameter (e.g., PubMed IDs)
    'endpoint': 'works/doi'   # API endpoint path
}
Tip
Use search_parameters to control ScholarFlux behavior. Use config_parameters to customize the API request itself.
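Conceptually, `config_parameters` behave like a temporary overlay on the provider's base configuration for the duration of one step. The dict-merge sketch below illustrates the idea only; it is not the library's actual override mechanism, and the values are made up.

```python
# Base configuration a provider might ship with (illustrative values)
base_config = {
    'request_delay': 6.0,     # provider default rate limit
    'records_per_page': 20,
}

# Step-specific overrides, as you would place in config_parameters
config_parameters = {
    'request_delay': 0.1,     # faster polling for this one step
    'endpoint': 'works/doi',  # extra API-specific setting
}

# The effective configuration for the step is base + overrides
step_config = {**base_config, **config_parameters}
print(step_config['request_delay'])     # 0.1
print(step_config['records_per_page'])  # 20 (unchanged)
```

Settings not named in `config_parameters` keep their base values, which is why overrides are safe to scope per step.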
Built-in Workflows
PubMed Search→Fetch (Automatic)
PubMed requires two API calls: eSearch (get IDs) → eFetch (get records).
from scholar_flux import SearchCoordinator

# PubMed workflow is automatically configured
coordinator = SearchCoordinator(
    query="gene therapy",
    provider_name="pubmed"
)

# Single call executes two-step workflow transparently
result = coordinator.search(page=1)

if result:
    print(f"Retrieved {len(result.data)} records")
    for record in result.data[:3]:
        print(f"Title: {record.get('MedlineCitation.Article.ArticleTitle')}")
        print(f"Abstract: {record.get('MedlineCitation.Article.Abstract.AbstractText')}")
        print()
What happens behind the scenes:
Step 1 (eSearch):
Provider: pubmed
Retrieves the list of PubMed IDs matching the query
Returns ProcessedResponse with IDs in metadata['IdList']['Id']
Cached ✓
Step 2 (eFetch):
Provider: pubmedefetch (different endpoint!)
Extracts IDs from Step 1's metadata
Fetches complete records for those IDs
Returns ProcessedResponse with full articles
Cached ✓
Result: Full article metadata including abstracts, with the search metadata from Step 1 preserved.
Note
Provider Switching: Workflows can use different providers for different steps. PubMed uses pubmed for eSearch and pubmedefetch for eFetch because they have different base URLs and configurations. Rate limiting is handled independently for each provider.
Tip
ScholarFlux preserves metadata from Step 1 (total results, query info) in the final response, so you get both complete article data and search context.
Custom Workflows
When to Create Custom Workflows
You need a custom workflow when:
✅ An API requires multiple steps to get complete data
✅ You need to filter/transform data between steps
✅ You want to aggregate results from multiple searches
✅ You need to use results from one search to guide the next
You DON’T need a custom workflow when:
❌ Simple single-step retrieval (use SearchCoordinator directly)
❌ Querying multiple providers (use MultiSearchCoordinator)
❌ Just caching results (use Caching Strategies)
Creating Custom Workflows
Build custom workflows by subclassing WorkflowStep:
from scholar_flux import SearchCoordinator
from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext
from scholar_flux.api.models import ProcessedResponse
from scholar_flux.utils import generate_iso_timestamp
from typing import Optional

class MySearchStep(WorkflowStep):
    """Step 1: Search for records"""
    provider_name: Optional[str] = "my_provider"

class MyFetchStep(WorkflowStep):
    """Step 2: Fetch detailed data"""
    provider_name: Optional[str] = "my_provider"

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "MyFetchStep":
        """Extract data from the previous step to configure this step"""
        # Validate context type
        self._verify_context(ctx)

        # Extract data from Step 1
        if ctx and ctx.result:
            previous_results = ctx.result.data or []
            # Extract IDs for Step 2
            ids = [r['id'] for r in previous_results if r.get('id')]
            # Configure this step to use those IDs
            self.config_parameters = {'ids': ids}
        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Execute Step 2 using IDs from Step 1"""
        # Get the IDs configured in pre_transform
        ids = self.config_parameters.get('ids', [])

        # Fetch records using the IDs
        responses = [
            search_coordinator.parameter_search(id=id_value)
            for id_value in ids
        ]

        # Combine results
        combined_data = [
            record
            for response in responses if response
            for record in (response.data or [])
        ]

        # Create the final response
        final_response = ProcessedResponse(
            response=responses[0].response if responses else None,
            processed_records=combined_data,
            metadata={'count': len(combined_data)},
            created_at=generate_iso_timestamp()
        )

        # Return as StepContext
        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )

# Create and use the workflow
workflow = SearchWorkflow(steps=[MySearchStep(), MyFetchStep()])
coordinator = SearchCoordinator(
    query="test",
    provider_name="my_provider",
    workflow=workflow
)
result = coordinator.search(page=1)
Key Customization Points:
pre_transform(): Modify step configuration based on previous results
_run(): Execute the step’s main logic
post_transform(): Transform results after execution (optional)
with_context(): Temporarily modify coordinator for step scope (advanced)
Real-World Example 1: Crossref DOI Enrichment
This workflow searches Crossref, then fetches detailed metadata for each DOI.
Complete Implementation
from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext
from scholar_flux.api import SearchCoordinator, ProcessedResponse
from scholar_flux.api.models import SearchAPIConfig
from scholar_flux.utils.helpers import generate_iso_timestamp
from typing import Optional
from pydantic import Field

class PreprocessStep(WorkflowStep):
    """Step 1: Search Crossref for articles"""
    provider_name: Optional[str] = Field(
        default="crossref",
        description="Provider for initial search"
    )

class EnrichmentStep(WorkflowStep):
    """Step 2: Fetch detailed metadata using DOIs from Step 1"""
    provider_name: Optional[str] = Field(
        default="crossref",
        description="Provider for DOI-based retrieval"
    )

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        provider_name: Optional[str] = None,
        search_parameters: Optional[dict] = None,
        config_parameters: Optional[dict] = None,
        *args,
        **kwargs
    ) -> "EnrichmentStep":
        """Extract DOIs from Step 1 results"""
        # Validate that we have context
        self._verify_context(ctx)

        # Get results from the preprocessing step
        preprocessed_result = (ctx.result.data or []) if ctx and ctx.result else []
        if not preprocessed_result:
            raise RuntimeError("Step 1 produced no records. Cannot continue.")

        # Extract DOIs
        dois = [r['DOI'] for r in preprocessed_result if r.get('DOI')]

        # Configure for DOI-based retrieval in _run()
        self.search_parameters = {"parameters": {"dois": dois}}
        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        verbose: Optional[bool] = True,
        *args,
        **kwargs
    ) -> StepContext:
        """Fetch detailed metadata for each DOI"""
        # Get the DOI list prepared in pre_transform
        doi_list = self.search_parameters["parameters"]["dois"]

        # Fetch each DOI individually
        # Note: parameter_search() allows custom endpoint paths
        processed_responses = [
            search_coordinator.parameter_search(
                endpoint="works/" + doi,
                **kwargs
            )
            for doi in doi_list if doi
        ]

        # Combine all records
        combined_records = [
            record
            for response in processed_responses
            if response and response.data
            for record in response.data
        ]

        # Create the final response
        final_response = ProcessedResponse(
            response=processed_responses[0].response if processed_responses else None,
            processed_records=combined_records,
            metadata={
                'total_dois': len(doi_list),
                'enriched_records': len(combined_records)
            },
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )
Usage Example
from scholar_flux import SearchCoordinator
from scholar_flux.sessions import CachedSessionManager

# Create the enrichment workflow
enrichment_workflow = SearchWorkflow(
    steps=[PreprocessStep(), EnrichmentStep()]
)

# Configure the coordinator
session_manager = CachedSessionManager(
    backend="redis",
    user_agent="Research/1.0 (mailto:user@institution.edu)"
)

coordinator = SearchCoordinator(
    query="machine learning healthcare",
    provider_name="crossref",
    workflow=enrichment_workflow,
    session=session_manager()
)

# Execute the workflow
result = coordinator.search(page=1)

if result:
    print(f"✅ Enriched {len(result.data)} records")
    print(f"Metadata: {result.metadata}")
else:
    print(f"❌ Error: {result.error} - {result.message}")
Note
Caching Behavior: Both Step 1 (search) and Step 2 (enrichment) results are cached independently. If you run the same query again, both steps will be retrieved from cache instantly.
Real-World Example 2: OpenAlex Citation Network
This workflow builds citation networks by:
1. Finding seed papers
2. Retrieving papers that cite them
3. Retrieving papers they cite
Complete Implementation
from scholar_flux.api.workflows import WorkflowStep, SearchWorkflow, StepContext, WorkflowResult
from scholar_flux.api import SearchCoordinator, ProcessedResponse
from scholar_flux.utils.helpers import generate_iso_timestamp
from typing import Optional
from pydantic import Field

class SeedPaperStep(WorkflowStep):
    """Step 1: Find seed papers matching the query"""
    provider_name: Optional[str] = "openalex"

class CitationStep(WorkflowStep):
    """Step 2 (and 3): Fetch citation network data"""
    provider_name: Optional[str] = "openalex"
    citation_parameter: str = Field(
        default="cited_by",
        description="'cited_by' or 'cites' for citation direction"
    )
    record_limit: Optional[int] = Field(
        default=None,
        description="Max records to fetch per seed paper"
    )

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "CitationStep":
        """Extract OpenAlex IDs from previous results"""
        self._verify_context(ctx)

        # Get seed papers from the previous step
        if ctx and ctx.result:
            seed_papers = ctx.result.data or []
            # Extract OpenAlex IDs
            openalex_ids = [
                p.get('id') for p in seed_papers
                if p.get('id')
            ]
            # Store for _run()
            self.search_parameters = {'openalex_ids': openalex_ids}
        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Fetch citations for each seed paper"""
        openalex_ids = self.search_parameters.get('openalex_ids', [])
        all_citations = []

        for oa_id in openalex_ids:
            # Build the filter for this paper's citations
            filter_param = f"{self.citation_parameter}:{oa_id}"

            # Fetch citations
            response = search_coordinator.parameter_search(
                filter=filter_param,
                per_page=self.record_limit or 25,
                **kwargs
            )

            if response and response.data:
                records = response.data[:self.record_limit] if self.record_limit else response.data
                all_citations.extend(records)

        # Deduplicate by OpenAlex ID
        seen_ids = set()
        unique_citations = []
        for citation in all_citations:
            citation_id = citation.get('id')
            if citation_id and citation_id not in seen_ids:
                seen_ids.add(citation_id)
                unique_citations.append(citation)

        # Create the final response
        final_response = ProcessedResponse(
            response=None,  # Multiple API calls, no single response
            processed_records=unique_citations,
            metadata={
                'seed_papers': len(openalex_ids),
                'total_citations': len(all_citations),
                'unique_citations': len(unique_citations),
                'citation_direction': self.citation_parameter
            },
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=final_response
        )

class OpenAlexCitationWorkflow(SearchWorkflow):
    """Three-step workflow for building citation networks"""

    def _create_workflow_result(
        self,
        result: Optional[ProcessedResponse] = None
    ) -> WorkflowResult:
        """Merge results from all citation steps"""
        # Use the base implementation first
        workflow_result = super()._create_workflow_result(result)

        # Optionally merge data from all steps
        if len(self._history) >= 3:
            seed_step = self._history[0]
            cited_by_step = self._history[1]
            cites_step = self._history[2]

            # Combine all unique papers
            all_papers = []
            seen_ids = set()
            for step_ctx in [seed_step, cited_by_step, cites_step]:
                if step_ctx.result and step_ctx.result.data:
                    for paper in step_ctx.result.data:
                        paper_id = paper.get('id')
                        if paper_id and paper_id not in seen_ids:
                            seen_ids.add(paper_id)
                            all_papers.append(paper)

            # Create the merged response
            merged_response = ProcessedResponse(
                response=None,
                processed_records=all_papers,
                metadata={
                    'total_papers': len(all_papers),
                    'seed_papers': len(seed_step.result.data) if seed_step.result else 0,
                    'citing_papers': len(cited_by_step.result.data) if cited_by_step.result else 0,
                    'cited_papers': len(cites_step.result.data) if cites_step.result else 0
                },
                created_at=generate_iso_timestamp()
            )
            workflow_result.result = merged_response

        return workflow_result
Usage Example
from scholar_flux import SearchCoordinator, CachedSessionManager

# Create the citation network workflow
citation_workflow = OpenAlexCitationWorkflow(
    steps=[
        SeedPaperStep(),
        CitationStep(citation_parameter="cited_by"),              # Papers citing the seeds
        CitationStep(citation_parameter="cites", record_limit=5)  # Papers cited by the seeds
    ]
)

# Configure the coordinator
session_manager = CachedSessionManager(
    backend="redis",
    user_agent="Research/1.0 (mailto:user@institution.edu)"
)

coordinator = SearchCoordinator(
    query="machine learning",
    provider_name="openalex",
    workflow=citation_workflow,
    session=session_manager(),
    records_per_page=10
)

# Execute the workflow
result = coordinator.search(page=1)

if result and result.data:
    print(f"Built citation network with {len(result.data)} unique papers")
    print(f"Metadata: {result.metadata}")

    # Analyze the citation network
    import pandas as pd
    df = pd.DataFrame(result.data)
    print(f"\nPapers by type: {df.get('type', pd.Series()).value_counts()}")
else:
    print(f"Error: {result.error}")
Note
Advanced Pattern: This workflow overrides _create_workflow_result() to merge results from all three steps into a single unified response. This is an advanced customization technique for workflows that aggregate data from multiple steps.
Workflow Error Handling
How Errors Propagate
When a workflow step fails, ScholarFlux returns an error response immediately:
from scholar_flux import SearchCoordinator
from scholar_flux.api import ErrorResponse, NonResponse

coordinator = SearchCoordinator(
    query="test",
    provider_name="pubmed"
)
result = coordinator.search(page=1)

# Check whether the workflow succeeded
if result:
    print(f"✅ Workflow succeeded: {len(result.data)} records")
elif isinstance(result, ErrorResponse):
    print(f"❌ API error: {result.message}")
    print(f"Status code: {result.status_code}")
elif isinstance(result, NonResponse):
    print(f"❌ Network/config error: {result.message}")
What happens on failure:
If Step 1 fails: Returns ErrorResponse immediately, no subsequent steps run
If Step 2 fails: Returns ErrorResponse with details about Step 2’s failure
Partial results are cached: Step 1’s successful result remains in cache
Accessing Workflow History
For debugging, you can inspect what steps succeeded before failure:
coordinator = SearchCoordinator(
    query="test",
    provider_name="pubmed"
)
result = coordinator.search(page=1)

# Check the workflow history
if coordinator.workflow and coordinator.workflow._history:
    print(f"Executed {len(coordinator.workflow._history)} steps:")
    for step_ctx in coordinator.workflow._history:
        step_name = step_ctx.step.__class__.__name__
        success = bool(step_ctx.result)
        record_count = len(step_ctx.result.data or []) if step_ctx.result else 0

        print(f"  Step {step_ctx.step_number} ({step_name}):")
        print(f"    Success: {success}")
        print(f"    Records: {record_count}")

        if isinstance(step_ctx.result, ErrorResponse):
            print(f"    Error: {step_ctx.result.message}")
Example output:
Executed 2 steps:
  Step 0 (PubMedSearchStep):
    Success: True
    Records: 0   # eSearch returns IDs in metadata, not data
  Step 1 (PubMedFetchStep):
    Success: False
    Records: 0
    Error: HTTP 500: Internal server error
Tip
The _history attribute is useful for debugging workflow failures, especially in complex multi-step workflows where you need to see exactly where the process failed.
Stop on Error Configuration
Control whether workflows halt on first error:
from scholar_flux.api.workflows import SearchWorkflow

# Stop on the first error (default)
workflow = SearchWorkflow(
    steps=[Step1(), Step2(), Step3()],
    stop_on_error=True
)

# Continue through all steps even if some fail
workflow = SearchWorkflow(
    steps=[Step1(), Step2(), Step3()],
    stop_on_error=False  # All steps execute; history shows failures
)
Warning
With stop_on_error=False, later steps may fail due to missing data from earlier steps. Use this carefully and check _history to see which steps succeeded.
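The halting semantics can be sketched with toy steps. This is a simplified illustration of the two modes, not the library's internals; the `run_steps` helper and the success/failure lambdas are made up for the example.

```python
# Toy sketch: each "step" either succeeds (returns data) or fails (returns None)
def run_steps(steps, stop_on_error=True):
    history = []
    for step in steps:
        result = step()
        history.append(result)
        if result is None and stop_on_error:
            break  # halt on the first failure
    return history

ok = lambda: {'records': 1}
fail = lambda: None

# Default: halts right after the failing step
print(len(run_steps([ok, fail, ok], stop_on_error=True)))   # 2

# stop_on_error=False: all steps run; history records the failure
print(len(run_steps([ok, fail, ok], stop_on_error=False)))  # 3
```

Either way the history keeps the failed entry, which is what makes post-hoc inspection with `_history` possible.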
Creating ErrorResponse for Testing
When testing workflows, you may need to create ErrorResponse objects. ScholarFlux requires a response object:
from scholar_flux.api.models import ErrorResponse, ReconstructedResponse

# Create a mock response for testing
test_response = ReconstructedResponse.build(
    url="https://api.example.com/test",
    status_code=500,
    json={"status": "failure", "message": "Test error"}
)

# Create an ErrorResponse wrapping the mock response
error = ErrorResponse(
    response=test_response,
    error="TestError",
    message="Intentional failure for testing"
)
This pattern is useful when creating test workflows with simulated failures.
Best Practices
Workflow Design
DO:
Keep steps focused on single responsibilities (one step = one API call or transformation)
Use pre_transform() to pass data between steps cleanly
Handle errors gracefully in _run() and provide informative error messages
Validate context in pre_transform() with self._verify_context(ctx)
Use descriptive step class names that explain what the step does
DON’T:
Make steps depend on external state outside the workflow
Skip error handling (always check if previous results exist)
Modify coordinator state directly in steps (use with_context() if needed)
Create workflows for simple single-step retrieval (just use SearchCoordinator directly)
Type Safety
Use type annotations for better code quality and IDE support:
from typing import Optional
from scholar_flux.api.workflows import WorkflowStep, StepContext
from scholar_flux.api.models import ProcessedResponse
from scholar_flux.utils import generate_iso_timestamp

class TypedStep(WorkflowStep):
    provider_name: Optional[str] = "plos"

    def pre_transform(
        self,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> "TypedStep":
        """Validate context with type checking"""
        self._verify_context(ctx)

        # Type-safe context checking
        if not isinstance(ctx, StepContext):
            raise RuntimeError(f"Expected StepContext, got {type(ctx)}")

        # Type-safe data checking
        if not (ctx.result and ctx.result.data):
            raise RuntimeError("No data from previous step")

        return self

    def _run(
        self,
        step_number: int,
        search_coordinator,
        ctx: Optional[StepContext] = None,
        *args,
        **kwargs
    ) -> StepContext:
        """Execute with an explicit return type"""
        # Use type annotations for collections
        records: list[dict] = []
        # ... step logic

        response = ProcessedResponse(
            response=None,
            processed_records=records,
            metadata={},
            created_at=generate_iso_timestamp()
        )

        return StepContext(
            step_number=step_number,
            step=self.model_copy(),
            result=response
        )
Benefits of type annotations:
Catch errors early with mypy type checking
Better IDE autocomplete and navigation
Self-documenting code
Easier refactoring
Parameter Types Reference
search_parameters - Control ScholarFlux behavior:
search_parameters = {
    'page': 1,                   # Which page to retrieve
    'normalize_records': True,   # Apply field normalization?
    'from_request_cache': True,  # Use HTTP cache?
    'from_process_cache': True   # Use processing cache?
}
config_parameters - Override SearchAPIConfig:
config_parameters = {
    'request_delay': 0.1,      # Rate limiting
    'records_per_page': 50,    # Results per page
    'id': '12345,67890',       # API-specific params
    'endpoint': 'works/',      # API endpoint path
    'filter': 'type:journal'   # API filters
}
Caching Strategy
Enable caching for expensive workflows to avoid re-executing all steps:
from scholar_flux import CachedSessionManager, DataCacheManager

# HTTP caching (request-level)
session = CachedSessionManager(backend='redis')

# Result caching (processing-level)
cache = DataCacheManager.with_storage('redis', namespace='my_workflow')

coordinator = SearchCoordinator(
    query="test",
    workflow=my_workflow,
    session=session(),    # Each step's HTTP request is cached
    cache_manager=cache   # Each step's processed result is cached
)

# First run: executes all workflow steps
result = coordinator.search(page=1)

# Second run: all steps retrieved from cache (instant)
result = coordinator.search(page=1)
How workflow caching works:
1. Each step calls coordinator._search() internally
2. Each _search() call checks the DataCacheManager for a cached result
3. If cached, the step returns immediately without making an API request
4. If not cached, the step executes and caches its result
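The check-then-execute pattern above can be sketched with a plain dict standing in for the cache. This is assumed, simplified behavior for illustration; `cached_search` and its key scheme are hypothetical, not the DataCacheManager API.

```python
# Conceptual sketch of a per-step cache check
cache = {}
calls = []  # record of "real" API requests made

def cached_search(provider, page, **params):
    # Derive a key from everything that identifies the request
    key = (provider, page, tuple(sorted(params.items())))
    if key in cache:
        return cache[key]             # cache hit: no API request
    calls.append(key)                 # simulate making the API request
    result = {'records': [f'{provider}-p{page}']}
    cache[key] = result               # cache for next time
    return result

cached_search('pubmed', 1)   # executes and caches
cached_search('pubmed', 1)   # served from cache
print(len(calls))  # 1  (only one real request was made)
```

Because each workflow step goes through the same check, re-running a cached workflow performs zero API requests.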
Tip
Best practice: Use Redis or MongoDB caching in production for persistent workflow results across sessions.
Testing Workflows
Test each step independently before testing the full workflow:
from scholar_flux.api.workflows import StepContext
from scholar_flux.api import SearchCoordinator, ProcessedResponse

# Test Step 1 independently
step1 = MySearchStep()
mock_coordinator = SearchCoordinator(
    provider_name='my_provider',
    query='test'
)

# Execute Step 1
result = step1._run(
    step_number=1,
    search_coordinator=mock_coordinator,
    ctx=None
)

# Verify Step 1 output
assert isinstance(result, StepContext)
assert result.result
assert len(result.result.data or []) > 0

# Test Step 2 with Step 1's output
step2 = MyFetchStep()
step2 = step2.pre_transform(ctx=result)

# Use with_context for scoped execution (if the step implements it)
with step2.with_context(mock_coordinator):
    step2_result = step2._run(
        step_number=2,
        search_coordinator=mock_coordinator,
        ctx=result
    )

# Verify Step 2 output
assert isinstance(step2_result, StepContext)
assert step2_result.result
assert len(step2_result.result.data or []) > 0
Tip
Test steps individually first, then test the complete workflow. This makes debugging much easier.
Advanced Customization
The with_context() Pattern
Use with_context() to temporarily modify coordinator settings for a step’s scope:
from contextlib import contextmanager
from typing import Generator
from scholar_flux.api.workflows import WorkflowStep

class CustomStep(WorkflowStep):
    """Step with temporary configuration changes"""
    provider_name: str = "plos"
    temp_delay: float = 0.1

    @contextmanager
    def with_context(
        self,
        search_coordinator,
        *args,
        **kwargs
    ) -> Generator["CustomStep", None, None]:
        """Temporarily reduce rate limiting for this step"""
        # Save the original delay
        original_delay = search_coordinator.api.config.request_delay
        try:
            # Apply the temporary delay
            search_coordinator.api.config.request_delay = self.temp_delay
            yield self
        finally:
            # Restore the original delay
            search_coordinator.api.config.request_delay = original_delay
When to use with_context():
Temporarily modify rate limiting for specific steps
Apply step-specific API configurations
Scope expensive operations (logging, metrics)
Test workflows with mock configurations
Note
with_context() is an advanced pattern. Most workflows don’t need it—use pre_transform() and config_parameters instead for simpler customization.
Overriding _create_workflow_result()
Override this method to customize how final results are assembled:
from typing import Optional
from scholar_flux.api.workflows import SearchWorkflow, WorkflowStep, WorkflowResult, StepContext
from scholar_flux.api.models import ProcessedResponse
from scholar_flux.utils import generate_iso_timestamp

class MergedWorkflow(SearchWorkflow):
    """Workflow that merges results from all steps"""

    def _create_workflow_result(
        self,
        result: Optional[ProcessedResponse] = None
    ) -> WorkflowResult:
        """Merge data from all steps into the final result"""
        # Collect all records from all steps
        all_records: list[dict] = []
        for step_ctx in self._history:
            if step_ctx.result and step_ctx.result.data:
                all_records.extend(step_ctx.result.data)

        # Deduplicate by record ID
        seen_ids = set()
        unique_records = []
        for record in all_records:
            record_id = record.get('id')
            if record_id and record_id not in seen_ids:
                seen_ids.add(record_id)
                unique_records.append(record)

        # Create the merged response
        merged_response = ProcessedResponse(
            response=None,
            processed_records=unique_records,
            metadata={
                'total_steps': len(self._history),
                'unique_records': len(unique_records)
            },
            created_at=generate_iso_timestamp()
        )

        return WorkflowResult(
            history=self._history,
            result=merged_response
        )

class Step1(WorkflowStep):
    provider_name: str = "plos"

    def _run(self, step_number, search_coordinator, ctx=None, *args, **kwargs):
        response = ProcessedResponse(
            response=None,
            processed_records=[{'id': '1', 'title': 'Paper A'}],
            metadata={},
            created_at=generate_iso_timestamp()
        )
        return StepContext(step_number=step_number, step=self.model_copy(), result=response)

class Step2(WorkflowStep):
    provider_name: str = "plos"

    def _run(self, step_number, search_coordinator, ctx=None, *args, **kwargs):
        response = ProcessedResponse(
            response=None,
            processed_records=[{'id': '2', 'title': 'Paper B'}],
            metadata={},
            created_at=generate_iso_timestamp()
        )
        return StepContext(step_number=step_number, step=self.model_copy(), result=response)
Common use cases:
Merging results from multiple steps (like OpenAlex citation network)
Copying metadata from early steps to final result (like PubMed)
Aggregating statistics across all steps
Deduplicating records from multiple sources
Troubleshooting
Common Issues
Issue: “Step X produced no records. Cannot continue.”
Cause: A step returned empty results, and the next step expected data.
Solution:
def pre_transform(self, ctx: StepContext) -> "MyStep":
    self._verify_context(ctx)
    if not isinstance(ctx, StepContext):
        raise RuntimeError(f"Expected `ctx` to be a StepContext, received {type(ctx)}")

    # Check whether the previous step produced data
    if not (ctx.result and ctx.result.data):
        raise RuntimeError(
            f"Step {ctx.step_number} returned no results. "
            f"Check query parameters or API availability."
        )
    # ... rest of pre_transform
Issue: Workflow runs but returns wrong data
Cause: Parameter passing between steps is incorrect.
Solution: Debug with _history:
result = coordinator.search(page=1)

# Inspect what each step received and returned
for i, step_ctx in enumerate(coordinator.workflow._history):
    print(f"\n=== Step {i} ===")
    print(f"Step type: {step_ctx.step.__class__.__name__}")
    print(f"Config: {step_ctx.step.config_parameters}")
    print(f"Search params: {step_ctx.step.search_parameters}")
    if step_ctx.result:
        print(f"Records returned: {len(step_ctx.result.data or [])}")
        print(f"Metadata: {step_ctx.result.metadata}")
Issue: Workflow caching not working
Cause: Cache keys include all parameters, so slight differences prevent cache hits.
Solution: Use consistent parameters and namespaces:
# Use a namespace to isolate the workflow cache
cache = DataCacheManager.with_storage(
    'redis',
    namespace='my_workflow_v1'  # Versioned namespace for cache isolation
)
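The "slight differences prevent cache hits" failure mode can be demonstrated with a toy key derivation. This sketch is illustrative only; `cache_key` is hypothetical and the real key scheme is internal to ScholarFlux.

```python
# Toy key derivation: the key covers every parameter, so
# equivalent-looking values can still produce distinct keys.
def cache_key(provider, **params):
    return (provider, tuple(sorted(params.items())))

k1 = cache_key('crossref', page=1, rows=20)
k2 = cache_key('crossref', page='1', rows=20)  # '1' as a string this time

print(k1 == k2)  # False: int 1 vs str '1' yield different keys, so no cache hit
```

The practical takeaway: build workflow parameters in one place (or from one config object) so repeated runs produce byte-identical keys.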
Next Steps
Related Guides:
Getting Started - Foundation for workflow usage
Response Handling and Error Patterns - Error handling in workflows
Multi-Provider Search - Use workflows with multiple providers
Schema Normalization - Normalize workflow results
Custom Providers - Create workflows for new providers
Advanced Topics:
Caching Strategies - Cache expensive workflow results
Production Deployment - Deploy workflows in production