From f99e49974a423ea72fef4882e4d1e3a40351ddd4 Mon Sep 17 00:00:00 2001 From: Sosokker Date: Mon, 12 May 2025 23:21:25 +0700 Subject: [PATCH] feat: add pipeline result data endpoint --- pipeline/models/pipeline.py | 5 ++- pipeline/routers/pipelines.py | 29 +++++++++++++++++ pipeline/services/pipeline_service.py | 46 +++++++++++++++++++++++---- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/pipeline/models/pipeline.py b/pipeline/models/pipeline.py index d10709e..235f9c2 100644 --- a/pipeline/models/pipeline.py +++ b/pipeline/models/pipeline.py @@ -3,7 +3,7 @@ import enum from uuid import UUID from pydantic import BaseModel, Field -from models.ingestion import IngestorInput +from models.ingestion import IngestorInput, OutputData class PipelineStatus(str, enum.Enum): @@ -33,6 +33,9 @@ class Pipeline(BaseModel): status: PipelineStatus = Field(default=PipelineStatus.INACTIVE) created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + latest_run_output: OutputData | None = Field( + default=None, description="Output of the last successful run" + ) class PipelineCreate(BaseModel): diff --git a/pipeline/routers/pipelines.py b/pipeline/routers/pipelines.py index 8f07599..fdc45d5 100644 --- a/pipeline/routers/pipelines.py +++ b/pipeline/routers/pipelines.py @@ -9,6 +9,7 @@ from fastapi import ( ) from models.pipeline import Pipeline, PipelineCreate, PipelineStatus +from models.ingestion import OutputData from services.pipeline_service import PipelineService from dependencies import ( get_pipeline_service, @@ -198,3 +199,31 @@ async def run_pipeline_manually( background_tasks.add_task(service.run_pipeline, pipeline_id=pipeline_id) return {"detail": f"Pipeline run triggered for {pipeline_id}"} + + +@router.get( + "/{pipeline_id}/results", + response_model=OutputData | None, + summary="Get latest run results for a pipeline", + description="Retrieves the aggregated data output from the last successful run of the specified pipeline.", +) +async def get_pipeline_results( + pipeline_id: UUID, + service: PipelineService = Depends(get_pipeline_service), +) -> OutputData | None: + """ + Fetches the results of the last successful run for the given pipeline_id. + Returns null or an empty structure if no successful run with output is found. + """ + results = await service.get_pipeline_latest_results(pipeline_id) + + if results is None: + pipeline_exists = await service.get_pipeline(pipeline_id) + if not pipeline_exists: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with id {pipeline_id} not found.", + ) + return None + + return results diff --git a/pipeline/services/pipeline_service.py b/pipeline/services/pipeline_service.py index 92040dc..0edecb1 100644 --- a/pipeline/services/pipeline_service.py +++ b/pipeline/services/pipeline_service.py @@ -17,7 +17,7 @@ from models.pipeline import ( RunFrequency, PipelineStatus, ) -from models.ingestion import IngestorInput +from models.ingestion import IngestorInput, OutputData from stores.base import PipelineStore from scheduler.utils import calculate_next_run, UTC @@ -279,10 +279,13 @@ class PipelineService: # --- Execute Pipeline Logic --- run_successful = False + ingestion_output: OutputData | None = None try: logger.info("Executing core logic...") # This call and anything within it will inherit the pipeline_id context - await self._execute_ingestion(pipeline.config.ingestor_config) + ingestion_output = await self._execute_ingestion( + pipeline.config.ingestor_config + ) logger.info("Core logic finished successfully.") run_successful = True except Exception as e: @@ -317,6 +320,15 @@ class PipelineService: if run_successful: final_pipeline_state.config.last_run = now + if ingestion_output: + final_pipeline_state.latest_run_output = ingestion_output + else: + logger.warning( + "Run was successful but no ingestion output captured." + ) + final_pipeline_state.latest_run_output = None + else: + logger.warning("Run failed.") current_last_run = final_pipeline_state.config.last_run final_pipeline_state.config.next_run = calculate_next_run( @@ -346,17 +358,16 @@ class PipelineService: ) # Pipeline might be left ACTIVE or FAILED state might not be saved. Needs monitoring. - async def _execute_ingestion(self, config: IngestorInput): + async def _execute_ingestion(self, config: IngestorInput) -> OutputData | None: """ Executes the ingestion process for a pipeline using the provided IngestorInput config. Returns the ingestion results or raises an exception on failure. """ try: - # from ..ingestion import Ingestor logger.info(f"Executing ingestion with config: {config}") - results = Ingestor.run(config.sources) + results: OutputData = Ingestor.run(config.sources) logger.info( - f"Ingestion completed successfully. Results count: {len(results.records)}" + f"Ingestion completed successfully. Records count: {len(results.records)}" ) return results except ImportError: @@ -364,4 +375,25 @@ class PipelineService: raise RuntimeError("Ingestion module not found") except Exception as e: logger.error(f"Ingestion execution failed: {e}", exc_info=True) - raise + raise + + async def get_pipeline_latest_results( + self, pipeline_id: UUID + ) -> Optional[OutputData]: + """Retrieves the output from the latest successful run of a pipeline.""" + logger.debug(f"Getting latest results for pipeline: id={pipeline_id}") + pipeline = await self.store.get(pipeline_id) + if pipeline: + if pipeline.latest_run_output and pipeline.config.last_run: + # NOTE: can use PipelineRunResult + return pipeline.latest_run_output + elif pipeline.config.last_run: + logger.info( + f"Pipeline {pipeline_id} ran at {pipeline.config.last_run} but has no stored output (or run failed)." + ) + return None + else: + logger.info(f"Pipeline {pipeline_id} has no recorded run or output.") + return None + logger.warning(f"Pipeline {pipeline_id} not found when retrieving results.") + return None