refactor: remove logging stream - rewrite later

Sosokker 2025-05-12 02:58:38 +07:00
parent c91526623c
commit 67f691a46a
4 changed files with 13 additions and 136 deletions

log/log_stream.py

@@ -1,26 +0,0 @@
-from fastapi import APIRouter, Request, HTTPException
-from fastapi.responses import StreamingResponse
-from log.logging_utils import RUN_LOG_QUEUES
-from queue import Empty
-import asyncio
-
-router = APIRouter()
-
-@router.get("/pipelines/{pipeline_id}/runs/{run_id}/logs/stream")
-async def stream_logs(request: Request, pipeline_id: str, run_id: str):
-    log_queue = RUN_LOG_QUEUES.get(run_id)
-    if not log_queue:
-        raise HTTPException(status_code=404, detail="No logs for this run.")
-
-    async def event_generator():
-        while True:
-            if await request.is_disconnected():
-                break
-            try:
-                log_line = log_queue.get(timeout=1)
-                yield f"data: {log_line}\n\n"
-            except Empty:
-                await asyncio.sleep(0.2)
-                continue
-
-    return StreamingResponse(event_generator(), media_type="text/event-stream")
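For context, the deleted endpoint spoke Server-Sent Events: each queued log line was flushed to the client as a "data: ..." frame on a long-lived HTTP response. A minimal consumer sketch, assuming the API is reachable at http://localhost:8000 and the requests library is installed (the base URL, IDs, and function name are placeholders, not part of this repo):

import json
import requests

def follow_run_logs(base_url: str, pipeline_id: str, run_id: str) -> None:
    # Read the SSE stream line by line; events arrive as "data: <json>" frames.
    url = f"{base_url}/pipelines/{pipeline_id}/runs/{run_id}/logs/stream"
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if line and line.startswith("data: "):
                record = json.loads(line[len("data: "):])
                print(record["level"], record["message"])

# follow_run_logs("http://localhost:8000", "pipeline-id", "run-id")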

log/logging_utils.py

@@ -1,69 +0,0 @@
-from loguru import logger
-from queue import Queue
-from typing import Dict, Optional
-import json
-
-# Per-run log queues (thread-safe)
-RUN_LOG_QUEUES: Dict[str, Queue] = {}
-RUN_LOG_HANDLERS: Dict[str, int] = {}
-
-# Structured log format
-def make_log_record(level: str, message: str, pipeline_id: Optional[str], run_id: Optional[str], status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None) -> dict:
-    record = {
-        "level": level,
-        "message": message,
-        "pipeline_id": pipeline_id,
-        "run_id": run_id,
-        "status": status,
-        "error": error,
-        "extra": extra or {},
-    }
-    return record
-
-# Custom loguru sink for per-run logging
-def log_sink(message):
-    record = message.record
-    run_id = record["extra"].get("run_id")
-    pipeline_id = record["extra"].get("pipeline_id")
-    if run_id and run_id in RUN_LOG_QUEUES:
-        # Structure the log as JSON for frontend parsing
-        log_entry = make_log_record(
-            level=record["level"].name,
-            message=record["message"],
-            pipeline_id=pipeline_id,
-            run_id=run_id,
-            status=record["extra"].get("status"),
-            error=record["extra"].get("error"),
-            extra=record["extra"]
-        )
-        RUN_LOG_QUEUES[run_id].put(json.dumps(log_entry))
-
-# Setup per-run logging sink
-def setup_run_logging(pipeline_id: str, run_id: str):
-    log_queue = Queue()
-    RUN_LOG_QUEUES[run_id] = log_queue
-    handler_id = logger.add(
-        log_sink,
-        filter=lambda record: record["extra"].get("run_id") == run_id,
-        enqueue=True
-    )
-    RUN_LOG_HANDLERS[run_id] = handler_id
-    return log_queue
-
-# Remove per-run logging sink and clean up
-def cleanup_run_logging(run_id: str):
-    if run_id in RUN_LOG_HANDLERS:
-        logger.remove(RUN_LOG_HANDLERS[run_id])
-        del RUN_LOG_HANDLERS[run_id]
-    if run_id in RUN_LOG_QUEUES:
-        del RUN_LOG_QUEUES[run_id]
-
-# Helper for logging with context
-def pipeline_log(level: str, message: str, pipeline_id: str, run_id: str, status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None):
-    logger.log(level, message, extra={"pipeline_id": pipeline_id, "run_id": run_id, "status": status, "error": error, **(extra or {})})
-
-# Example usage:
-# setup_run_logging(pipeline_id, run_id)
-# pipeline_log("INFO", "Pipeline started", pipeline_id, run_id, status="RUNNING")
-# pipeline_log("ERROR", "Pipeline failed", pipeline_id, run_id, status="FAILED", error="Some error")
-# cleanup_run_logging(run_id)
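For reference, here is the lifecycle from the usage comments above written out end to end. Note that loguru attaches per-call context via logger.bind(...), which is what the run_id filter installed by setup_run_logging matches on; the IDs below are placeholders, and this is a sketch of the removed module's intended use, not code from the repo:

from queue import Empty
from loguru import logger
from log.logging_utils import setup_run_logging, cleanup_run_logging

pipeline_id, run_id = "demo-pipeline", "demo-run"  # placeholder IDs
log_queue = setup_run_logging(pipeline_id, run_id)

# bind() stores context in record["extra"], where log_sink and the filter read it
run_logger = logger.bind(pipeline_id=pipeline_id, run_id=run_id, status="RUNNING")
run_logger.info("Pipeline started")

# Drain the per-run queue (a consumer such as the SSE endpoint would do this)
try:
    while True:
        print(log_queue.get(timeout=1))  # JSON strings produced by log_sink
except Empty:
    pass

cleanup_run_logging(run_id)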

FastAPI application module (filename not shown on this page)

@@ -10,23 +10,22 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks
 import platform
 import asyncio
 
 # set this so crawl4ai can work in windows
 if platform.system() == "Windows":
     asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
 
 import models
 import stores
 import services
-from log.log_stream import router as log_stream_router
 
 app = FastAPI(title="Data Integration Pipeline API")
-app.include_router(log_stream_router)
 
 @app.post(
     "/pipelines",
     response_model=models.Pipeline,
     status_code=201,
-    summary="Create a new pipeline"
+    summary="Create a new pipeline",
 )
 def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
     """
@@ -36,9 +35,7 @@ def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
 @app.get(
-    "/pipelines",
-    response_model=List[models.Pipeline],
-    summary="List all pipelines"
+    "/pipelines", response_model=List[models.Pipeline], summary="List all pipelines"
 )
 def list_pipelines() -> List[models.Pipeline]:
     """
@@ -50,7 +47,7 @@ def list_pipelines() -> List[models.Pipeline]:
 @app.get(
     "/pipelines/{pipeline_id}",
     response_model=models.Pipeline,
-    summary="Get a pipeline by ID"
+    summary="Get a pipeline by ID",
 )
 def get_pipeline(pipeline_id: UUID) -> models.Pipeline:
     """
@@ -66,12 +63,9 @@ def get_pipeline(pipeline_id: UUID) -> models.Pipeline:
     "/pipelines/{pipeline_id}/run",
     response_model=models.Run,
     status_code=201,
-    summary="Trigger a pipeline run"
+    summary="Trigger a pipeline run",
 )
-def run_pipeline(
-    pipeline_id: UUID,
-    background_tasks: BackgroundTasks
-) -> models.Run:
+def run_pipeline(pipeline_id: UUID, background_tasks: BackgroundTasks) -> models.Run:
     """
     Start a new run for the given pipeline. Runs asynchronously.
     """
@@ -87,7 +81,7 @@ def run_pipeline(
 @app.get(
     "/pipelines/{pipeline_id}/runs",
     response_model=List[models.Run],
-    summary="List runs for a pipeline"
+    summary="List runs for a pipeline",
 )
 def list_runs(pipeline_id: UUID) -> List[models.Run]:
     """
@@ -99,13 +93,13 @@ def list_runs(pipeline_id: UUID) -> List[models.Run]:
     runs = stores.list_runs_for_pipeline(pipeline_id)
     # Return only the Run fields (omit results/error)
-    return [models.Run(**r.dict()) for r in runs]
+    return [models.Run(**r.model_dump()) for r in runs]
 
 @app.get(
     "/pipelines/{pipeline_id}/runs/{run_id}",
     response_model=models.Run,
-    summary="Get run status"
+    summary="Get run status",
 )
 def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run:
     """
@@ -119,13 +113,13 @@ def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run:
     if not run or run.pipeline_id != pipeline_id:
         raise HTTPException(status_code=404, detail="Run not found")
-    return models.Run(**run.dict())
+    return models.Run(**run.model_dump())
 
 @app.get(
     "/pipelines/{pipeline_id}/runs/{run_id}/results",
     response_model=List[Dict[str, Any]],
-    summary="Get run results"
+    summary="Get run results",
 )
 def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]:
     """
@@ -139,29 +133,7 @@ def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]:
     if not run or run.pipeline_id != pipeline_id:
         raise HTTPException(status_code=404, detail="Run not found")
-    if run.status != 'COMPLETED':
-        raise HTTPException(
-            status_code=409,
-            detail="Run not completed or has failed"
-        )
+    if run.status != "COMPLETED":
+        raise HTTPException(status_code=409, detail="Run not completed or has failed")
     return run.results or []
-
-# Dedicated endpoint to retrieve the error message for a failed run
-@app.get(
-    "/pipelines/{pipeline_id}/runs/{run_id}/error",
-    response_model=str,
-    summary="Get run error message"
-)
-def get_run_error(pipeline_id: UUID, run_id: UUID) -> str:
-    """
-    Retrieve the error message for a run that failed.
-    """
-    pipeline = stores.get_pipeline(pipeline_id)
-    if not pipeline:
-        raise HTTPException(status_code=404, detail="Pipeline not found")
-    run = stores.get_run(run_id)
-    if not run or run.pipeline_id != pipeline_id:
-        raise HTTPException(status_code=404, detail="Run not found")
-    return run.error or ""
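Beyond unwiring the log stream and dropping the error endpoint, the substantive change in this file is the move from run.dict() to run.model_dump(): Pydantic v2 renamed the serialization method, and .dict() is deprecated there. A minimal sketch of the pattern used above, with placeholder classes standing in for the real models module this page does not show:

from typing import Optional
from uuid import UUID, uuid4
from pydantic import BaseModel

class StoredRun(BaseModel):  # placeholder for the store's run record
    id: UUID
    pipeline_id: UUID
    status: str
    error: Optional[str] = None

class Run(BaseModel):  # placeholder API model without the error field
    id: UUID
    pipeline_id: UUID
    status: str

stored = StoredRun(id=uuid4(), pipeline_id=uuid4(), status="COMPLETED")
# model_dump() (Pydantic v2) replaces dict() (v1); extra keys such as "error"
# are ignored by Run's constructor under the default model config
api_run = Run(**stored.model_dump())
print(api_run)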