diff --git a/pipeline/config.py b/pipeline/config.py
index 92ee3f5..c46344b 100644
--- a/pipeline/config.py
+++ b/pipeline/config.py
@@ -1,85 +1,112 @@
-# config.py
-import os
-from dotenv import load_dotenv
-from pathlib import Path
-
-# Load environment variables from .env file located in the script's directory
-# Make sure .env is in the *same directory* as config.py
-dotenv_path = Path(__file__).parent / '.env'
-if dotenv_path.is_file():
-    load_dotenv(dotenv_path=dotenv_path)
-else:
-    print(f"Warning: .env file not found at {dotenv_path}")
+import asyncio
+import sys
+from enum import Enum
+from pydantic_settings import BaseSettings, SettingsConfigDict
+from loguru import logger
 
-# --- Default Settings ---
-DEFAULT_OUTPUT_FILE = "extracted_data.json"
-DEFAULT_OUTPUT_FORMAT = "json"  # csv, json, sqlite
-DEFAULT_CACHE_MODE = "ENABLED"  # ENABLED, BYPASS, DISABLED, READ_ONLY, WRITE_ONLY
-DEFAULT_VERBOSE = False
-DEFAULT_LLM_PROVIDER = "openai/gpt-4o-mini"  # Default LLM
 
 
+class StoreType(str, Enum):
+    """Supported pipeline data store types."""
 
-# --- LLM Provider Configuration ---
-PROVIDER_ENV_MAP = {
-    "openai": "OPENAI_API_KEY",
-    "gemini": "GEMINI_API_KEY",
-    "groq": "GROQ_API_KEY",
-    "anthropic": "ANTHROPIC_API_KEY",
-    "ollama": None,  # Ollama typically doesn't require an API key
-    # Add other providers and their corresponding env variable names here
-}
+    MEMORY = "MEMORY"
 
-def get_api_key_env_name(provider: str) -> str | None:
-    """Gets the expected environment variable name for the given provider."""
-    provider_prefix = provider.split('/')[0].lower()
-    return PROVIDER_ENV_MAP.get(provider_prefix)
-
-
-def get_api_key(provider: str, direct_key: str | None = None, env_var_name: str | None = None) -> str | None:
+
+class AppSettings(BaseSettings):
     """
-    Retrieves the API key for a given provider.
-    Priority: direct_key > env_var_name > default env var from PROVIDER_ENV_MAP.
+    Central configuration settings for the application.
+    Loads values from environment variables or a .env file.
     """
-    if direct_key:
-        print(f"INFO: Using direct API key provided via --api-key for provider '{provider}'.")
-        return direct_key
 
-    if env_var_name:
-        key = os.getenv(env_var_name)
-        if key:
-            print(
-                f"INFO: Using API key from specified environment variable '{env_var_name}' for provider '{provider}'."
-            )
-            return key
-        else:
-            print(f"Warning: Specified environment variable '{env_var_name}' not found.")
+    # Application settings
+    APP_NAME: str = "PipelineRunnerApp"
+    LOG_LEVEL: str = "INFO"  # Logging level (e.g., DEBUG, INFO, WARNING)
+    LOG_ENABLE_SSE: bool = True  # Flag to enable/disable SSE log streaming sink
 
-    default_env_name = get_api_key_env_name(provider)
-    if default_env_name:
-        key = os.getenv(default_env_name)
-        if key:
-            print(
-                f"INFO: Using API key from default environment variable '{default_env_name}' for provider '{provider}'."
-            )
-            return key
-        else:
-            if default_env_name is not None:  # Don't warn if provider like Ollama has None mapping
-                print(
-                    f"Warning: Default environment variable '{default_env_name}' for provider '{provider}' not found."
-                )
-            return None
+    # Store configuration
+    STORE_TYPE: StoreType = StoreType.MEMORY
 
-    # If provider is not in map and no key was provided
-    # Allow providers like 'ollama' to proceed without a key
-    if provider.split('/')[0].lower() != "ollama":
-        print(f"Warning: No API key found or specified for provider '{provider}'. LLM features might fail.")
-    return None
+    # Scheduler configuration
+    SCHEDULER_CHECK_INTERVAL: int = 60  # Seconds between pipeline discovery checks
+    SCHEDULER_MAX_CONCURRENT_RUNS: int = 5  # Max concurrent pipeline runs via scheduler
+    SCHEDULER_MISFIRE_GRACE_SEC: int = 300  # Grace time for missed jobs (seconds)
+
+    # Ingestion Defaults
+    DEFAULT_API_TIMEOUT: int = 30
+    DEFAULT_SCRAPER_LLM_PROVIDER: str = "openai/gpt-4o-mini"
+    DEFAULT_SCRAPER_CACHE_MODE: str = "ENABLED"
+    DEFAULT_SCRAPER_PROMPT: str = (
+        "Extract all data from the page in as much detail as possible"
+    )
+
+    # SSE Configuration
+    SSE_LOG_QUEUE_MAX_SIZE: int = 1000  # Max size for the SSE log queue
+
+    # Pydantic settings configuration
+    model_config = SettingsConfigDict(
+        env_file=".env",  # Load .env file if it exists
+        case_sensitive=False,  # Environment variables are case-insensitive
+        extra="ignore",  # Ignore extra fields from environment
+    )
 
-# --- Exportable Configuration Variables ---
-LLM_PROVIDER = os.getenv("DEFAULT_LLM_PROVIDER", DEFAULT_LLM_PROVIDER)
-OUTPUT_FILE = os.getenv("DEFAULT_OUTPUT_FILE", DEFAULT_OUTPUT_FILE)
-OUTPUT_FORMAT = os.getenv("DEFAULT_OUTPUT_FORMAT", DEFAULT_OUTPUT_FORMAT)
-CACHE_MODE = os.getenv("DEFAULT_CACHE_MODE", DEFAULT_CACHE_MODE)
-VERBOSE = os.getenv("DEFAULT_VERBOSE", str(DEFAULT_VERBOSE)).lower() in ('true', '1', 't')
+settings = AppSettings()
+
+# --- Basic Loguru Configuration ---
+logger.remove()
+logger.add(
+    sys.stderr,
+    level=settings.LOG_LEVEL.upper(),
+    format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
+)
+
+# File Sink
+logger.add(
+    "logs/app_{time}.log",
+    level=settings.LOG_LEVEL.upper(),
+    rotation="10 MB",  # Rotate log file when it reaches 10 MB
+    retention="7 days",  # Keep logs for 7 days
+    compression="zip",  # Compress rotated files
+    format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
+)
+
+logger.info("Logger configured with level: {}", settings.LOG_LEVEL)
+logger.info(
+    "Application settings loaded. Store type: {}, SSE Logging: {}",
+    settings.STORE_TYPE,
+    "Enabled" if settings.LOG_ENABLE_SSE else "Disabled",
+)
+
+# --------- SSE Log Queue ---------
+
+sse_log_queue = None
+
+
+def set_sse_log_queue(queue):
+    """Sets the global SSE log queue instance."""
+    global sse_log_queue
+    sse_log_queue = queue
+    if settings.LOG_ENABLE_SSE and queue:
+        logger.info("SSE Log Queue set and SSE sink enabled.")
+        logger.add(
+            sse_log_sink,
+            level=settings.LOG_LEVEL.upper(),
+            format="{message}",
+            enqueue=True,
+        )
+    elif settings.LOG_ENABLE_SSE and not queue:
+        logger.warning("SSE Log Queue is None, cannot enable SSE sink.")
+    else:
+        logger.info("SSE Logging is disabled by configuration.")
+
+
+def sse_log_sink(message):
+    """Loguru sink function to put messages onto the SSE queue."""
+    if sse_log_queue:
+        try:
+            record = message.record
+            log_entry = f"{record['time']:YYYY-MM-DD HH:mm:ss.SSS} | {record['level']: <8} | {record['name']}:{record['function']}:{record['line']} - {record['message']}"
+            sse_log_queue.put_nowait(log_entry)
+        except asyncio.QueueFull:
+            print("Warning: SSE log queue is full. Dropping message.", file=sys.stderr)
+        except Exception as e:
+            print(f"Error in SSE log sink: {e}", file=sys.stderr)
diff --git a/pipeline/dependencies.py b/pipeline/dependencies.py
index 647b128..455fbcb 100644
--- a/pipeline/dependencies.py
+++ b/pipeline/dependencies.py
@@ -1,4 +1,8 @@
-from fastapi import Request
+import asyncio
+from fastapi import Request, status
+from fastapi.exceptions import HTTPException
+
+from loguru import logger
 
 
 async def get_pipeline_service(request: Request):
@@ -6,3 +10,15 @@ async def get_pipeline_service(request: Request):
     if not service:
         raise Exception("PipelineService not initialized or available in app state.")
     return service
+
+
+async def get_sse_log_queue(request: Request) -> asyncio.Queue | None:
+    """Dependency to get the SSE log queue from app state."""
+    queue = getattr(request.app.state, "sse_log_queue", None)
+    if not queue:
+        logger.error("SSE log queue not found in application state.")
+        raise HTTPException(
+            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+            detail="Log streaming service queue not available.",
+        )
+    return queue
diff --git a/pipeline/ingestion/adapters/api_adapter.py b/pipeline/ingestion/adapters/api_adapter.py
index 05318d2..5f35156 100644
--- a/pipeline/ingestion/adapters/api_adapter.py
+++ b/pipeline/ingestion/adapters/api_adapter.py
@@ -6,6 +6,8 @@ import requests
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
 
+from config import settings
+
 from models.ingestion import AdapterRecord
 from .base import DataSourceAdapter
 
@@ -21,7 +23,7 @@ class ApiAdapter(DataSourceAdapter):
         self,
         url: str,
         headers: dict[str, str] | None = None,
-        timeout: float = 30,
+        timeout: float = settings.DEFAULT_API_TIMEOUT,
         token: str | None = None,
     ):
         """
@@ -38,7 +40,9 @@ class ApiAdapter(DataSourceAdapter):
         if token:
             self.headers["Authorization"] = f"Bearer {token}"
         self.timeout = timeout
-        logger.info(f"Initializing ApiAdapter for URL: {url}")
+        logger.info(
+            f"Initializing ApiAdapter for URL: {url} with timeout: {self.timeout}s"
+        )
         self.session = self._init_session()
 
     def _init_session(self) -> requests.Session:
diff --git a/pipeline/ingestion/adapters/web_scraper_adapter.py b/pipeline/ingestion/adapters/web_scraper_adapter.py
index e2e8dbf..1e9b8b4 100644
--- a/pipeline/ingestion/adapters/web_scraper_adapter.py
+++ b/pipeline/ingestion/adapters/web_scraper_adapter.py
@@ -5,6 +5,8 @@ Web scraper adapter using crawl4ai to extract structured data.
 import asyncio
 import json
 
+from config import settings
+
 from crawl4ai import (
     AsyncWebCrawler,
     BrowserConfig,
@@ -33,18 +35,16 @@ class WebScraperAdapter(DataSourceAdapter):
     Adapter for web scraping using crawl4ai.
     """
 
-    DEFAULT_PROMPT = "Extract all data from the page in as much detailed as possible"
-
     def __init__(
         self,
         urls: list[str],
         api_key: str,
         schema_file: str | None = None,
-        prompt: str = DEFAULT_PROMPT,
-        llm_provider: str = "openai/gpt-4o-mini",
+        prompt: str = settings.DEFAULT_SCRAPER_PROMPT,
+        llm_provider: str = settings.DEFAULT_SCRAPER_LLM_PROVIDER,
         output_format: str = "json",
         verbose: bool = False,
-        cache_mode: str = "ENABLED",
+        cache_mode: str = settings.DEFAULT_SCRAPER_CACHE_MODE,
     ):
         """
         Initialize the scraper adapter.
@@ -71,6 +71,12 @@ class WebScraperAdapter(DataSourceAdapter):
             f"Initialized WebScraperAdapter for URLs: {urls} with schema_file={schema_file}, prompt={prompt}, llm_provider={llm_provider}, output_format={output_format}, verbose={verbose}, cache_mode={cache_mode}"
         )
 
+        if not self.api_key:
+            logger.error(
+                "API Key is required for WebScraperAdapter but was not provided."
+            )
+            raise ValueError("API Key is required for WebScraperAdapter.")
+
     def fetch(self) -> list[AdapterRecord]:
         """
         Synchronously fetch data by running the async crawler.
@@ -119,7 +125,7 @@ class WebScraperAdapter(DataSourceAdapter):
         elif self.prompt:
             extraction_strategy = LLMExtractionStrategy(
                 llm_config=llm_cfg,
-                instruction=self.prompt,
+                instruction=self.prompt,  # Use the instance's prompt
                 extraction_type="schema",
                 apply_chunking=True,
                 verbose=self.verbose,
diff --git a/pipeline/ingestion/ingestors/simple_ingest.py b/pipeline/ingestion/ingestors/simple_ingest.py
index 9e361fb..d8f7bba 100644
--- a/pipeline/ingestion/ingestors/simple_ingest.py
+++ b/pipeline/ingestion/ingestors/simple_ingest.py
@@ -1,3 +1,5 @@
+from config import settings
+
 from ingestion.adapters.api_adapter import ApiAdapter
 from ingestion.adapters.file_adapter import FileAdapter
 from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
@@ -27,7 +29,7 @@ class SimpleIngestionStrategy(IngestionMethod):
                     adapter = ApiAdapter(
                         url=config.url,
                         headers=config.headers,
-                        timeout=config.timeout or 30,
+                        timeout=config.timeout or settings.DEFAULT_API_TIMEOUT,
                        token=config.token,
                     )
                     records = adapter.fetch()
@@ -45,18 +47,24 @@ class SimpleIngestionStrategy(IngestionMethod):
                         urls=config.urls,
                         api_key=config.api_key,
                         schema_file=config.schema_file,
-                        prompt=config.prompt or WebScraperAdapter.DEFAULT_PROMPT,
-                        llm_provider=config.llm_provider or "openai/gpt-4o-mini",
+                        prompt=config.prompt or settings.DEFAULT_SCRAPER_PROMPT,
+                        llm_provider=config.llm_provider
+                        or settings.DEFAULT_SCRAPER_LLM_PROVIDER,
                         output_format=config.output_format or "json",
                         verbose=config.verbose or False,
-                        cache_mode=config.cache_mode or "ENABLED",
+                        cache_mode=config.cache_mode
+                        or settings.DEFAULT_SCRAPER_CACHE_MODE,
                     )
                     records = adapter.fetch()
                     results.extend(records)
+            except ValueError as ve:
+                logger.error(f"Configuration error for source {source.type}: {ve}")
             except Exception as e:
-                logger.error(f"Failed to ingest from source {source.type}: {e}")
+                logger.error(
+                    f"Failed to ingest from source {source.type}: {e}", exc_info=True
+                )
 
         return OutputData(
             records=results,
diff --git a/pipeline/main.py b/pipeline/main.py
index 6fd5668..1f545d8 100644
--- a/pipeline/main.py
+++ b/pipeline/main.py
@@ -1,14 +1,20 @@
-from fastapi import FastAPI
-from contextlib import asynccontextmanager
 import platform
 import asyncio
+
+from fastapi import FastAPI
+from contextlib import asynccontextmanager
 from loguru import logger
+from config import settings, set_sse_log_queue
+
 from stores.memory import InMemoryPipelineStore
 from stores.base import PipelineStore
 from services.pipeline_service import PipelineService
 from scheduler.manager import SchedulerManager
 from routers.pipelines import router as pipelines_router
+from routers.logs import router as logs_router
+
+sse_queue = asyncio.Queue(maxsize=settings.SSE_LOG_QUEUE_MAX_SIZE)
 
 # ! Window specific asyncio policy
 if platform.system() == "Windows":
@@ -18,8 +24,12 @@ if platform.system() == "Windows":
 # --- Resource Initialization ---
 pipeline_store: PipelineStore = InMemoryPipelineStore()
 pipeline_service = PipelineService(store=pipeline_store)
-scheduler_manager = SchedulerManager(pipeline_service=pipeline_service)
-
+scheduler_manager = SchedulerManager(
+    pipeline_service=pipeline_service,
+    check_interval_seconds=settings.SCHEDULER_CHECK_INTERVAL,
+    max_concurrent_runs=settings.SCHEDULER_MAX_CONCURRENT_RUNS,
+    misfire_grace_sec=settings.SCHEDULER_MISFIRE_GRACE_SEC,
+)
 # to avoid circular import
 pipeline_service.set_scheduler_manager(scheduler_manager)
 
@@ -32,10 +42,13 @@ async def lifespan(app: FastAPI):
     app.state.pipeline_store = pipeline_store
     app.state.scheduler_manager = scheduler_manager
     app.state.pipeline_service = pipeline_service
+    app.state.sse_log_queue = sse_queue
+
+    # Configure Loguru SSE Sink (needs the queue instance)
+    set_sse_log_queue(sse_queue)
 
     # Initialize and start the scheduler
     logger.info("Initializing and starting SchedulerManager...")
-
     scheduler_manager.start()
     logger.info("SchedulerManager started.")
 
@@ -51,7 +64,7 @@ async def lifespan(app: FastAPI):
 
 # --- FastAPI App ---
 app = FastAPI(
-    title="Data Integration Pipeline API",
+    title=settings.APP_NAME,
     description="API for managing and running data integration pipelines.",
     version="0.1.0",
     lifespan=lifespan,
@@ -59,6 +72,7 @@ app = FastAPI(
 
 # Include the pipelines router
 app.include_router(pipelines_router)
+app.include_router(logs_router)
 
 
 # --- Root Endpoint (Optional) ---
diff --git a/pipeline/pyproject.toml b/pipeline/pyproject.toml
index 2da7861..68d77fa 100644
--- a/pipeline/pyproject.toml
+++ b/pipeline/pyproject.toml
@@ -19,10 +19,11 @@ dependencies = [
     "python-dotenv>=1.1.0",
     "responses>=0.25.7",
     "rich>=14.0.0",
+    "sse-starlette>=2.3.4",
 ]
 
 [tool.pytest.ini_options]
 asyncio_mode = "auto"
 asyncio_default_fixture_loop_scope = "function"
 log_cli = true
-log_cli_level = "INFO"
\ No newline at end of file
+log_cli_level = "INFO"
diff --git a/pipeline/routers/logs.py b/pipeline/routers/logs.py
new file mode 100644
index 0000000..008d0c7
--- /dev/null
+++ b/pipeline/routers/logs.py
@@ -0,0 +1,53 @@
+# routers/logs.py
+import asyncio
+from fastapi import APIRouter, Request, Depends
+from sse_starlette.sse import EventSourceResponse
+from loguru import logger
+
+from dependencies import get_sse_log_queue
+
+router = APIRouter(
+    prefix="/logs",
+    tags=["Logging"],
+)
+
+
+async def log_stream_generator(request: Request, queue: asyncio.Queue):
+    """Generates SSE messages from the log queue."""
+    logger.info("SSE client connected for log streaming.")
+    while True:
+        try:
+            if await request.is_disconnected():
+                logger.info("SSE client disconnected.")
+                break
+
+            log_message = await queue.get()
+            yield {"event": "log", "data": log_message}
+
+            queue.task_done()
+
+        except asyncio.CancelledError:
+            logger.info("Log stream task cancelled (client likely disconnected).")
+            break
+        except Exception as e:
+            logger.error(f"Error in SSE log generator: {e}", exc_info=True)
+            try:
+                yield {"event": "error", "data": f"Log streaming error: {e}"}
+            except Exception:  # Ignore if yield fails (client might be gone)
+                pass
+            await asyncio.sleep(1)  # Avoid tight loop on persistent error
+
+
+@router.get("/stream", summary="Stream application logs via SSE")
+async def stream_logs(
+    request: Request, queue: asyncio.Queue = Depends(get_sse_log_queue)
+):
+    """
+    Establishes a Server-Sent Events (SSE) connection to stream application logs.
+
+    Events:
+    - **event: log**: Contains a single log entry as string data.
+    - **event: error**: Sent if an error occurs during streaming.
+    """
+    event_generator = log_stream_generator(request, queue)
+    return EventSourceResponse(event_generator)
diff --git a/pipeline/scheduler/manager.py b/pipeline/scheduler/manager.py
index 41fc929..662e8b5 100644
--- a/pipeline/scheduler/manager.py
+++ b/pipeline/scheduler/manager.py
@@ -13,6 +13,8 @@ from apscheduler.executors.asyncio import AsyncIOExecutor
 from apscheduler.job import Job
 from apscheduler.jobstores.base import JobLookupError
 
+from config import settings
+
 from loguru import logger
 
 from models.pipeline import Pipeline, PipelineStatus
@@ -27,18 +29,19 @@ class SchedulerManager:
     def __init__(
         self,
         pipeline_service: PipelineService,
-        check_interval_seconds: int = 60,
-        max_concurrent_runs: int = 5,
-        misfire_grace_sec: int = 300,
+        check_interval_seconds: int = settings.SCHEDULER_CHECK_INTERVAL,
+        max_concurrent_runs: int = settings.SCHEDULER_MAX_CONCURRENT_RUNS,
+        misfire_grace_sec: int = settings.SCHEDULER_MISFIRE_GRACE_SEC,
     ):
         self.pipeline_service = pipeline_service
         self.check_interval_seconds = check_interval_seconds
+        self.max_concurrent_runs = max_concurrent_runs
         self._scheduler: AsyncIOScheduler | None = None
         self._running = False
         self._discovery_job_id = "pipeline_discovery_job"
         self.misfire_grace_sec = misfire_grace_sec
 
-        # Configure APScheduler (same as before)
+        # Configure APScheduler
         jobstores = {"default": MemoryJobStore()}
         executors = {"default": AsyncIOExecutor()}
         job_defaults = {
@@ -52,10 +55,9 @@ class SchedulerManager:
             job_defaults=job_defaults,
             timezone=UTC,
         )
-        logger.info("APScheduler configured.")
-        # Link the scheduler back to the service *after* both are initialized
-        # This is often done in the main application setup
-        # self.pipeline_service.set_scheduler_manager(self)
+        logger.info(
+            f"APScheduler configured with misfire_grace_time: {self.misfire_grace_sec}s"
+        )
 
     async def schedule_pipeline(self, pipeline: Pipeline):
         """Adds or updates a job for a specific pipeline based on its next_run time."""
@@ -218,7 +220,6 @@ class SchedulerManager:
                 f"Error during pipeline discovery/reconciliation: {e}", exc_info=True
             )
 
-    # start() and stop() methods remain largely the same as before
     def start(self):
         """Starts the scheduler and the discovery job."""
         if not self._running and self._scheduler:
diff --git a/pipeline/uv.lock b/pipeline/uv.lock
index 02dbdd4..c833b17 100644
--- a/pipeline/uv.lock
+++ b/pipeline/uv.lock
@@ -328,6 +328,7 @@ dependencies = [
     { name = "python-dotenv" },
     { name = "responses" },
     { name = "rich" },
+    { name = "sse-starlette" },
 ]
 
 [package.metadata]
@@ -346,6 +347,7 @@ requires-dist = [
     { name = "python-dotenv", specifier = ">=1.1.0" },
     { name = "responses", specifier = ">=0.25.7" },
     { name = "rich", specifier = ">=14.0.0" },
+    { name = "sse-starlette", specifier = ">=2.3.4" },
 ]
 
 [[package]]
@@ -1737,6 +1739,19 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/d1/c2/fe97d779f3ef3b15f05c94a2f1e3d21732574ed441687474db9d342a7315/soupsieve-2.6-py3-none-any.whl", hash = "sha256:e72c4ff06e4fb6e4b5a9f0f55fe6e81514581fca1515028625d0f299c602ccc9", size = 36186 },
 ]
 
+[[package]]
+name = "sse-starlette"
+version = "2.3.4"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "anyio" },
+    { name = "starlette" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/43/be/7e776a29b5f712b5bd13c571256a2470fcf345c562c7b2359f2ee15d9355/sse_starlette-2.3.4.tar.gz", hash = "sha256:0ffd6bed217cdbb74a84816437c609278003998b4991cd2e6872d0b35130e4d5", size = 17522 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/43/a4/ee4a20f0b5ff34c391f3685eff7cdba1178a487766e31b04efb51bbddd87/sse_starlette-2.3.4-py3-none-any.whl", hash = "sha256:b8100694f3f892b133d0f7483acb7aacfcf6ed60f863b31947664b6dc74e529f", size = 10232 },
+]
+
 [[package]]
 name = "starlette"
 version = "0.46.2"