async def get_pipeline_service(request: Request):
    """FastAPI dependency that yields the shared ``PipelineService``.

    The service instance is published on ``app.state`` by the application's
    lifespan handler; this dependency simply reads it back per request.

    Raises:
        RuntimeError: if the service was never placed on ``app.state``
            (e.g. the lifespan handler did not run).
    """
    # Use getattr with a default: reading a missing attribute on app.state
    # would raise AttributeError *before* the explicit check below could
    # run, producing a confusing error instead of the intended message.
    service = getattr(request.app.state, "pipeline_service", None)
    if service is None:
        # RuntimeError subclasses Exception, so existing `except Exception`
        # handlers still catch it.
        raise RuntimeError("PipelineService not initialized or available in app state.")
    return service
# NOTE: Windows-specific asyncio policy — crawl4ai requires the Proactor
# event loop, which is not the default selector loop on Windows.
if platform.system() == "Windows":
    logger.info("Setting WindowsProactorEventLoopPolicy for asyncio.")
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# --- Resource Initialization ---
# Module-level singletons, shared across all requests via app.state.
pipeline_store: PipelineStore = InMemoryPipelineStore()
pipeline_service = PipelineService(store=pipeline_store)
scheduler_manager = SchedulerManager(pipeline_service=pipeline_service)

# The service and the scheduler reference each other; wire the back-reference
# through a setter to avoid a circular import between their modules.
pipeline_service.set_scheduler_manager(scheduler_manager)


# --- Lifespan Management (for startup/shutdown) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: publish shared services on ``app.state`` and
    start/stop the scheduler around the server's lifetime."""
    logger.info("Application startup...")
    # Store instances in app state so route dependencies can resolve them.
    app.state.pipeline_store = pipeline_store
    app.state.scheduler_manager = scheduler_manager
    app.state.pipeline_service = pipeline_service

    logger.info("Initializing and starting SchedulerManager...")
    scheduler_manager.start()
    logger.info("SchedulerManager started.")

    yield  # the application serves requests here

    # --- Shutdown ---
    logger.info("Application shutdown...")
    logger.info("Shutting down SchedulerManager...")
    scheduler_manager.stop()
    logger.info("SchedulerManager stopped.")
    logger.info("Cleanup complete.")


# --- FastAPI App ---
app = FastAPI(
    title="Data Integration Pipeline API",
    description="API for managing and running data integration pipelines.",
    version="0.1.0",
    lifespan=lifespan,
)

# Mount all /pipelines routes.
app.include_router(pipelines_router)


# --- Root Endpoint ---
@app.get("/", tags=["Root"])
async def read_root():
    """Simple landing endpoint confirming the service is up."""
    return {"message": "Welcome to the Data Integration Pipeline API"}


# --- Run with Uvicorn ---
if __name__ == "__main__":
    import os

    import uvicorn

    logger.info("Starting Uvicorn server...")
    # Auto-reload is a development convenience only; previously it was
    # hard-coded to True. Set APP_ENV=production to disable it.
    dev_mode = os.getenv("APP_ENV", "development") != "production"
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=dev_mode, loop="asyncio")
+ """ + try: + # The service already handles calculating next_run and notifying scheduler + created_pipeline = await service.create_pipeline( + name=pipeline_in.name, + description=pipeline_in.description, + ingestor_config=pipeline_in.config.ingestor_config, + run_frequency=pipeline_in.config.run_frequency, + ) + return created_pipeline + except Exception as e: + # Catch potential exceptions during creation (e.g., store errors) + # Log the error ideally + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create pipeline: {e}", + ) + + +@router.get( + "/", + response_model=List[Pipeline], + summary="List all pipelines", + description="Retrieves a list of all configured pipelines.", +) +async def list_pipelines( + service: PipelineService = Depends(get_pipeline_service), +) -> List[Pipeline]: + """ + Returns a list of all pipelines currently stored. + """ + return await service.list_pipelines() + + +@router.get( + "/{pipeline_id}", + response_model=Pipeline, + summary="Get a specific pipeline", + description="Retrieves the details of a single pipeline by its unique ID.", +) +async def get_pipeline( + pipeline_id: UUID, + service: PipelineService = Depends(get_pipeline_service), +) -> Pipeline: + """ + Fetches a pipeline by its UUID. Returns 404 if not found. + """ + pipeline = await service.get_pipeline(pipeline_id) + if pipeline is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Pipeline with id {pipeline_id} not found", + ) + return pipeline + + +@router.put( + "/{pipeline_id}", + response_model=Pipeline, + summary="Update a pipeline", + description="Updates the configuration of an existing pipeline. If the run frequency changes, the schedule will be updated.", +) +async def update_pipeline( + pipeline_id: UUID, + pipeline_in: PipelineCreate, + service: PipelineService = Depends(get_pipeline_service), +) -> Pipeline: + """ + Updates an existing pipeline identified by its UUID. 
@router.put(
    "/{pipeline_id}",
    response_model=Pipeline,
    summary="Update a pipeline",
    description="Updates the configuration of an existing pipeline. If the run frequency changes, the schedule will be updated.",
)
async def update_pipeline(
    pipeline_id: UUID,
    pipeline_in: PipelineCreate,
    service: PipelineService = Depends(get_pipeline_service),
) -> Pipeline:
    """Update the pipeline identified by ``pipeline_id``.

    Name, description, and configuration (including run frequency) may all
    be changed. Responds 404 when no such pipeline exists.
    """
    result = await service.update_pipeline(pipeline_id, pipeline_in)
    if result is not None:
        return result
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Pipeline with id {pipeline_id} not found",
    )


@router.delete(
    "/{pipeline_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete a pipeline",
    description="Deletes a pipeline configuration and unschedules any future runs.",
)
async def delete_pipeline(
    pipeline_id: UUID,
    service: PipelineService = Depends(get_pipeline_service),
) -> None:
    """Delete the pipeline identified by ``pipeline_id``.

    Responds 204 on success; 404 when the pipeline is missing at lookup
    time or when the deletion itself fails afterwards.
    """
    # Look the pipeline up first so a missing id yields a clean 404.
    if not await service.get_pipeline(pipeline_id):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Pipeline with id {pipeline_id} not found",
        )

    # The service's delete also notifies the scheduler.
    if not await service.delete_pipeline(pipeline_id):
        # Possible race: it existed a moment ago but the delete still failed.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Pipeline with id {pipeline_id} not found or could not be deleted.",
        )
    # 204 No Content: nothing to return.
@router.post(
    "/{pipeline_id}/run",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=Dict[str, str],
    summary="Manually trigger a pipeline run",
    description="Initiates an immediate run of the specified pipeline in the background. The pipeline status will be updated during and after the run.",
)
async def run_pipeline_manually(
    pipeline_id: UUID,
    background_tasks: BackgroundTasks,
    service: PipelineService = Depends(get_pipeline_service),
) -> Dict[str, str]:
    """Kick off an asynchronous run of the given pipeline.

    Responds 404 when the pipeline does not exist and 409 when it is
    already running. The actual run executes as a FastAPI background task,
    so this endpoint returns immediately with a confirmation message.
    """
    target = await service.get_pipeline(pipeline_id)
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Pipeline with id {pipeline_id} not found, cannot trigger run.",
        )

    # Fast-fail on an already-active pipeline; the service layer performs
    # the same check again before actually running.
    if target.status == PipelineStatus.ACTIVE:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Pipeline {pipeline_id} is already running.",
        )

    # The potentially long-running work happens after the response is sent.
    background_tasks.add_task(service.run_pipeline, pipeline_id=pipeline_id)
    return {"detail": f"Pipeline run triggered for {pipeline_id}"}