diff --git a/pipeline/models/pipeline.py b/pipeline/models/pipeline.py
new file mode 100644
index 0000000..1b22618
--- /dev/null
+++ b/pipeline/models/pipeline.py
@@ -0,0 +1,40 @@
+from datetime import datetime, timezone
+import enum
+from uuid import UUID
+from pydantic import BaseModel, Field
+
+from models.ingestion import IngestorInput
+
+
+class PipelineStatus(str, enum.Enum):
+    ACTIVE = "active"
+    INACTIVE = "inactive"
+
+
+class RunFrequency(str, enum.Enum):
+    DAILY = "daily"
+    WEEKLY = "weekly"
+    MONTHLY = "monthly"
+
+
+class PipelineConfig(BaseModel):
+    ingestor_config: IngestorInput
+    run_frequency: RunFrequency
+    last_run: datetime | None = None
+    next_run: datetime | None = None
+
+
+class Pipeline(BaseModel):
+    id: UUID
+    name: str
+    description: str
+    config: PipelineConfig
+    status: PipelineStatus = Field(default=PipelineStatus.INACTIVE)
+    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+
+
+class PipelineCreate(BaseModel):
+    name: str
+    description: str
+    config: PipelineConfig
diff --git a/pipeline/services.py b/pipeline/services.py
deleted file mode 100644
index fd7819e..0000000
--- a/pipeline/services.py
+++ /dev/null
@@ -1,68 +0,0 @@
-"""
-Background service to execute pipelines: ingestion → normalization.
-"""
-
-from typing import List, Dict, Any
-from uuid import UUID
-from datetime import datetime
-
-import stores
-import models
-from ingestion.ingestor import Ingestor
-from normalization.normalizer import Normalizer
-from log.logging_utils import setup_run_logging, cleanup_run_logging, pipeline_log
-
-
-def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None:
-    """
-    Execute a pipeline: ingest data, normalize it, and update run status.
-
-    Args:
-        pipeline: The Pipeline model to run.
-        run_id: UUID of the RunResult to update.
- """ - run = stores.runs.get(run_id) - if not run: - return - - # Setup structured per-run logging - setup_run_logging(str(pipeline.id), str(run_id)) - pipeline_log("INFO", "Pipeline run starting", str(pipeline.id), str(run_id), status="RUNNING") - - # Mark as running - run.status = 'RUNNING' - run.started_at = datetime.utcnow() - - try: - # Ingest raw records - pipeline_log("INFO", "Ingesting raw records", str(pipeline.id), str(run_id)) - raw_records: List[Dict[str, Any]] = Ingestor.run(pipeline.sources) - pipeline_log("INFO", f"Ingested {len(raw_records)} records", str(pipeline.id), str(run_id)) - - # Normalize records - normalizer = Normalizer() - canonical: List[Dict[str, Any]] = [] - for raw in raw_records: - source_type = raw.get('source_type') - source = raw.get('source') - if not source_type or not source: - pipeline_log("ERROR", "Record missing 'source_type' or 'source'", str(pipeline.id), str(run_id), status="FAILED") - raise ValueError("Record missing 'source_type' or 'source'.") - norm = normalizer.normalize([raw], source_type, source) - canonical.extend(norm) - - # Success - run.status = 'COMPLETED' - run.finished_at = datetime.utcnow() - run.results = canonical - pipeline_log("SUCCESS", f"Pipeline run completed with {len(canonical)} records", str(pipeline.id), str(run_id), status="COMPLETED") - - except Exception as e: - # Log failure with stack trace - pipeline_log("ERROR", f"Pipeline run failed: {e}", str(pipeline.id), str(run_id), status="FAILED", error=str(e)) - run.status = 'FAILED' - run.finished_at = datetime.utcnow() - run.error = str(e) - finally: - pipeline_log("INFO", "Pipeline run finished", str(pipeline.id), str(run_id), status=run.status) - cleanup_run_logging(str(run_id)) \ No newline at end of file diff --git a/pipeline/services/__init__.py b/pipeline/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/services/pipeline_service.py b/pipeline/services/pipeline_service.py new file mode 100644 index 0000000..0f3416f --- /dev/null +++ b/pipeline/services/pipeline_service.py @@ -0,0 +1,185 @@ +from datetime import datetime, timezone +from uuid import UUID, uuid4 +from loguru import logger + +from models.pipeline import ( + Pipeline, + PipelineCreate, + PipelineConfig, + RunFrequency, + PipelineStatus, +) +from models.ingestion import IngestorInput +from stores.base import PipelineStore + + +class PipelineService: + def __init__(self, store: PipelineStore): + self.store = store + logger.info(f"PipelineService initialized with store: {type(store).__name__}") + + async def create_pipeline( + self, + name: str, + description: str, + ingestor_config: IngestorInput, + run_frequency: RunFrequency, + ) -> Pipeline: + """Create a new pipeline using the store""" + logger.info( + f"Creating pipeline: name={name}, description={description}, ingestor_config=..., run_frequency={run_frequency}" + ) + try: + pipeline_id = uuid4() + now = datetime.now(timezone.utc) + pipeline = Pipeline( + id=pipeline_id, + name=name, + description=description, + config=PipelineConfig( + ingestor_config=ingestor_config, + run_frequency=run_frequency, + # will be set by scheduler + last_run=None, + next_run=None, + ), + status=PipelineStatus.INACTIVE, + created_at=now, + updated_at=now, + ) + await self.store.save(pipeline) + logger.info(f"Pipeline created and saved: id={pipeline.id}") + return pipeline + except Exception as e: + logger.error(f"Failed to create pipeline: {e}") + raise e + + async def update_pipeline( + self, pipeline_id: UUID, pipeline_in: 
+    ) -> Pipeline | None:
+        """Update an existing pipeline using the store"""
+        logger.info(f"Updating pipeline: id={pipeline_id}, update_data=...")
+        existing_pipeline = await self.store.get(pipeline_id)
+        if not existing_pipeline:
+            logger.warning(f"Pipeline not found for update: id={pipeline_id}")
+            return None
+
+        try:
+            # TODO: review code here
+            # Create an updated pipeline object based on the existing one and the input.
+            # (Mutable defaults would need care here if this were not going through Pydantic.)
+            update_data = pipeline_in.model_dump(
+                exclude_unset=True
+            )  # Get only the provided fields
+
+            # Merge update_data into a dict representation of the existing pipeline.
+            # Note: this simplistic merge might need refinement based on how partial updates should behave,
+            # especially for nested structures like 'config'. Pydantic's model_copy(update=...) might be useful here if applicable.
+            updated_pipeline_data = existing_pipeline.model_dump()
+
+            if "name" in update_data:
+                updated_pipeline_data["name"] = update_data["name"]
+            if "description" in update_data:
+                updated_pipeline_data["description"] = update_data["description"]
+
+            # Merge config - replace the whole config if provided in the input
+            if "config" in update_data:
+                # Ensure the input config is a PipelineConfig object before assignment
+                if pipeline_in.config:
+                    updated_pipeline_data["config"] = pipeline_in.config.model_dump()
+                else:
+                    # Handle cases where config might be passed differently if needed
+                    logger.error("Invalid config format for update")
+                    raise ValueError("Invalid config format for update")
+
+            # Re-validate the merged data into a Pipeline object
+            # (the original ID and created_at are preserved)
+            updated_pipeline = Pipeline.model_validate(updated_pipeline_data)
+            # updated_at is handled by the store's save method
+
+            await self.store.save(updated_pipeline)  # Save the updated object
+            logger.info(f"Pipeline updated: id={updated_pipeline.id}")
+            return updated_pipeline
+        except Exception as e:
+            logger.error(f"Failed to update pipeline id={pipeline_id}: {e}")
+            raise e
+
+    async def delete_pipeline(self, pipeline_id: UUID) -> bool:
+        """Delete an existing pipeline using the store"""
+        logger.info(f"Deleting pipeline: id={pipeline_id}")
+        deleted = await self.store.delete(pipeline_id)
+        if not deleted:
+            logger.warning(f"Pipeline not found for deletion: id={pipeline_id}")
+        return deleted
+
+    async def get_pipeline(self, pipeline_id: UUID) -> Pipeline | None:
+        """Get a single pipeline by ID"""
+        logger.info(f"Getting pipeline: id={pipeline_id}")
+        return await self.store.get(pipeline_id)
+
+    async def list_pipelines(self) -> list[Pipeline]:
+        """Get all pipelines"""
+        logger.info("Listing all pipelines")
+        return await self.store.get_all()
+
+    async def run_pipeline(self, pipeline_id: UUID) -> None:
+        """Run an existing pipeline"""
+        pipeline = await self.store.get(pipeline_id)
+        if not pipeline:
+            logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})")
+            return
+        if pipeline.status == PipelineStatus.ACTIVE:
+            logger.warning(
+                f"Pipeline id={pipeline_id} is already active/running (status={pipeline.status}). Skipping run."
+            )
+            return
+
+        logger.info(
+            f"Attempting to run pipeline: id={pipeline_id}, name='{pipeline.name}'"
+        )
+
+        try:
+            # 2. Update status to ACTIVE (optional, depends on desired state mgmt)
+            # pipeline.status = PipelineStatus.ACTIVE
+            # pipeline.config.last_run = datetime.now(timezone.utc)  # Mark start time
+            # await self.store.save(pipeline)
+
+            # 3. Execute the actual pipeline logic (ingestion, processing, etc.)
+            # This part depends heavily on your pipeline runner implementation
+            # It would use pipeline.config.ingestor_config, etc.
+            logger.info(f"Executing pipeline logic for id={pipeline_id}...")
+            # ...
+            # result = await actual_pipeline_runner(pipeline.config)
+            # ...
+            logger.info(f"Pipeline logic finished for id={pipeline_id}.")
+
+            # 4. Update status (e.g., back to INACTIVE) and run times on completion
+            # Fetch the latest state in case it changed during run
+            current_pipeline_state = await self.store.get(pipeline_id)
+            if current_pipeline_state:
+                current_pipeline_state.status = (
+                    PipelineStatus.INACTIVE
+                )  # Or COMPLETED if you have that state
+                current_pipeline_state.config.last_run = datetime.now(
+                    timezone.utc
+                )  # Mark end time
+                # TODO: Calculate next_run based on current_pipeline_state.config.run_frequency
+                # current_pipeline_state.config.next_run = calculate_next_run(...)
+                await self.store.save(current_pipeline_state)
+                logger.info(
+                    f"Pipeline run finished and state updated for id={pipeline_id}"
+                )
+            else:
+                logger.warning(
+                    f"Pipeline id={pipeline_id} disappeared during run. Cannot update final state."
+                )
+
+        except Exception as e:
+            logger.error(f"Failed during pipeline run id={pipeline_id}: {e}")
+            # Optionally update status to FAILED
+            current_pipeline_state = await self.store.get(pipeline_id)
+            if current_pipeline_state:
+                # current_pipeline_state.status = PipelineStatus.FAILED  # Add a FAILED state if desired
+                await self.store.save(current_pipeline_state)
+            # Handle/log the error appropriately
+            # raise  # Optionally re-raise
diff --git a/pipeline/stores.py b/pipeline/stores.py
deleted file mode 100644
index 8f36714..0000000
--- a/pipeline/stores.py
+++ /dev/null
@@ -1,76 +0,0 @@
-"""
-In-memory stores for pipelines and runs.
-"""
-
-from typing import Dict, List, Optional
-from uuid import UUID, uuid4
-from datetime import datetime
-
-import models
-
-# In-memory storage
-pipelines: Dict[UUID, models.Pipeline] = {}
-runs: Dict[UUID, models.RunResult] = {}
-
-
-def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
-    """
-    Create and store a new pipeline.
-    """
-    pipeline_id = uuid4()
-    now = datetime.utcnow()
-    pipeline = models.Pipeline(
-        id=pipeline_id,
-        name=pipeline_in.name,
-        sources=pipeline_in.sources,
-        created_at=now,
-    )
-    pipelines[pipeline_id] = pipeline
-    return pipeline
-
-
-def get_pipeline(pipeline_id: UUID) -> Optional[models.Pipeline]:
-    """
-    Retrieve a pipeline by its ID.
-    """
-    return pipelines.get(pipeline_id)
-
-
-def list_pipelines() -> List[models.Pipeline]:
-    """
-    List all registered pipelines.
-    """
-    return list(pipelines.values())
-
-
-def create_run(pipeline_id: UUID) -> models.RunResult:
-    """
-    Create and store a new run for a given pipeline.
-    """
-    run_id = uuid4()
-    now = datetime.utcnow()
-    run = models.RunResult(
-        id=run_id,
-        pipeline_id=pipeline_id,
-        status='PENDING',
-        started_at=now,
-        finished_at=None,
-        results=None,
-        error=None,
-    )
-    runs[run_id] = run
-    return run
-
-
-def get_run(run_id: UUID) -> Optional[models.RunResult]:
-    """
-    Retrieve a run by its ID.
-    """
-    return runs.get(run_id)
-
-
-def list_runs_for_pipeline(pipeline_id: UUID) -> List[models.RunResult]:
-    """
-    List all runs for a specific pipeline.
- """ - return [r for r in runs.values() if r.pipeline_id == pipeline_id] \ No newline at end of file diff --git a/pipeline/stores/__init__.py b/pipeline/stores/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/stores/base.py b/pipeline/stores/base.py new file mode 100644 index 0000000..e72fd92 --- /dev/null +++ b/pipeline/stores/base.py @@ -0,0 +1,62 @@ +from abc import ABC, abstractmethod +from typing import List, Optional +from uuid import UUID + +from models.pipeline import Pipeline, PipelineCreate + + +class PipelineStore(ABC): + """ + Abstract Base Class for pipeline persistence. + Defines the contract for saving, retrieving, and deleting pipelines. + """ + + @abstractmethod + async def save(self, pipeline: Pipeline) -> None: + """ + Save a pipeline (create or update). + Implementations should handle checking if the ID exists + and performing an insert or update accordingly. + They should also update the 'updated_at' timestamp. + """ + pass + + @abstractmethod + async def get(self, pipeline_id: UUID) -> Optional[Pipeline]: + """ + Retrieve a pipeline by its ID. + Returns the Pipeline object if found, otherwise None. + """ + pass + + @abstractmethod + async def get_all(self) -> List[Pipeline]: + """ + Retrieve all pipelines. + Returns a list of Pipeline objects. + """ + pass + + @abstractmethod + async def delete(self, pipeline_id: UUID) -> bool: + """ + Delete a pipeline by its ID. + Returns True if deletion was successful, False otherwise (e.g., if not found). + """ + pass + + @abstractmethod + async def update(self, pipeline_id: UUID, pipeline_in: PipelineCreate) -> Pipeline: + """ + Update an existing pipeline. + Returns the updated Pipeline object. + """ + pass + + async def connect(self) -> None: + """Optional: Perform setup/connection logic.""" + pass + + async def disconnect(self) -> None: + """Optional: Perform cleanup/disconnection logic.""" + pass diff --git a/pipeline/stores/memory.py b/pipeline/stores/memory.py new file mode 100644 index 0000000..145d3e9 --- /dev/null +++ b/pipeline/stores/memory.py @@ -0,0 +1,61 @@ +from datetime import datetime, timezone +from typing import Dict, List, Optional +from uuid import UUID + +from loguru import logger + +from models.pipeline import Pipeline, PipelineCreate +from .base import PipelineStore + + +class InMemoryPipelineStore(PipelineStore): + """ + In-memory implementation of the PipelineStore. + Stores pipelines in a simple dictionary. Not persistent across restarts. 
+ """ + + _pipelines: Dict[UUID, Pipeline] + + def __init__(self): + logger.info("Initializing InMemoryPipelineStore") + self._pipelines = {} + + async def save(self, pipeline: Pipeline) -> None: + logger.debug(f"Saving pipeline (in-memory): id={pipeline.id}") + pipeline.updated_at = datetime.now(timezone.utc) + self._pipelines[pipeline.id] = pipeline.model_copy(deep=True) + logger.info(f"Pipeline saved (in-memory): id={pipeline.id}") + + async def get(self, pipeline_id: UUID) -> Optional[Pipeline]: + logger.debug(f"Getting pipeline (in-memory): id={pipeline_id}") + pipeline = self._pipelines.get(pipeline_id) + if pipeline: + return pipeline.model_copy(deep=True) + logger.warning(f"Pipeline not found (in-memory): id={pipeline_id}") + return None + + async def get_all(self) -> List[Pipeline]: + logger.debug("Getting all pipelines (in-memory)") + return [p.model_copy(deep=True) for p in self._pipelines.values()] + + async def delete(self, pipeline_id: UUID) -> bool: + logger.debug(f"Deleting pipeline (in-memory): id={pipeline_id}") + if pipeline_id in self._pipelines: + del self._pipelines[pipeline_id] + logger.info(f"Pipeline deleted (in-memory): id={pipeline_id}") + return True + logger.warning(f"Pipeline not found for deletion (in-memory): id={pipeline_id}") + return False + + async def update(self, pipeline_id: UUID, pipeline_in: PipelineCreate) -> Pipeline: + logger.debug(f"Updating pipeline (in-memory): id={pipeline_id}") + pipeline = self._pipelines.get(pipeline_id) + if not pipeline: + raise ValueError(f"Pipeline not found (in-memory): id={pipeline_id}") + pipeline.name = pipeline_in.name + pipeline.description = pipeline_in.description + pipeline.config = pipeline_in.config + pipeline.updated_at = datetime.now(timezone.utc) + self._pipelines[pipeline_id] = pipeline.model_copy(deep=True) + logger.info(f"Pipeline updated (in-memory): id={pipeline_id}") + return pipeline.model_copy(deep=True)