From 67f691a46ab32972d9291fa4588168dd32d03f94 Mon Sep 17 00:00:00 2001 From: Sosokker Date: Mon, 12 May 2025 02:58:38 +0700 Subject: [PATCH] refactor: remove logging stream - rewrite later --- pipeline/log/__init__.py | 0 pipeline/log/log_stream.py | 26 ------------- pipeline/log/logging_utils.py | 69 ----------------------------------- pipeline/main.py | 54 +++++++-------------------- 4 files changed, 13 insertions(+), 136 deletions(-) delete mode 100644 pipeline/log/__init__.py delete mode 100644 pipeline/log/log_stream.py delete mode 100644 pipeline/log/logging_utils.py diff --git a/pipeline/log/__init__.py b/pipeline/log/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/log/log_stream.py b/pipeline/log/log_stream.py deleted file mode 100644 index 8de4f4d..0000000 --- a/pipeline/log/log_stream.py +++ /dev/null @@ -1,26 +0,0 @@ -from fastapi import APIRouter, Request, HTTPException -from fastapi.responses import StreamingResponse -from log.logging_utils import RUN_LOG_QUEUES -from queue import Empty -import asyncio - -router = APIRouter() - -@router.get("/pipelines/{pipeline_id}/runs/{run_id}/logs/stream") -async def stream_logs(request: Request, pipeline_id: str, run_id: str): - log_queue = RUN_LOG_QUEUES.get(run_id) - if not log_queue: - raise HTTPException(status_code=404, detail="No logs for this run.") - - async def event_generator(): - while True: - if await request.is_disconnected(): - break - try: - log_line = log_queue.get(timeout=1) - yield f"data: {log_line}\n\n" - except Empty: - await asyncio.sleep(0.2) - continue - - return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/pipeline/log/logging_utils.py b/pipeline/log/logging_utils.py deleted file mode 100644 index 0524ef1..0000000 --- a/pipeline/log/logging_utils.py +++ /dev/null @@ -1,69 +0,0 @@ -from loguru import logger -from queue import Queue -from typing import Dict, Optional -import json - -# Per-run log queues (thread-safe) -RUN_LOG_QUEUES: Dict[str, Queue] = {} -RUN_LOG_HANDLERS: Dict[str, int] = {} - -# Structured log format -def make_log_record(level: str, message: str, pipeline_id: Optional[str], run_id: Optional[str], status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None) -> dict: - record = { - "level": level, - "message": message, - "pipeline_id": pipeline_id, - "run_id": run_id, - "status": status, - "error": error, - "extra": extra or {}, - } - return record - -# Custom loguru sink for per-run logging -def log_sink(message): - record = message.record - run_id = record["extra"].get("run_id") - pipeline_id = record["extra"].get("pipeline_id") - if run_id and run_id in RUN_LOG_QUEUES: - # Structure the log as JSON for frontend parsing - log_entry = make_log_record( - level=record["level"].name, - message=record["message"], - pipeline_id=pipeline_id, - run_id=run_id, - status=record["extra"].get("status"), - error=record["extra"].get("error"), - extra=record["extra"] - ) - RUN_LOG_QUEUES[run_id].put(json.dumps(log_entry)) - -# Setup per-run logging sink -def setup_run_logging(pipeline_id: str, run_id: str): - log_queue = Queue() - RUN_LOG_QUEUES[run_id] = log_queue - handler_id = logger.add( - log_sink, - filter=lambda record: record["extra"].get("run_id") == run_id, - enqueue=True - ) - RUN_LOG_HANDLERS[run_id] = handler_id - return log_queue - -# Remove per-run logging sink and clean up -def cleanup_run_logging(run_id: str): - if run_id in RUN_LOG_HANDLERS: - logger.remove(RUN_LOG_HANDLERS[run_id]) - del RUN_LOG_HANDLERS[run_id] - if run_id in RUN_LOG_QUEUES: - del RUN_LOG_QUEUES[run_id] - -# Helper for logging with context -def pipeline_log(level: str, message: str, pipeline_id: str, run_id: str, status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None): - logger.log(level, message, extra={"pipeline_id": pipeline_id, "run_id": run_id, "status": status, "error": error, **(extra or {})}) - -# Example usage: -# setup_run_logging(pipeline_id, run_id) -# pipeline_log("INFO", "Pipeline started", pipeline_id, run_id, status="RUNNING") -# pipeline_log("ERROR", "Pipeline failed", pipeline_id, run_id, status="FAILED", error="Some error") -# cleanup_run_logging(run_id) diff --git a/pipeline/main.py b/pipeline/main.py index e69f8c8..0b38bac 100644 --- a/pipeline/main.py +++ b/pipeline/main.py @@ -10,23 +10,22 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks import platform import asyncio +# set this so crawl4ai can work in windows if platform.system() == "Windows": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) import models import stores import services -from log.log_stream import router as log_stream_router app = FastAPI(title="Data Integration Pipeline API") -app.include_router(log_stream_router) @app.post( "/pipelines", response_model=models.Pipeline, status_code=201, - summary="Create a new pipeline" + summary="Create a new pipeline", ) def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline: """ @@ -36,9 +35,7 @@ def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline: @app.get( - "/pipelines", - response_model=List[models.Pipeline], - summary="List all pipelines" + "/pipelines", response_model=List[models.Pipeline], summary="List all pipelines" ) def list_pipelines() -> List[models.Pipeline]: """ @@ -50,7 +47,7 @@ def list_pipelines() -> List[models.Pipeline]: @app.get( "/pipelines/{pipeline_id}", response_model=models.Pipeline, - summary="Get a pipeline by ID" + summary="Get a pipeline by ID", ) def get_pipeline(pipeline_id: UUID) -> models.Pipeline: """ @@ -66,12 +63,9 @@ def get_pipeline(pipeline_id: UUID) -> models.Pipeline: "/pipelines/{pipeline_id}/run", response_model=models.Run, status_code=201, - summary="Trigger a pipeline run" + summary="Trigger a pipeline run", ) -def run_pipeline( - pipeline_id: UUID, - background_tasks: BackgroundTasks -) -> models.Run: +def run_pipeline(pipeline_id: UUID, background_tasks: BackgroundTasks) -> models.Run: """ Start a new run for the given pipeline. Runs asynchronously. """ @@ -87,7 +81,7 @@ def run_pipeline( @app.get( "/pipelines/{pipeline_id}/runs", response_model=List[models.Run], - summary="List runs for a pipeline" + summary="List runs for a pipeline", ) def list_runs(pipeline_id: UUID) -> List[models.Run]: """ @@ -99,13 +93,13 @@ def list_runs(pipeline_id: UUID) -> List[models.Run]: runs = stores.list_runs_for_pipeline(pipeline_id) # Return only the Run fields (omit results/error) - return [models.Run(**r.dict()) for r in runs] + return [models.Run(**r.model_dump()) for r in runs] @app.get( "/pipelines/{pipeline_id}/runs/{run_id}", response_model=models.Run, - summary="Get run status" + summary="Get run status", ) def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run: """ @@ -119,13 +113,13 @@ def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run: if not run or run.pipeline_id != pipeline_id: raise HTTPException(status_code=404, detail="Run not found") - return models.Run(**run.dict()) + return models.Run(**run.model_dump()) @app.get( "/pipelines/{pipeline_id}/runs/{run_id}/results", response_model=List[Dict[str, Any]], - summary="Get run results" + summary="Get run results", ) def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]: """ @@ -139,29 +133,7 @@ def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]: if not run or run.pipeline_id != pipeline_id: raise HTTPException(status_code=404, detail="Run not found") - if run.status != 'COMPLETED': - raise HTTPException( - status_code=409, - detail="Run not completed or has failed" - ) + if run.status != "COMPLETED": + raise HTTPException(status_code=409, detail="Run not completed or has failed") return run.results or [] - - -# Dedicated endpoint to retrieve the error message for a failed run -@app.get( - "/pipelines/{pipeline_id}/runs/{run_id}/error", - response_model=str, - summary="Get run error message" -) -def get_run_error(pipeline_id: UUID, run_id: UUID) -> str: - """ - Retrieve the error message for a run that failed. - """ - pipeline = stores.get_pipeline(pipeline_id) - if not pipeline: - raise HTTPException(status_code=404, detail="Pipeline not found") - run = stores.get_run(run_id) - if not run or run.pipeline_id != pipeline_id: - raise HTTPException(status_code=404, detail="Run not found") - return run.error or "" \ No newline at end of file