diff --git a/pipeline/config.py b/pipeline/config.py index c46344b..2dd88b9 100644 --- a/pipeline/config.py +++ b/pipeline/config.py @@ -19,7 +19,7 @@ class AppSettings(BaseSettings): # Application settings APP_NAME: str = "PipelineRunnerApp" - LOG_LEVEL: str = "INFO" # Logging level (e.g., DEBUG, INFO, WARNING) + LOG_LEVEL: str = "DEBUG" # Logging level (e.g., DEBUG, INFO, WARNING) LOG_ENABLE_SSE: bool = True # Flag to enable/disable SSE log streaming sink # Store configuration @@ -56,7 +56,8 @@ logger.remove() logger.add( sys.stderr, level=settings.LOG_LEVEL.upper(), - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + # format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + colorize=True, ) # File Sink diff --git a/pipeline/ingestion/core.py b/pipeline/ingestion/core.py index e7cc6fd..cab0d43 100644 --- a/pipeline/ingestion/core.py +++ b/pipeline/ingestion/core.py @@ -1,4 +1,8 @@ -from ingestors import IngestionMethod, SimpleIngestionStrategy, MLIngestionStrategy +from ingestion.ingestors import ( + IngestionMethod, + SimpleIngestionStrategy, + MLIngestionStrategy, +) from models.ingestion import IngestSourceConfig, OutputData diff --git a/pipeline/models/models.py b/pipeline/models/models.py deleted file mode 100644 index 084d953..0000000 --- a/pipeline/models/models.py +++ /dev/null @@ -1,186 +0,0 @@ -""" -Pydantic models for pipelines and runs. -""" - -from typing import List, Union, Annotated, Optional, Literal, Dict, Any -from uuid import UUID -from datetime import datetime - -from pydantic import BaseModel, Field, HttpUrl, field_validator, ValidationInfo - - -class RunCreate(BaseModel): - """ - Model for creating a new run. (Empty) - """ - - pass - - -class Run(BaseModel): - """ - Status of a pipeline run. - """ - - id: UUID - pipeline_id: UUID - status: Literal["PENDING", "RUNNING", "COMPLETED", "FAILED"] - started_at: datetime - finished_at: Optional[datetime] = None - - -class RunResult(Run): - """ - Extended run model including results or error. - """ - - results: Optional[List[Dict[str, Any]]] = None - error: Optional[str] = None - - -class ApiConfig(BaseModel): - """ - Configuration for an API source. - """ - - url: HttpUrl = Field(..., description="API endpoint URL") - token: Optional[str] = Field( - None, - description="Optional bearer token for API authentication", - ) - - -class ScrapeConfig(BaseModel): - """ - Configuration for a web-scraping source. - """ - - urls: List[HttpUrl] = Field( - ..., - description="List of URLs to scrape", - ) - schema_file: Optional[str] = Field( - None, - description="Path to a JSON file containing CSS extraction schema", - ) - prompt: Optional[str] = Field( - None, - description="Prompt string for LLM-based extraction", - ) - - -class FileConfig(BaseModel): - """ - Configuration for a file-based source. Supports either a file path or an uploaded file. 
- """ - - path: Optional[str] = Field( - None, - description="Path to the input file (optional if upload is provided)", - ) - upload: Optional[Any] = Field( - None, - description="Uploaded file object or metadata (optional if path is provided)", - ) - upload_filename: Optional[str] = Field( - None, - description="Original filename of the uploaded file (for validation)", - ) - format: Literal["csv", "json", "sqlite"] = Field( - "json", description="Format of the file" - ) - - @field_validator("path", mode="before") - def require_path_or_upload(cls, v, info: ValidationInfo): - data = info.data - if not v and not data.get("upload"): - raise ValueError("Either 'path' or 'upload' must be provided.") - return v - - @field_validator("upload_filename", mode="before") - def filename_extension_matches_format(cls, v, info: ValidationInfo): - fmt = info.data.get("format") - if v and fmt and not v.lower().endswith(f".{fmt}"): - raise ValueError(f"Uploaded file extension must match format '{fmt}'") - return v - - @field_validator("path", mode="after") - def path_or_upload_extension_matches_format(cls, v, info: ValidationInfo): - fmt = info.data.get("format") - upload_filename = info.data.get("upload_filename") - if v and fmt and not v.lower().endswith(f".{fmt}"): - raise ValueError(f"File extension must match format '{fmt}'") - if upload_filename and fmt and not upload_filename.lower().endswith(f".{fmt}"): - raise ValueError(f"Uploaded file extension must match format '{fmt}'") - return v - - -class ApiSource(BaseModel): - """ - An API-based data source. - """ - - type: Literal["api"] = Field( - "api", - description="Discriminator for API source", # Removed const=True - ) - config: ApiConfig - - -class ScrapeSource(BaseModel): - """ - A web-scraping data source. - """ - - type: Literal["scrape"] = Field( - "scrape", - description="Discriminator for scrape source", # Removed const=True - ) - config: ScrapeConfig - - -class FileSource(BaseModel): - """ - A file-based data source. - """ - - type: Literal["file"] = Field( - "file", - description="Discriminator for file source", # Removed const=True - ) - config: FileConfig - - -Source = Annotated[ - Union[ApiSource, ScrapeSource, FileSource], - Field(discriminator="type", description="Union of all source types"), -] - - -class PipelineCreate(BaseModel): - """ - Payload for creating a new pipeline. - """ - - name: Optional[str] = Field( - default=None, - description="Optional human-readable name for the pipeline", - ) - sources: List[Source] = Field( - ..., description="List of data sources for this pipeline" - ) - - -class Pipeline(BaseModel): - """ - Representation of a pipeline. 
- """ - - id: UUID = Field(..., description="Unique identifier for the pipeline") - name: Optional[str] = Field( - None, description="Optional human-readable name for the pipeline" - ) - sources: List[Source] = Field(..., description="List of configured data sources") - created_at: datetime = Field( - ..., description="UTC timestamp when the pipeline was created" - ) diff --git a/pipeline/services/pipeline_service.py b/pipeline/services/pipeline_service.py index 1e07eae..92040dc 100644 --- a/pipeline/services/pipeline_service.py +++ b/pipeline/services/pipeline_service.py @@ -8,6 +8,8 @@ from uuid import UUID, uuid4 from typing import Optional, List, TYPE_CHECKING from loguru import logger +from ingestion import Ingestor + from models.pipeline import ( Pipeline, PipelineCreate, @@ -240,142 +242,122 @@ class PipelineService: async def run_pipeline(self, pipeline_id: UUID) -> None: """ Executes the pipeline logic, updating status and run times. - This is called by the scheduler job or manual trigger. + Logs associated with this run will include the pipeline_id. """ - logger.info(f"Attempting run execution for pipeline: id={pipeline_id}") - pipeline = await self.store.get(pipeline_id) + # Use contextualize to tag logs originating from this specific run + with logger.contextualize( + pipeline_id=str(pipeline_id) + ): # Ensure it's a string for context + logger.info( + "Attempting run execution for pipeline" + ) # Log context takes effect here + pipeline = await self.store.get(pipeline_id) - if not pipeline: - logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})") - return - # NOTE: lock mechanism - if pipeline.status == PipelineStatus.ACTIVE: - logger.warning( - f"Pipeline id={pipeline_id} is already ACTIVE. Skipping run." - ) - return - - # --- Mark as ACTIVE --- - try: - pipeline.status = PipelineStatus.ACTIVE - pipeline.updated_at = datetime.now(UTC) # Update timestamp - await self.store.save(pipeline) - logger.info(f"Pipeline {pipeline_id} marked as ACTIVE.") - except Exception as e: - logger.error( - f"Failed to mark pipeline {pipeline_id} as ACTIVE: {e}. Aborting run.", - exc_info=True, - ) - # Attempt to restore status? Depends on store guarantees. - # pipeline.status = original_status # Potentially try rollback - return # Abort run - - # --- Execute Pipeline Logic --- - run_successful = False - try: - logger.info(f"Executing core logic for pipeline id={pipeline_id}...") - # --------------------------------------------------- - # Ensure _execute_ingestion is awaited if it's async - await self._execute_ingestion(pipeline.config.ingestor_config) - # --------------------------------------------------- - logger.info(f"Core logic finished successfully for id={pipeline_id}.") - run_successful = True - - except Exception as e: - logger.error( - f"Core logic failed during pipeline run id={pipeline_id}: {e}", - exc_info=True, - ) - # run_successful remains False - - # --- Update Final State --- - try: - # Fetch the latest state again to minimize race conditions, though the ACTIVE lock helps - final_pipeline_state = await self.store.get(pipeline_id) - if not final_pipeline_state: - logger.warning( - f"Pipeline {pipeline_id} disappeared during run. Cannot update final state." - ) - # The pipeline might have been deleted externally while running. - # Scheduler might need cleanup if the job still exists. 
- if self.scheduler_manager: - logger.warning( - f"Attempting to unschedule potentially orphaned job for {pipeline_id}" - ) - asyncio.create_task( - self.scheduler_manager.unschedule_pipeline(pipeline_id) - ) + if not pipeline: + logger.error("Cannot run pipeline: Pipeline not found") + return + if pipeline.status == PipelineStatus.ACTIVE: + logger.warning("Pipeline is already ACTIVE. Skipping run.") return - # Avoid modifying the object fetched directly if store uses caching/references - final_pipeline_state = final_pipeline_state.model_copy(deep=True) + # --- Mark as ACTIVE --- + # original_status = pipeline.status # Store original status for potential rollback + try: + pipeline.status = PipelineStatus.ACTIVE + pipeline.updated_at = datetime.now(UTC) + await self.store.save(pipeline) + logger.info("Pipeline marked as ACTIVE.") + except Exception as e: + logger.error( + f"Failed to mark pipeline as ACTIVE: {e}. Aborting run.", + exc_info=True, + ) + # Attempt to restore status? Requires careful thought on atomicity + # pipeline.status = original_status + # await self.store.save(pipeline) # Potential race condition/overwrite here + return - now = datetime.now(UTC) - final_pipeline_state.status = ( - PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED - ) + # --- Execute Pipeline Logic --- + run_successful = False + try: + logger.info("Executing core logic...") + # This call and anything within it will inherit the pipeline_id context + await self._execute_ingestion(pipeline.config.ingestor_config) + logger.info("Core logic finished successfully.") + run_successful = True + except Exception as e: + logger.error( + f"Core logic failed during pipeline run: {e}", exc_info=True + ) + # run_successful remains False - if run_successful: - final_pipeline_state.config.last_run = ( - now # Mark completion time on success + # --- Update Final State --- + try: + # Fetch latest state again (important if external changes possible) + final_pipeline_state = await self.store.get(pipeline_id) + if not final_pipeline_state: + logger.warning( + "Pipeline disappeared during run. Cannot update final state." + ) + # Handle potential deletion during run (e.g., unschedule if needed) + if self.scheduler_manager: + logger.warning( + "Attempting to unschedule potentially orphaned job" + ) + asyncio.create_task( + self.scheduler_manager.unschedule_pipeline(pipeline_id) + ) + return + + final_pipeline_state = final_pipeline_state.model_copy(deep=True) + now = datetime.now(UTC) + final_pipeline_state.status = ( + PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED ) - # Calculate and store the *next* run time based on the outcome - # Use the *updated* last_run if the run was successful - current_last_run = ( - final_pipeline_state.config.last_run - ) # This is 'now' if successful, else original last_run - final_pipeline_state.config.next_run = calculate_next_run( - frequency=final_pipeline_state.config.run_frequency, - last_run=current_last_run, # Use the relevant last_run for calculation - start_reference_time=now, # Use current time as reference for calculation - ) + if run_successful: + final_pipeline_state.config.last_run = now - final_pipeline_state.updated_at = ( - now # Update timestamp for this final save - ) - - await self.store.save(final_pipeline_state) - logger.info( - f"Pipeline {pipeline_id} run finished. 
Status: {final_pipeline_state.status}, Last Run: {final_pipeline_state.config.last_run}, Next Run: {final_pipeline_state.config.next_run}" - ) - - # Notify scheduler about the *new* next run time so it can reschedule accurately - if self.scheduler_manager: - logger.debug( - f"Notifying scheduler to reschedule pipeline {pipeline_id} after run completion with next run {final_pipeline_state.config.next_run}." + current_last_run = final_pipeline_state.config.last_run + final_pipeline_state.config.next_run = calculate_next_run( + frequency=final_pipeline_state.config.run_frequency, + last_run=current_last_run, + start_reference_time=now, ) - asyncio.create_task( - self.scheduler_manager.reschedule_pipeline(final_pipeline_state) + final_pipeline_state.updated_at = now + + await self.store.save(final_pipeline_state) + logger.info( + f"Pipeline run finished. Status: {final_pipeline_state.status}, Last Run: {final_pipeline_state.config.last_run}, Next Run: {final_pipeline_state.config.next_run}" ) - except Exception as e: - logger.error( - f"Failed to update pipeline {pipeline_id} state after run execution: {e}", - exc_info=True, - ) - # The pipeline might be left in ACTIVE or an inconsistent state. - # Consider adding monitoring or retry logic here. + if self.scheduler_manager: + logger.debug( + "Notifying scheduler to reschedule pipeline after run completion" + ) + asyncio.create_task( + self.scheduler_manager.reschedule_pipeline(final_pipeline_state) + ) + + except Exception as e: + logger.error( + f"Failed to update pipeline state after run execution: {e}", + exc_info=True, + ) + # Pipeline might be left ACTIVE or FAILED state might not be saved. Needs monitoring. async def _execute_ingestion(self, config: IngestorInput): """ Executes the ingestion process for a pipeline using the provided IngestorInput config. Returns the ingestion results or raises an exception on failure. """ - # Ensure Ingestor is imported locally or globally if needed - # from ingestion.core import Ingestor # Example import if needed - - # Check if Ingestor is already available (e.g., imported at module level) - # If not, uncomment the import above or ensure it's accessible. - # Assuming Ingestor is available in the scope: try: - # Avoid circular import - from ingestion.core import Ingestor - + # from ..ingestion import Ingestor logger.info(f"Executing ingestion with config: {config}") - # NOTE: Can be async results = Ingestor.run(config.sources) - logger.info(f"Ingestion completed successfully. Results: {results}") + logger.info( + f"Ingestion completed successfully. Results count: {len(results.records)}" + ) return results except ImportError: logger.error("Failed to import Ingestor. Cannot execute ingestion.")
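
Note on the logging changes above: with the custom format= line in pipeline/config.py commented out, Loguru falls back to its default format, which does not render values from the record's extra dict. The pipeline_id bound by logger.contextualize() in run_pipeline is therefore attached to every record emitted inside that block, but it will not be visible on the stderr sink unless some format (or a serializing sink) surfaces it. The sketch below is illustrative only, not this project's actual wiring; the sink settings and the "demo-pipeline" id are assumptions, while logger.configure(extra=...), logger.contextualize(), and the {extra[pipeline_id]} format field are standard Loguru APIs.

    import sys
    from loguru import logger

    # Default "pipeline_id" for records logged outside a contextualize() block,
    # so the format below never raises a KeyError on untagged messages.
    logger.configure(extra={"pipeline_id": "-"})

    logger.remove()
    logger.add(
        sys.stderr,
        level="INFO",
        colorize=True,
        # Surface the contextualized value via the extra dict.
        format="{time:HH:mm:ss.SSS} | {level: <8} | pipeline={extra[pipeline_id]} | {message}",
    )

    def run_pipeline(pipeline_id: str) -> None:
        # Everything logged inside this block (including code it calls) carries pipeline_id.
        with logger.contextualize(pipeline_id=pipeline_id):
            logger.info("Attempting run execution for pipeline")

    run_pipeline("demo-pipeline")
    # -> 12:34:56.789 | INFO     | pipeline=demo-pipeline | Attempting run execution for pipeline

The same extra field could also drive a per-pipeline filter on the SSE sink toggled by LOG_ENABLE_SSE, but that wiring is not part of this diff.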