diff --git a/data/sample.json b/data/sample.json
new file mode 100644
index 0000000..d729b76
--- /dev/null
+++ b/data/sample.json
@@ -0,0 +1,38 @@
+{
+  "quiz": {
+    "sport": {
+      "q1": {
+        "question": "Which one is correct team name in NBA?",
+        "options": [
+          "New York Bulls",
+          "Los Angeles Kings",
+          "Golden State Warriros",
+          "Huston Rocket"
+        ],
+        "answer": "Huston Rocket"
+      }
+    },
+    "maths": {
+      "q1": {
+        "question": "5 + 7 = ?",
+        "options": [
+          "10",
+          "11",
+          "12",
+          "13"
+        ],
+        "answer": "12"
+      },
+      "q2": {
+        "question": "12 - 8 = ?",
+        "options": [
+          "1",
+          "2",
+          "3",
+          "4"
+        ],
+        "answer": "4"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/ingestion/adapters/api_adapter.py b/ingestion/adapters/api_adapter.py
index 39bcd93..0f9ef24 100644
--- a/ingestion/adapters/api_adapter.py
+++ b/ingestion/adapters/api_adapter.py
@@ -9,6 +9,7 @@ from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
 
 from .base import DataSourceAdapter
+from loguru import logger
 
 
 class ApiAdapter(DataSourceAdapter):
@@ -20,7 +21,8 @@ class ApiAdapter(DataSourceAdapter):
         self,
         url: str,
         headers: Optional[Dict[str, str]] = None,
-        timeout: float = 30
+        timeout: float = 30,
+        token: Optional[str] = None,
     ):
         """
         Initialize the API adapter.
@@ -29,10 +31,14 @@
         Args:
             url: Endpoint URL to fetch.
             headers: Optional HTTP headers.
             timeout: Timeout in seconds for the request.
+            token: Optional bearer token for Authorization header.
         """
         self.url = url
         self.headers = headers or {}
+        if token:
+            self.headers["Authorization"] = f"Bearer {token}"
         self.timeout = timeout
+        logger.info(f"Initializing ApiAdapter for URL: {url}")
         self.session = self._init_session()
 
     def _init_session(self) -> requests.Session:
@@ -49,6 +55,7 @@
         adapter = HTTPAdapter(max_retries=retries)
         session.mount("https://", adapter)
         session.mount("http://", adapter)
+        logger.debug("HTTP session initialized with retry strategy.")
         return session
 
     def fetch(self) -> List[Dict[str, Any]]:
@@ -61,21 +68,27 @@
         Raises:
             RuntimeError: On network error, HTTP error, or JSON parse error.
         """
+        logger.info(f"Fetching data from API: {self.url}")
         try:
             response = self.session.get(
                 self.url, headers=self.headers, timeout=self.timeout
             )
             response.raise_for_status()
+            logger.debug(f"Received response with status code: {response.status_code}")
         except requests.exceptions.RequestException as e:
+            logger.error(f"API request failed: {e}")
             raise RuntimeError(f"API request failed: {e}")
 
         try:
             data = response.json()
+            logger.debug(f"Successfully parsed JSON response from {self.url}")
         except ValueError as e:
+            logger.error(f"Failed to parse JSON response: {e}")
             raise RuntimeError(f"Failed to parse JSON response: {e}")
 
         if isinstance(data, list):
             return data
         if isinstance(data, dict):
             return [data]
+        logger.error("Unexpected JSON structure: expected list or dict.")
         raise RuntimeError("Unexpected JSON structure: expected list or dict.")
\ No newline at end of file
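# Illustrative sketch, not part of the diff: how the new `token` parameter of
# ApiAdapter is expected to be used. The endpoint URL and token value are
# placeholders; fetch() returns a list of dicts or raises RuntimeError.
from ingestion.adapters.api_adapter import ApiAdapter

adapter = ApiAdapter(
    url="https://api.example.com/data",  # hypothetical endpoint
    token="abcdef123456",                # sent as "Authorization: Bearer abcdef123456"
    timeout=10,
)
records = adapter.fetch()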
diff --git a/ingestion/adapters/file_adapter.py b/ingestion/adapters/file_adapter.py
index e27548d..04ee228 100644
--- a/ingestion/adapters/file_adapter.py
+++ b/ingestion/adapters/file_adapter.py
@@ -6,27 +6,36 @@ from typing import List, Dict, Any
 import json
 
 import pandas as pd
+from loguru import logger
 
 from .base import DataSourceAdapter
 
 
 class FileAdapter(DataSourceAdapter):
     """
-    Adapter for reading data from local files (CSV or JSON).
+    Adapter for reading data from local files (CSV or JSON), or from uploaded file-like objects.
     """
 
-    def __init__(self, path: str):
+    def __init__(self, path: str = None, format: str = None, upload=None, upload_filename: str = None):
         """
         Initialize the file adapter.
 
         Args:
-            path: Path to the input file (.csv or .json).
+            path: Path to the input file (.csv or .json), optional if upload is provided.
+            format: Optional file format (e.g., 'csv', 'json').
+            upload: Optional file-like object (e.g., from upload).
+            upload_filename: Optional original filename for validation/logging.
         """
         self.path = path
+        self.format = format
+        self.upload = upload
+        self.upload_filename = upload_filename
+        logger.info(f"Initialized FileAdapter for path: {path}, upload: {upload_filename}, format: {format}")
 
     def fetch(self) -> List[Dict[str, Any]]:
         """
         Read and parse the file, returning a list of records.
+        Supports both path-based and uploaded file-like inputs.
 
         Returns:
             List of dicts from the file contents.
@@ -35,26 +44,68 @@
             RuntimeError: On read or parse errors.
             ValueError: If file extension is unsupported.
         """
-        p = self.path.lower()
+        if self.upload is not None:
+            # Handle uploaded file-like object
+            logger.info(f"Fetching data from uploaded file: {self.upload_filename or '[no filename]'}")
+            if self.format == "csv" or (self.upload_filename and self.upload_filename.lower().endswith(".csv")):
+                try:
+                    self.upload.seek(0)
+                    df = pd.read_csv(self.upload)
+                    logger.debug(f"Successfully read uploaded CSV file: {self.upload_filename}")
+                    return df.to_dict(orient="records")
+                except Exception as e:
+                    logger.error(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
+                    raise RuntimeError(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
+            elif self.format == "json" or (self.upload_filename and self.upload_filename.lower().endswith(".json")):
+                try:
+                    self.upload.seek(0)
+                    data = json.load(self.upload)
+                    logger.debug(f"Successfully read uploaded JSON file: {self.upload_filename}")
+                    if isinstance(data, list):
+                        return data
+                    if isinstance(data, dict):
+                        return [data]
+                    logger.error(f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict.")
+                    raise RuntimeError(
+                        f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict."
+                    )
+                except Exception as e:
+                    logger.error(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
+                    raise RuntimeError(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
+            else:
+                logger.error(f"Unsupported uploaded file extension for '{self.upload_filename}'. Only .csv and .json are supported.")
+                raise ValueError(
+                    f"Unsupported uploaded file extension for '{self.upload_filename}'. "
+                    "Only .csv and .json are supported."
+                )
+        # Fallback to path-based loading
+        p = (self.path or "").lower()
+        logger.info(f"Attempting to fetch data from file: {self.path}")
         if p.endswith(".csv"):
             try:
                 df = pd.read_csv(self.path)
+                logger.debug(f"Successfully read CSV file: {self.path}")
                 return df.to_dict(orient="records")
             except Exception as e:
+                logger.error(f"Failed to read CSV '{self.path}': {e}")
                 raise RuntimeError(f"Failed to read CSV '{self.path}': {e}")
 
         if p.endswith(".json"):
             try:
                 with open(self.path, "r", encoding="utf-8") as f:
                     data = json.load(f)
+                logger.debug(f"Successfully read JSON file: {self.path}")
                 if isinstance(data, list):
                     return data
                 if isinstance(data, dict):
                     return [data]
+                logger.error(f"JSON file '{self.path}' does not contain a list or dict.")
                 raise RuntimeError(
                     f"JSON file '{self.path}' does not contain a list or dict."
                 )
             except Exception as e:
+                logger.error(f"Failed to read JSON '{self.path}': {e}")
                 raise RuntimeError(f"Failed to read JSON '{self.path}': {e}")
 
+        logger.error(f"Unsupported file extension for '{self.path}'. Only .csv and .json are supported.")
         raise ValueError(
             f"Unsupported file extension for '{self.path}'. "
             "Only .csv and .json are supported."
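# Illustrative sketch, not part of the diff: exercising the new upload branch of
# FileAdapter with an in-memory file-like object. The CSV bytes are made up;
# path-based loading continues to work as before.
import io
from ingestion.adapters.file_adapter import FileAdapter

buffer = io.BytesIO(b"id,name\n1,alpha\n2,beta\n")
uploaded = FileAdapter(upload=buffer, upload_filename="sample.csv", format="csv")
rows = uploaded.fetch()          # [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]

from_disk = FileAdapter(path="data/sample.json")
records = from_disk.fetch()      # a dict-valued JSON file is wrapped in a single-element list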
""" + logger.info("Starting async web scraping fetch.") # Initialize crawler browser_cfg = BrowserConfig(headless=True, verbose=self.verbose) crawler = AsyncWebCrawler(config=browser_cfg) @@ -96,7 +101,9 @@ class WebScraperAdapter(DataSourceAdapter): extraction_strategy = JsonCssExtractionStrategy( schema=schema, verbose=self.verbose ) + logger.debug(f"Loaded schema file: {self.schema_file}") except Exception as e: + logger.error(f"Failed to load schema file '{self.schema_file}': {e}") await crawler.close() raise RuntimeError( f"Failed to load schema file '{self.schema_file}': {e}" @@ -109,7 +116,9 @@ class WebScraperAdapter(DataSourceAdapter): apply_chunking=True, verbose=self.verbose, ) + logger.debug("Using LLM extraction strategy.") else: + logger.error("Either 'schema_file' or 'prompt' must be provided.") await crawler.close() raise ValueError("Either 'schema_file' or 'prompt' must be provided.") @@ -117,6 +126,7 @@ class WebScraperAdapter(DataSourceAdapter): try: cache_enum = getattr(CacheMode, self.cache_mode.upper()) except AttributeError: + logger.warning(f"Invalid cache mode '{self.cache_mode}', defaulting to ENABLED.") cache_enum = CacheMode.ENABLED run_cfg = CrawlerRunConfig( @@ -127,9 +137,11 @@ class WebScraperAdapter(DataSourceAdapter): # Execute crawl try: + logger.info(f"Crawling URLs: {self.urls}") results: List[CrawlResult] = await crawler.arun_many( urls=self.urls, config=run_cfg ) + logger.debug(f"Crawling completed. Results: {results}") finally: await crawler.close() @@ -137,10 +149,16 @@ class WebScraperAdapter(DataSourceAdapter): records: List[Dict[str, Any]] = [] for res in results: if not res.success or not res.extracted_content: + logger.warning(f"Skipping failed or empty result for URL: {getattr(res, 'url', None)}") continue try: content = json.loads(res.extracted_content) + logger.debug(f"Parsed extracted content for URL: {res.url}") except Exception: + logger.error(f"Failed to parse extracted content for URL: {res.url}") + continue + if content is None: + logger.warning(f"Extracted content is None for URL: {res.url}") continue if isinstance(content, list): for item in content: @@ -150,5 +168,8 @@ class WebScraperAdapter(DataSourceAdapter): elif isinstance(content, dict): content["source_url"] = res.url records.append(content) + else: + logger.warning(f"Extracted content for URL {res.url} is not a list or dict: {type(content)}") + logger.info(f"Web scraping completed. Extracted {len(records)} records.") return records \ No newline at end of file diff --git a/ingestion/ingestor.py b/ingestion/ingestor.py index 4114c62..bc151cb 100644 --- a/ingestion/ingestor.py +++ b/ingestion/ingestor.py @@ -7,6 +7,8 @@ from typing import List, Dict, Any from ingestion.adapters.api_adapter import ApiAdapter from ingestion.adapters.file_adapter import FileAdapter from ingestion.adapters.web_scraper_adapter import WebScraperAdapter +from pydantic import BaseModel +from loguru import logger class Ingestor: @@ -31,11 +33,24 @@ class Ingestor: ValueError: For unknown source types. RuntimeError: If an adapter fails during fetch. 
""" + from log.logging_utils import pipeline_log aggregated: List[Dict[str, Any]] = [] + logger.info("Starting ingestion run for sources.") for src in sources: - src_type = src.get("type") - config = src.get("config", {}) + # accept Pydantic models or raw dicts + if isinstance(src, BaseModel): + src_item = src.dict() + else: + src_item = src + src_type = src_item.get("type") + config = src_item.get("config", {}) + # convert BaseModel config to dict if needed + if not isinstance(config, dict) and hasattr(config, "dict"): + config = config.dict(exclude_unset=True) + pipeline_id = config.get("pipeline_id") or src_item.get("pipeline_id") + run_id = config.get("run_id") or src_item.get("run_id") + logger.info(f"Processing source type: {src_type} with config: {config}") if src_type == "api": adapter = ApiAdapter(**config) elif src_type == "scrape": @@ -43,16 +58,22 @@ class Ingestor: elif src_type == "file": adapter = FileAdapter(**config) else: + logger.error(f"Unknown source type: {src_type}") + pipeline_log("ERROR", f"Unknown source type: {src_type}", pipeline_id, run_id, status="FAILED") raise ValueError(f"Unknown source type: {src_type}") try: - data = adapter.fetch() - aggregated.extend(data) + logger.info(f"Fetching records using {src_type} adapter.") + records = adapter.fetch() + logger.info(f"Fetched {len(records)} records from {src_type} source.") + aggregated.extend(records) + pipeline_log("SUCCESS", f"Fetched {len(records)} records.", pipeline_id, run_id, status="COMPLETED") except Exception as e: - raise RuntimeError( - f"Ingestion failed for source '{src_type}' with config {config}: {e}" - ) + logger.error(f"Fetch failed for source {src_type}: {e}") + pipeline_log("ERROR", f"Fetch failed: {e}", pipeline_id, run_id, status="FAILED") + raise RuntimeError(f"Fetch failed for source {src_type}: {e}") + logger.info(f"Ingestion run completed. 
diff --git a/log/__init__.py b/log/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/log/log_stream.py b/log/log_stream.py
new file mode 100644
index 0000000..8de4f4d
--- /dev/null
+++ b/log/log_stream.py
@@ -0,0 +1,26 @@
+from fastapi import APIRouter, Request, HTTPException
+from fastapi.responses import StreamingResponse
+from log.logging_utils import RUN_LOG_QUEUES
+from queue import Empty
+import asyncio
+
+router = APIRouter()
+
+@router.get("/pipelines/{pipeline_id}/runs/{run_id}/logs/stream")
+async def stream_logs(request: Request, pipeline_id: str, run_id: str):
+    log_queue = RUN_LOG_QUEUES.get(run_id)
+    if not log_queue:
+        raise HTTPException(status_code=404, detail="No logs for this run.")
+
+    async def event_generator():
+        while True:
+            if await request.is_disconnected():
+                break
+            try:
+                log_line = log_queue.get(timeout=1)
+                yield f"data: {log_line}\n\n"
+            except Empty:
+                await asyncio.sleep(0.2)
+                continue
+
+    return StreamingResponse(event_generator(), media_type="text/event-stream")
diff --git a/log/logging_utils.py b/log/logging_utils.py
new file mode 100644
index 0000000..0524ef1
--- /dev/null
+++ b/log/logging_utils.py
@@ -0,0 +1,69 @@
+from loguru import logger
+from queue import Queue
+from typing import Dict, Optional
+import json
+
+# Per-run log queues (thread-safe)
+RUN_LOG_QUEUES: Dict[str, Queue] = {}
+RUN_LOG_HANDLERS: Dict[str, int] = {}
+
+# Structured log format
+def make_log_record(level: str, message: str, pipeline_id: Optional[str], run_id: Optional[str], status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None) -> dict:
+    record = {
+        "level": level,
+        "message": message,
+        "pipeline_id": pipeline_id,
+        "run_id": run_id,
+        "status": status,
+        "error": error,
+        "extra": extra or {},
+    }
+    return record
+
+# Custom loguru sink for per-run logging
+def log_sink(message):
+    record = message.record
+    run_id = record["extra"].get("run_id")
+    pipeline_id = record["extra"].get("pipeline_id")
+    if run_id and run_id in RUN_LOG_QUEUES:
+        # Structure the log as JSON for frontend parsing
+        log_entry = make_log_record(
+            level=record["level"].name,
+            message=record["message"],
+            pipeline_id=pipeline_id,
+            run_id=run_id,
+            status=record["extra"].get("status"),
+            error=record["extra"].get("error"),
+            extra=record["extra"]
+        )
+        RUN_LOG_QUEUES[run_id].put(json.dumps(log_entry))
+
+# Setup per-run logging sink
+def setup_run_logging(pipeline_id: str, run_id: str):
+    log_queue = Queue()
+    RUN_LOG_QUEUES[run_id] = log_queue
+    handler_id = logger.add(
+        log_sink,
+        filter=lambda record: record["extra"].get("run_id") == run_id,
+        enqueue=True
+    )
+    RUN_LOG_HANDLERS[run_id] = handler_id
+    return log_queue
+
+# Remove per-run logging sink and clean up
+def cleanup_run_logging(run_id: str):
+    if run_id in RUN_LOG_HANDLERS:
+        logger.remove(RUN_LOG_HANDLERS[run_id])
+        del RUN_LOG_HANDLERS[run_id]
+    if run_id in RUN_LOG_QUEUES:
+        del RUN_LOG_QUEUES[run_id]
+
+# Helper for logging with context; bind() puts the values into record["extra"] so the per-run sink filter can match on run_id
+def pipeline_log(level: str, message: str, pipeline_id: str, run_id: str, status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None):
+    logger.bind(pipeline_id=pipeline_id, run_id=run_id, status=status, error=error, **(extra or {})).log(level, message)
+
+# Example usage:
+# setup_run_logging(pipeline_id, run_id)
+# pipeline_log("INFO", "Pipeline started", pipeline_id, run_id, status="RUNNING")
+# pipeline_log("ERROR", "Pipeline failed", pipeline_id, run_id, status="FAILED", error="Some error")
+# cleanup_run_logging(run_id)
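# Illustrative sketch, not part of the diff: consuming the per-run SSE endpoint
# added in log_stream.py with plain requests. Host, port, and IDs are placeholders;
# each "data:" line carries the JSON produced by make_log_record().
import json
import requests

url = "http://localhost:8000/pipelines/<pipeline_id>/runs/<run_id>/logs/stream"
with requests.get(url, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data: "):
            entry = json.loads(line[len("data: "):])
            print(entry["level"], entry["message"])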
failed", pipeline_id, run_id, status="FAILED", error="Some error") +# cleanup_run_logging(run_id) diff --git a/main.py b/main.py index bca5401..e69f8c8 100644 --- a/main.py +++ b/main.py @@ -7,11 +7,19 @@ from uuid import UUID from fastapi import FastAPI, HTTPException, BackgroundTasks +import platform +import asyncio + +if platform.system() == "Windows": + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + import models import stores import services +from log.log_stream import router as log_stream_router app = FastAPI(title="Data Integration Pipeline API") +app.include_router(log_stream_router) @app.post( @@ -137,4 +145,23 @@ def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]: detail="Run not completed or has failed" ) - return run.results or [] \ No newline at end of file + return run.results or [] + + +# Dedicated endpoint to retrieve the error message for a failed run +@app.get( + "/pipelines/{pipeline_id}/runs/{run_id}/error", + response_model=str, + summary="Get run error message" +) +def get_run_error(pipeline_id: UUID, run_id: UUID) -> str: + """ + Retrieve the error message for a run that failed. + """ + pipeline = stores.get_pipeline(pipeline_id) + if not pipeline: + raise HTTPException(status_code=404, detail="Pipeline not found") + run = stores.get_run(run_id) + if not run or run.pipeline_id != pipeline_id: + raise HTTPException(status_code=404, detail="Run not found") + return run.error or "" \ No newline at end of file diff --git a/models.py b/models.py index 6cac555..b40fb61 100644 --- a/models.py +++ b/models.py @@ -6,7 +6,7 @@ from typing import List, Union, Annotated, Optional, Literal, Dict, Any from uuid import UUID from datetime import datetime -from pydantic import BaseModel, Field, HttpUrl, field_validator +from pydantic import BaseModel, Field, HttpUrl, field_validator, ValidationInfo class RunCreate(BaseModel): @@ -75,26 +75,54 @@ class ScrapeConfig(BaseModel): class FileConfig(BaseModel): """ - Configuration for a file-based source. + Configuration for a file-based source. Supports either a file path or an uploaded file. 
""" - path: str = Field( - ..., - description="Path to the input file", + path: Optional[str] = Field( + None, + description="Path to the input file (optional if upload is provided)", example="/data/myfile.json" ) + upload: Optional[Any] = Field( + None, + description="Uploaded file object or metadata (optional if path is provided)", + example=None + ) + upload_filename: Optional[str] = Field( + None, + description="Original filename of the uploaded file (for validation)", + example="myfile.json" + ) format: Literal["csv", "json", "sqlite"] = Field( "json", description="Format of the file", example="csv" ) - @field_validator("path") - def path_extension_matches_format(cls, v: str, values): - fmt = values.get("format") - if fmt and not v.lower().endswith(f".{fmt}"): - raise ValueError(f"File extension must match format '{fmt}'") + @field_validator("path", mode="before") + def require_path_or_upload(cls, v, info: ValidationInfo): + data = info.data + if not v and not data.get("upload"): + raise ValueError("Either 'path' or 'upload' must be provided.") return v + @field_validator("upload_filename", mode="before") + def filename_extension_matches_format(cls, v, info: ValidationInfo): + fmt = info.data.get("format") + if v and fmt and not v.lower().endswith(f".{fmt}"): + raise ValueError(f"Uploaded file extension must match format '{fmt}'") + return v + + @field_validator("path", mode="after") + def path_or_upload_extension_matches_format(cls, v, info: ValidationInfo): + fmt = info.data.get("format") + upload_filename = info.data.get("upload_filename") + if v and fmt and not v.lower().endswith(f".{fmt}"): + raise ValueError(f"File extension must match format '{fmt}'") + if upload_filename and fmt and not upload_filename.lower().endswith(f".{fmt}"): + raise ValueError(f"Uploaded file extension must match format '{fmt}'") + return v + + class ApiSource(BaseModel): """ diff --git a/pyproject.toml b/pyproject.toml index a2d64b7..57c76e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "crawl4ai>=0.5.0.post8", "fastapi[standard]>=0.115.12", "inquirer>=3.4.0", + "loguru>=0.7.3", "pandas>=2.2.3", "python-dotenv>=1.1.0", "rich>=14.0.0", diff --git a/schema.yaml b/schema.yaml new file mode 100644 index 0000000..63a0a68 --- /dev/null +++ b/schema.yaml @@ -0,0 +1 @@ +{"openapi":"3.1.0","info":{"title":"Data Integration Pipeline API","version":"0.1.0"},"paths":{"/pipelines":{"get":{"summary":"List all pipelines","description":"Retrieve all registered pipelines.","operationId":"list_pipelines_pipelines_get","responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"items":{"$ref":"#/components/schemas/Pipeline"},"type":"array","title":"Response List Pipelines Pipelines Get"}}}}}},"post":{"summary":"Create a new pipeline","description":"Register a new pipeline with sources configuration.","operationId":"create_pipeline_pipelines_post","requestBody":{"content":{"application/json":{"schema":{"$ref":"#/components/schemas/PipelineCreate"}}},"required":true},"responses":{"201":{"description":"Successful Response","content":{"application/json":{"schema":{"$ref":"#/components/schemas/Pipeline"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}":{"get":{"summary":"Get a pipeline by ID","description":"Fetch details of a specific 
pipeline.","operationId":"get_pipeline_pipelines__pipeline_id__get","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"$ref":"#/components/schemas/Pipeline"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}/run":{"post":{"summary":"Trigger a pipeline run","description":"Start a new run for the given pipeline. Runs asynchronously.","operationId":"run_pipeline_pipelines__pipeline_id__run_post","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}}],"responses":{"201":{"description":"Successful Response","content":{"application/json":{"schema":{"$ref":"#/components/schemas/Run"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}/runs":{"get":{"summary":"List runs for a pipeline","description":"List all runs associated with a pipeline.","operationId":"list_runs_pipelines__pipeline_id__runs_get","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"type":"array","items":{"$ref":"#/components/schemas/Run"},"title":"Response List Runs Pipelines Pipeline Id Runs Get"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}/runs/{run_id}":{"get":{"summary":"Get run status","description":"Retrieve the status of a specific run.","operationId":"get_run_pipelines__pipeline_id__runs__run_id__get","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}},{"name":"run_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Run Id"}}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"$ref":"#/components/schemas/Run"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}/runs/{run_id}/results":{"get":{"summary":"Get run results","description":"Retrieve normalized results of a completed run.","operationId":"get_run_results_pipelines__pipeline_id__runs__run_id__results_get","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}},{"name":"run_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Run Id"}}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"type":"array","items":{"type":"object","additionalProperties":true},"title":"Response Get Run Results Pipelines Pipeline Id Runs Run Id Results Get"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}},"/pipelines/{pipeline_id}/runs/{run_id}/error":{"get":{"summary":"Get run error message","description":"Retrieve the error message for a run that 
failed.","operationId":"get_run_error_pipelines__pipeline_id__runs__run_id__error_get","parameters":[{"name":"pipeline_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Pipeline Id"}},{"name":"run_id","in":"path","required":true,"schema":{"type":"string","format":"uuid","title":"Run Id"}}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"type":"string","title":"Response Get Run Error Pipelines Pipeline Id Runs Run Id Error Get"}}}},"422":{"description":"Validation Error","content":{"application/json":{"schema":{"$ref":"#/components/schemas/HTTPValidationError"}}}}}}}},"components":{"schemas":{"ApiConfig":{"properties":{"url":{"type":"string","maxLength":2083,"minLength":1,"format":"uri","title":"Url","description":"API endpoint URL","example":"https://api.example.com/data"},"token":{"anyOf":[{"type":"string"},{"type":"null"}],"title":"Token","description":"Optional bearer token for API authentication","example":"abcdef123456"}},"type":"object","required":["url"],"title":"ApiConfig","description":"Configuration for an API source."},"ApiSource":{"properties":{"type":{"type":"string","const":"api","title":"Type","description":"Discriminator for API source","default":"api"},"config":{"$ref":"#/components/schemas/ApiConfig"}},"type":"object","required":["config"],"title":"ApiSource","description":"An API-based data source."},"FileConfig":{"properties":{"path":{"type":"string","title":"Path","description":"Path to the input file","example":"/data/myfile.json"},"format":{"type":"string","enum":["csv","json","sqlite"],"title":"Format","description":"Format of the file","default":"json","example":"csv"}},"type":"object","required":["path"],"title":"FileConfig","description":"Configuration for a file-based source."},"FileSource":{"properties":{"type":{"type":"string","const":"file","title":"Type","description":"Discriminator for file source","default":"file"},"config":{"$ref":"#/components/schemas/FileConfig"}},"type":"object","required":["config"],"title":"FileSource","description":"A file-based data source."},"HTTPValidationError":{"properties":{"detail":{"items":{"$ref":"#/components/schemas/ValidationError"},"type":"array","title":"Detail"}},"type":"object","title":"HTTPValidationError"},"Pipeline":{"properties":{"id":{"type":"string","format":"uuid","title":"Id","description":"Unique identifier for the pipeline"},"name":{"anyOf":[{"type":"string"},{"type":"null"}],"title":"Name","description":"Optional human-readable name for the pipeline"},"sources":{"items":{"oneOf":[{"$ref":"#/components/schemas/ApiSource"},{"$ref":"#/components/schemas/ScrapeSource"},{"$ref":"#/components/schemas/FileSource"}],"description":"Union of all source types","discriminator":{"propertyName":"type","mapping":{"api":"#/components/schemas/ApiSource","file":"#/components/schemas/FileSource","scrape":"#/components/schemas/ScrapeSource"}}},"type":"array","title":"Sources","description":"List of configured data sources"},"created_at":{"type":"string","format":"date-time","title":"Created At","description":"UTC timestamp when the pipeline was created"}},"type":"object","required":["id","sources","created_at"],"title":"Pipeline","description":"Representation of a pipeline."},"PipelineCreate":{"properties":{"name":{"anyOf":[{"type":"string"},{"type":"null"}],"title":"Name","description":"Optional human-readable name for the pipeline","example":"My Data 
Pipeline"},"sources":{"items":{"oneOf":[{"$ref":"#/components/schemas/ApiSource"},{"$ref":"#/components/schemas/ScrapeSource"},{"$ref":"#/components/schemas/FileSource"}],"description":"Union of all source types","discriminator":{"propertyName":"type","mapping":{"api":"#/components/schemas/ApiSource","file":"#/components/schemas/FileSource","scrape":"#/components/schemas/ScrapeSource"}}},"type":"array","title":"Sources","description":"List of data sources for this pipeline"}},"type":"object","required":["sources"],"title":"PipelineCreate","description":"Payload for creating a new pipeline."},"Run":{"properties":{"id":{"type":"string","format":"uuid","title":"Id"},"pipeline_id":{"type":"string","format":"uuid","title":"Pipeline Id"},"status":{"type":"string","enum":["PENDING","RUNNING","COMPLETED","FAILED"],"title":"Status"},"started_at":{"type":"string","format":"date-time","title":"Started At"},"finished_at":{"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}],"title":"Finished At"}},"type":"object","required":["id","pipeline_id","status","started_at"],"title":"Run","description":"Status of a pipeline run."},"ScrapeConfig":{"properties":{"urls":{"items":{"type":"string","maxLength":2083,"minLength":1,"format":"uri"},"type":"array","title":"Urls","description":"List of URLs to scrape","example":["https://example.com/page1","https://example.com/page2"]},"schema_file":{"anyOf":[{"type":"string"},{"type":"null"}],"title":"Schema File","description":"Path to a JSON file containing CSS extraction schema","example":"schemas/page_schema.json"},"prompt":{"anyOf":[{"type":"string"},{"type":"null"}],"title":"Prompt","description":"Prompt string for LLM-based extraction","example":"Extract product titles and prices"}},"type":"object","required":["urls"],"title":"ScrapeConfig","description":"Configuration for a web-scraping source."},"ScrapeSource":{"properties":{"type":{"type":"string","const":"scrape","title":"Type","description":"Discriminator for scrape source","default":"scrape"},"config":{"$ref":"#/components/schemas/ScrapeConfig"}},"type":"object","required":["config"],"title":"ScrapeSource","description":"A web-scraping data source."},"ValidationError":{"properties":{"loc":{"items":{"anyOf":[{"type":"string"},{"type":"integer"}]},"type":"array","title":"Location"},"msg":{"type":"string","title":"Message"},"type":{"type":"string","title":"Error Type"}},"type":"object","required":["loc","msg","type"],"title":"ValidationError"}}}} \ No newline at end of file diff --git a/services.py b/services.py index efc0038..fd7819e 100644 --- a/services.py +++ b/services.py @@ -10,6 +10,7 @@ import stores import models from ingestion.ingestor import Ingestor from normalization.normalizer import Normalizer +from log.logging_utils import setup_run_logging, cleanup_run_logging, pipeline_log def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None: @@ -24,13 +25,19 @@ def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None: if not run: return + # Setup structured per-run logging + setup_run_logging(str(pipeline.id), str(run_id)) + pipeline_log("INFO", "Pipeline run starting", str(pipeline.id), str(run_id), status="RUNNING") + # Mark as running run.status = 'RUNNING' run.started_at = datetime.utcnow() try: # Ingest raw records + pipeline_log("INFO", "Ingesting raw records", str(pipeline.id), str(run_id)) raw_records: List[Dict[str, Any]] = Ingestor.run(pipeline.sources) + pipeline_log("INFO", f"Ingested {len(raw_records)} records", str(pipeline.id), str(run_id)) # Normalize 
@@ -813,6 +815,19 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/cf/f4/25a25c75ec02fcec729cee95288635367f2cdf8add0416494d0c42842ccc/litellm-1.65.6-py3-none-any.whl", hash = "sha256:c65ec7676f251c4f28cfb7446a542d15f091fe0fb71d6d6e630d8c8849f9a76d", size = 7518562 },
 ]
 
+[[package]]
+name = "loguru"
+version = "0.7.3"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "colorama", marker = "sys_platform == 'win32'" },
+    { name = "win32-setctime", marker = "sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595 },
+]
+
 [[package]]
 name = "lxml"
 version = "5.3.2"
"https://files.pythonhosted.org/packages/b3/8f/705086c9d734d3b663af0e9bb3d4de6578d08f46b1b101c2442fd9aecaa2/win32_setctime-1.2.0.tar.gz", hash = "sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0", size = 4867 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083 }, +] + [[package]] name = "xmod" version = "1.8.1"