# /api/workflows/pubmed_workflow.py
"""The scholar_flux.api.workflows.pubmed_workflow module defines the core steps for retrieving PubMed API records.

The two steps defined here are combined into a single workflow so that PubMed's two-step article/abstract retrieval
process can be executed automatically, as if it were a single step.

Classes:
    PubMedSearchStep:
        The first of two steps in the article/metadata response retrieval process, involving ID retrieval.
    PubMedFetchStep:
        The second of two steps in the article/metadata response retrieval process that resolves IDs into
        their corresponding article data and metadata.
    PubMedSearchWorkflow:
        The workflow that runs both steps in sequence.

Note that this workflow is further defined in the workflow_defaults.py module and is automatically retrieved
when creating a new SearchCoordinator when `provider_name=pubmed`. The `SearchCoordinator.search()` method
will then automatically retrieve records and metadata without the need to directly execute either step if
workflows are enabled in the SearchCoordinator.
"""
from __future__ import annotations
from pydantic import Field
from typing import Any, Optional
from scholar_flux.api.models import ProcessedResponse, ErrorResponse, SearchAPIConfig
from scholar_flux.api.workflows.search_workflow import StepContext, WorkflowStep, SearchWorkflow, WorkflowResult
from scholar_flux.exceptions import NoRecordsAvailableException
from scholar_flux.api.base_coordinator import BaseCoordinator
import logging
logger = logging.getLogger(__name__)
[docs]
class PubMedSearchStep(WorkflowStep):
    """First step of the PubMed workflow: look up the IDs of articles/abstracts that match the query.

    Running this step is equivalent to retrieving a single page from the PubMed API without a workflow. The
    default search/config parameter settings can be overridden to customize how the step is executed.

    Once the IDs matching the current query and page have been retrieved, the workflow passes them as context
    to the subsequent `PubMedFetchStep`, which resolves each ID into its associated article and/or abstract.

    Attributes:
        provider_name (Optional[str]):
            Identifies the `pubmed` eSearch API as the destination of the initial request.
        step_number:
            Metadata describing the intended position in the workflow sequence. Documentation only: the actual
            execution order is determined by the step's position in the workflow's `steps` list.
        description:
            Metadata describing the purpose of this workflow step. Documentation only.
    """

    provider_name: Optional[str] = "pubmed"
    step_number: Optional[int] = 0
    description: Optional[str] = "Retrieves IDs of records matching a particular query from the PubMed database."
[docs]
class PubMedFetchStep(WorkflowStep):
    """Second and final step of the PubMed workflow: resolve article/abstract IDs through the eFetch API.

    The IDs are taken from the metadata of the previous step and supplied to eFetch in order to retrieve the
    articles and/or abstracts they refer to.

    Attributes:
        provider_name (Optional[str]):
            Identifies the `pubmed` eFetch API as the destination of the next/final request.
        step_number:
            Metadata describing the intended position in the workflow sequence. Documentation only: the actual
            execution order is determined by the step's position in the workflow's `steps` list.
        description:
            Metadata describing the purpose of this workflow step. Documentation only.
    """

    provider_name: Optional[str] = "pubmedefetch"
    step_number: Optional[int] = 1
    description: Optional[str] = "Fetches each record/article corresponding to a PubMed ID from the PubMedSearchStep."
[docs]
class PubMedSearchWorkflow(SearchWorkflow):
    """SearchWorkflow implementation for PubMed's two-step article retrieval process.

    PubMed's API requires a two-step retrieval process:

    1. **eSearch (PubMedSearchStep)**: Searches for articles matching the query and returns a list of article IDs
       along with metadata about the search (query info, pagination, result counts, etc.)
    2. **eFetch (PubMedFetchStep)**: Takes the article IDs from step 1 and retrieves the full article data
       including abstracts, authors, and other detailed information.

    This workflow coordinates both steps automatically and ensures that metadata from the initial eSearch
    is preserved in the final result, providing consumers with both the full article data and the search context.

    Attributes:
        steps: The ordered workflow steps; defaults to a fresh `[PubMedSearchStep(), PubMedFetchStep()]` list
            for every workflow instance.
    """

    steps: list[WorkflowStep] = Field(default_factory=lambda: [PubMedSearchStep(), PubMedFetchStep()])

    def _run(
        self,
        search_coordinator: BaseCoordinator,
        verbose: bool = True,
        **keyword_parameters: Any,
    ) -> WorkflowResult:
        """Executes the PubMed workflow and catches edge-cases where successful eSearches return no records.

        Args:
            search_coordinator: The coordinator forwarded to the parent workflow's `_run` implementation.
            verbose: Forwarded unchanged to the parent workflow's `_run` implementation.
            **keyword_parameters: Additional keyword arguments forwarded to the parent `_run` implementation.

        Returns:
            WorkflowResult: The parent workflow's result, or, when a `NoRecordsAvailableException` is raised
            after the first step recorded a result, a result wrapping that first step's response.

        Raises:
            RuntimeError: If a `NoRecordsAvailableException` is raised before any first-step result was recorded
                in the workflow history. The original exception is chained as the cause.
        """
        try:
            return super()._run(search_coordinator, verbose, **keyword_parameters)
        except NoRecordsAvailableException as e:
            # Without a recorded first-step result there is nothing meaningful to fall back to.
            if not (self._history and self._history[0] and self._history[0].result):
                raise RuntimeError(
                    f"The PubMed Workflow failed without the retrieval of an initial eSearch response: {e}"
                ) from e

            logger.info("%s Halting the PubMed eFetch step and returning the processed eSearch response...", e)
            return WorkflowResult(history=self._history, result=self._history[0].result)

    def _create_workflow_result(self, result: Optional[ProcessedResponse | ErrorResponse] = None) -> WorkflowResult:
        """Updates the metadata field of the PubMed eFetch search result with eSearch metadata if available.

        This method overrides the base implementation to handle PubMed's two-step workflow where:

        1. The eSearch step (history[-2]) retrieves article IDs and returns metadata
        2. The eFetch step (history[-1]) retrieves full article data but typically has empty metadata

        By copying metadata from eSearch to the final eFetch result, we ensure that important information like
        ID lists, query details, and pagination info are preserved in the final workflow result. This maintains
        consistency with user expectations and allows downstream consumers to access complete search context.

        Note that the metadata is assigned onto the `result` object in place, and only when the result's own
        metadata is empty and the eSearch metadata is not.

        Args:
            result: Optional result to use instead of the last step's result from history

        Returns:
            WorkflowResult: The workflow result containing eSearch metadata and records from the final eFetch step.
        """
        # Fall back to the most recent step's result when no explicit result was supplied.
        result = self._history[-1].result if result is None and self._history else result

        # The last two history entries are expected to be the eSearch and eFetch contexts, in that order.
        esearch_ctx = self._history[-2] if len(self._history) >= 2 else None
        efetch_ctx = self._history[-1] if esearch_ctx else None

        if (
            isinstance(esearch_ctx, StepContext)
            and isinstance(efetch_ctx, StepContext)
            and isinstance(esearch_ctx.step, PubMedSearchStep)
            and isinstance(efetch_ctx.step, PubMedFetchStep)
            and isinstance(result, ProcessedResponse)
            and isinstance(esearch_ctx.result, ProcessedResponse)
            and esearch_ctx.result.metadata
            and not result.metadata
        ):
            # PubMedFetchStep generally does not have metadata, so reuse the eSearch metadata instead.
            result.metadata = esearch_ctx.result.metadata

        return WorkflowResult(history=self._history, result=result)
# Public API of this module: the two workflow steps and the workflow that combines them.
__all__ = ["PubMedSearchStep", "PubMedFetchStep", "PubMedSearchWorkflow"]