feat: add pipeline service with in-memory data stores

Sosokker 2025-05-12 18:34:21 +07:00
parent ff9de9f80a
commit 946f0b2a24
8 changed files with 348 additions and 144 deletions

View File

@@ -0,0 +1,40 @@
from datetime import datetime, timezone
import enum
from uuid import UUID
from pydantic import BaseModel, Field
from models.ingestion import IngestorInput
class PipelineStatus(str, enum.Enum):
ACTIVE = "active"
INACTIVE = "inactive"
class RunFrequency(str, enum.Enum):
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
class PipelineConfig(BaseModel):
ingestor_config: IngestorInput
run_frequency: RunFrequency
last_run: datetime | None = None
next_run: datetime | None = None
class Pipeline(BaseModel):
id: UUID
name: str
description: str
config: PipelineConfig
status: PipelineStatus = Field(default=PipelineStatus.INACTIVE)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class PipelineCreate(BaseModel):
name: str
description: str
config: PipelineConfig
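For orientation, a minimal construction sketch (not part of the commit). It assumes an already-built IngestorInput instance, since that model's fields live in models/ingestion.py and are not shown in this diff; status and the timestamps come from the field defaults declared above.

from uuid import uuid4

from models.ingestion import IngestorInput
from models.pipeline import Pipeline, PipelineConfig, RunFrequency


def build_draft_pipeline(ingestor_config: IngestorInput) -> Pipeline:
    # status, created_at and updated_at fall back to the model defaults:
    # INACTIVE plus timezone-aware UTC timestamps.
    return Pipeline(
        id=uuid4(),
        name="example-pipeline",
        description="illustrative only",
        config=PipelineConfig(
            ingestor_config=ingestor_config,
            run_frequency=RunFrequency.DAILY,  # last_run/next_run stay None until scheduled
        ),
    )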

View File

@@ -1,68 +0,0 @@
"""
Background service to execute pipelines: ingestion and normalization.
"""
from typing import List, Dict, Any
from uuid import UUID
from datetime import datetime
import stores
import models
from ingestion.ingestor import Ingestor
from normalization.normalizer import Normalizer
from log.logging_utils import setup_run_logging, cleanup_run_logging, pipeline_log
def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None:
"""
Execute a pipeline: ingest data, normalize it, and update run status.
Args:
pipeline: The Pipeline model to run.
run_id: UUID of the RunResult to update.
"""
run = stores.runs.get(run_id)
if not run:
return
# Setup structured per-run logging
setup_run_logging(str(pipeline.id), str(run_id))
pipeline_log("INFO", "Pipeline run starting", str(pipeline.id), str(run_id), status="RUNNING")
# Mark as running
run.status = 'RUNNING'
run.started_at = datetime.utcnow()
try:
# Ingest raw records
pipeline_log("INFO", "Ingesting raw records", str(pipeline.id), str(run_id))
raw_records: List[Dict[str, Any]] = Ingestor.run(pipeline.sources)
pipeline_log("INFO", f"Ingested {len(raw_records)} records", str(pipeline.id), str(run_id))
# Normalize records
normalizer = Normalizer()
canonical: List[Dict[str, Any]] = []
for raw in raw_records:
source_type = raw.get('source_type')
source = raw.get('source')
if not source_type or not source:
pipeline_log("ERROR", "Record missing 'source_type' or 'source'", str(pipeline.id), str(run_id), status="FAILED")
raise ValueError("Record missing 'source_type' or 'source'.")
norm = normalizer.normalize([raw], source_type, source)
canonical.extend(norm)
# Success
run.status = 'COMPLETED'
run.finished_at = datetime.utcnow()
run.results = canonical
pipeline_log("SUCCESS", f"Pipeline run completed with {len(canonical)} records", str(pipeline.id), str(run_id), status="COMPLETED")
except Exception as e:
# Log failure with stack trace
pipeline_log("ERROR", f"Pipeline run failed: {e}", str(pipeline.id), str(run_id), status="FAILED", error=str(e))
run.status = 'FAILED'
run.finished_at = datetime.utcnow()
run.error = str(e)
finally:
pipeline_log("INFO", "Pipeline run finished", str(pipeline.id), str(run_id), status=run.status)
cleanup_run_logging(str(run_id))

View File

@@ -0,0 +1,185 @@
from datetime import datetime, timezone
from uuid import UUID, uuid4
from loguru import logger
from models.pipeline import (
Pipeline,
PipelineCreate,
PipelineConfig,
RunFrequency,
PipelineStatus,
)
from models.ingestion import IngestorInput
from stores.base import PipelineStore
class PipelineService:
def __init__(self, store: PipelineStore):
self.store = store
logger.info(f"PipelineService initialized with store: {type(store).__name__}")
async def create_pipeline(
self,
name: str,
description: str,
ingestor_config: IngestorInput,
run_frequency: RunFrequency,
) -> Pipeline:
"""Create a new pipeline using the store"""
logger.info(
f"Creating pipeline: name={name}, description={description}, ingestor_config=..., run_frequency={run_frequency}"
)
try:
pipeline_id = uuid4()
now = datetime.now(timezone.utc)
pipeline = Pipeline(
id=pipeline_id,
name=name,
description=description,
config=PipelineConfig(
ingestor_config=ingestor_config,
run_frequency=run_frequency,
# will be set by scheduler
last_run=None,
next_run=None,
),
status=PipelineStatus.INACTIVE,
created_at=now,
updated_at=now,
)
await self.store.save(pipeline)
logger.info(f"Pipeline created and saved: id={pipeline.id}")
return pipeline
except Exception as e:
logger.error(f"Failed to create pipeline: {e}")
raise e
async def update_pipeline(
self, pipeline_id: UUID, pipeline_in: PipelineCreate
) -> Pipeline | None:
"""Update an existing pipeline using the store"""
logger.info(f"Updating pipeline: id={pipeline_id}, update_data=...")
existing_pipeline = await self.store.get(pipeline_id)
if not existing_pipeline:
logger.warning(f"Pipeline not found for update: id={pipeline_id}")
return None
try:
# TODO: review code here
# Create an updated pipeline object based on existing one and input
# Be careful about mutable defaults if not using Pydantic properly
update_data = pipeline_in.model_dump(
exclude_unset=True
) # Get only provided fields
# Merge update_data into a dict representation of the existing pipeline
# Note: This simplistic merge might need refinement based on how partial updates should behave,
# especially for nested structures like 'config'. Pydantic's model_copy(update=...) might be useful here if applicable.
updated_pipeline_data = existing_pipeline.model_dump()
if "name" in update_data:
updated_pipeline_data["name"] = update_data["name"]
if "description" in update_data:
updated_pipeline_data["description"] = update_data["description"]
# Merge config - replace the whole config if provided in input
if "config" in update_data:
# Ensure the input config is a PipelineConfig object before assignment
if pipeline_in.config:
updated_pipeline_data["config"] = pipeline_in.config.model_dump()
else:
# Handle cases where config might be passed differently if needed
logger.error("Invalid config format for update")
raise ValueError("Invalid config format for update")
# Re-validate the merged data into a Pipeline object
# Preserve original ID and created_at
updated_pipeline = Pipeline.model_validate(updated_pipeline_data)
# updated_at is handled by the store's save method
await self.store.save(updated_pipeline) # Save the updated object
logger.info(f"Pipeline updated: id={updated_pipeline.id}")
return updated_pipeline
except Exception as e:
logger.error(f"Failed to update pipeline id={pipeline_id}: {e}")
raise e
async def delete_pipeline(self, pipeline_id: UUID) -> bool:
"""Delete an existing pipeline using the store"""
logger.info(f"Deleting pipeline: id={pipeline_id}")
deleted = await self.store.delete(pipeline_id)
if not deleted:
logger.warning(f"Pipeline not found for deletion: id={pipeline_id}")
return deleted
async def get_pipeline(self, pipeline_id: UUID) -> Pipeline | None:
"""Get a single pipeline by ID"""
logger.info(f"Getting pipeline: id={pipeline_id}")
return await self.store.get(pipeline_id)
async def list_pipelines(self) -> list[Pipeline]:
"""Get all pipelines"""
logger.info("Listing all pipelines")
return await self.store.get_all()
async def run_pipeline(self, pipeline_id: UUID) -> None:
"""Run an existing pipeline"""
pipeline = await self.store.get(pipeline_id)
if not pipeline:
logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})")
return
if pipeline.status == PipelineStatus.ACTIVE:
logger.warning(
f"Pipeline id={pipeline_id} is already active/running (status={pipeline.status}). Skipping run."
)
return
logger.info(
f"Attempting to run pipeline: id={pipeline_id}, name='{pipeline.name}'"
)
try:
# 2. Update status to ACTIVE (optional, depends on desired state mgmt)
# pipeline.status = PipelineStatus.ACTIVE
# pipeline.config.last_run = datetime.now(timezone.utc) # Mark start time
# await self.store.save(pipeline)
# 3. Execute the actual pipeline logic (ingestion, processing, etc.)
# This part depends heavily on your pipeline runner implementation
# It would use pipeline.config.ingestor_config, etc.
logger.info(f"Executing pipeline logic for id={pipeline_id}...")
# ...
# result = await actual_pipeline_runner(pipeline.config)
# ...
logger.info(f"Pipeline logic finished for id={pipeline_id}.")
# 4. Update status (e.g., back to INACTIVE) and run times on completion
# Fetch the latest state in case it changed during run
current_pipeline_state = await self.store.get(pipeline_id)
if current_pipeline_state:
current_pipeline_state.status = (
PipelineStatus.INACTIVE
) # Or COMPLETED if you have that state
current_pipeline_state.config.last_run = datetime.now(
timezone.utc
) # Mark end time
# TODO: Calculate next_run based on current_pipeline_state.config.run_frequency
# current_pipeline_state.config.next_run = calculate_next_run(...)
await self.store.save(current_pipeline_state)
logger.info(
f"Pipeline run finished and state updated for id={pipeline_id}"
)
else:
logger.warning(
f"Pipeline id={pipeline_id} disappeared during run. Cannot update final state."
)
except Exception as e:
logger.error(f"Failed during pipeline run id={pipeline_id}: {e}")
# Optionally update status to FAILED
current_pipeline_state = await self.store.get(pipeline_id)
if current_pipeline_state:
# current_pipeline_state.status = PipelineStatus.FAILED # Add a FAILED state if desired
await self.store.save(current_pipeline_state)
# Handle/log the error appropriately
# raise # Optionally re-raise
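The TODO in run_pipeline leaves next_run unscheduled. Purely as an illustration (no calculate_next_run exists in this commit), a naive helper could derive the next run time from RunFrequency:

from datetime import datetime, timedelta

from models.pipeline import RunFrequency


def calculate_next_run(last_run: datetime, frequency: RunFrequency) -> datetime:
    # Fixed offsets from the previous run; a real scheduler would likely
    # handle calendar months, time zones, and missed runs more carefully.
    offsets = {
        RunFrequency.DAILY: timedelta(days=1),
        RunFrequency.WEEKLY: timedelta(weeks=1),
        RunFrequency.MONTHLY: timedelta(days=30),  # rough approximation
    }
    return last_run + offsets[frequency]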

View File

@@ -1,76 +0,0 @@
"""
In-memory stores for pipelines and runs.
"""
from typing import Dict, List, Optional
from uuid import UUID, uuid4
from datetime import datetime
import models
# In-memory storage
pipelines: Dict[UUID, models.Pipeline] = {}
runs: Dict[UUID, models.RunResult] = {}
def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
"""
Create and store a new pipeline.
"""
pipeline_id = uuid4()
now = datetime.utcnow()
pipeline = models.Pipeline(
id=pipeline_id,
name=pipeline_in.name,
sources=pipeline_in.sources,
created_at=now,
)
pipelines[pipeline_id] = pipeline
return pipeline
def get_pipeline(pipeline_id: UUID) -> Optional[models.Pipeline]:
"""
Retrieve a pipeline by its ID.
"""
return pipelines.get(pipeline_id)
def list_pipelines() -> List[models.Pipeline]:
"""
List all registered pipelines.
"""
return list(pipelines.values())
def create_run(pipeline_id: UUID) -> models.RunResult:
"""
Create and store a new run for a given pipeline.
"""
run_id = uuid4()
now = datetime.utcnow()
run = models.RunResult(
id=run_id,
pipeline_id=pipeline_id,
status='PENDING',
started_at=now,
finished_at=None,
results=None,
error=None,
)
runs[run_id] = run
return run
def get_run(run_id: UUID) -> Optional[models.RunResult]:
"""
Retrieve a run by its ID.
"""
return runs.get(run_id)
def list_runs_for_pipeline(pipeline_id: UUID) -> List[models.RunResult]:
"""
List all runs for a specific pipeline.
"""
return [r for r in runs.values() if r.pipeline_id == pipeline_id]

pipeline/stores/base.py Normal file
View File

@@ -0,0 +1,62 @@
from abc import ABC, abstractmethod
from typing import List, Optional
from uuid import UUID
from models.pipeline import Pipeline, PipelineCreate
class PipelineStore(ABC):
"""
Abstract Base Class for pipeline persistence.
Defines the contract for saving, retrieving, and deleting pipelines.
"""
@abstractmethod
async def save(self, pipeline: Pipeline) -> None:
"""
Save a pipeline (create or update).
Implementations should handle checking if the ID exists
and performing an insert or update accordingly.
They should also update the 'updated_at' timestamp.
"""
pass
@abstractmethod
async def get(self, pipeline_id: UUID) -> Optional[Pipeline]:
"""
Retrieve a pipeline by its ID.
Returns the Pipeline object if found, otherwise None.
"""
pass
@abstractmethod
async def get_all(self) -> List[Pipeline]:
"""
Retrieve all pipelines.
Returns a list of Pipeline objects.
"""
pass
@abstractmethod
async def delete(self, pipeline_id: UUID) -> bool:
"""
Delete a pipeline by its ID.
Returns True if deletion was successful, False otherwise (e.g., if not found).
"""
pass
@abstractmethod
async def update(self, pipeline_id: UUID, pipeline_in: PipelineCreate) -> Pipeline:
"""
Update an existing pipeline.
Returns the updated Pipeline object.
"""
pass
async def connect(self) -> None:
"""Optional: Perform setup/connection logic."""
pass
async def disconnect(self) -> None:
"""Optional: Perform cleanup/disconnection logic."""
pass
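Callers such as PipelineService are meant to depend on this interface rather than a concrete store. A small sketch of that pattern (the deactivate_pipeline helper is hypothetical, not part of the commit):

from uuid import UUID

from models.pipeline import Pipeline, PipelineStatus
from stores.base import PipelineStore


async def deactivate_pipeline(store: PipelineStore, pipeline_id: UUID) -> Pipeline | None:
    # Works against any PipelineStore implementation (in-memory, database, etc.).
    pipeline = await store.get(pipeline_id)
    if pipeline is None:
        return None
    pipeline.status = PipelineStatus.INACTIVE
    await store.save(pipeline)  # per the contract, save() also refreshes updated_at
    return pipeline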

pipeline/stores/memory.py Normal file
View File

@@ -0,0 +1,61 @@
from datetime import datetime, timezone
from typing import Dict, List, Optional
from uuid import UUID
from loguru import logger
from models.pipeline import Pipeline, PipelineCreate
from .base import PipelineStore
class InMemoryPipelineStore(PipelineStore):
"""
In-memory implementation of the PipelineStore.
Stores pipelines in a simple dictionary. Not persistent across restarts.
"""
_pipelines: Dict[UUID, Pipeline]
def __init__(self):
logger.info("Initializing InMemoryPipelineStore")
self._pipelines = {}
async def save(self, pipeline: Pipeline) -> None:
logger.debug(f"Saving pipeline (in-memory): id={pipeline.id}")
pipeline.updated_at = datetime.now(timezone.utc)
self._pipelines[pipeline.id] = pipeline.model_copy(deep=True)
logger.info(f"Pipeline saved (in-memory): id={pipeline.id}")
async def get(self, pipeline_id: UUID) -> Optional[Pipeline]:
logger.debug(f"Getting pipeline (in-memory): id={pipeline_id}")
pipeline = self._pipelines.get(pipeline_id)
if pipeline:
return pipeline.model_copy(deep=True)
logger.warning(f"Pipeline not found (in-memory): id={pipeline_id}")
return None
async def get_all(self) -> List[Pipeline]:
logger.debug("Getting all pipelines (in-memory)")
return [p.model_copy(deep=True) for p in self._pipelines.values()]
async def delete(self, pipeline_id: UUID) -> bool:
logger.debug(f"Deleting pipeline (in-memory): id={pipeline_id}")
if pipeline_id in self._pipelines:
del self._pipelines[pipeline_id]
logger.info(f"Pipeline deleted (in-memory): id={pipeline_id}")
return True
logger.warning(f"Pipeline not found for deletion (in-memory): id={pipeline_id}")
return False
async def update(self, pipeline_id: UUID, pipeline_in: PipelineCreate) -> Pipeline:
logger.debug(f"Updating pipeline (in-memory): id={pipeline_id}")
pipeline = self._pipelines.get(pipeline_id)
if not pipeline:
raise ValueError(f"Pipeline not found (in-memory): id={pipeline_id}")
pipeline.name = pipeline_in.name
pipeline.description = pipeline_in.description
pipeline.config = pipeline_in.config
pipeline.updated_at = datetime.now(timezone.utc)
self._pipelines[pipeline_id] = pipeline.model_copy(deep=True)
logger.info(f"Pipeline updated (in-memory): id={pipeline_id}")
return pipeline.model_copy(deep=True)
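
Putting the pieces together, an end-to-end sketch of how the new service and in-memory store could be wired up. The import path for PipelineService and the concrete IngestorInput value are assumptions, since neither appears in this diff.

from models.ingestion import IngestorInput
from models.pipeline import RunFrequency
from services.pipeline import PipelineService  # assumed module path; not shown in the diff
from stores.memory import InMemoryPipelineStore


async def demo(ingestor_config: IngestorInput) -> None:
    service = PipelineService(store=InMemoryPipelineStore())
    pipeline = await service.create_pipeline(
        name="demo",
        description="demo pipeline",
        ingestor_config=ingestor_config,
        run_frequency=RunFrequency.DAILY,
    )
    print(await service.list_pipelines())    # contains the pipeline just created
    await service.run_pipeline(pipeline.id)  # run logic is still a stub in this commit
    await service.delete_pipeline(pipeline.id)

# Run with: asyncio.run(demo(some_ingestor_config)), where some_ingestor_config
# is a real IngestorInput instance built from models/ingestion.py.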