# backend-api/pipeline/services/pipeline_service.py

"""
Pipeline service to help do pipeline CRUD
"""
import asyncio
from datetime import datetime
from uuid import UUID, uuid4
from typing import Optional, List, TYPE_CHECKING
from loguru import logger
from models.pipeline import (
Pipeline,
PipelineCreate,
PipelineConfig,
RunFrequency,
PipelineStatus,
)
from models.ingestion import IngestorInput
from stores.base import PipelineStore
from scheduler.utils import calculate_next_run, UTC
# !use TYPE_CHECKING to avoid circular imports at runtime
# the SchedulerManager needs PipelineService, and PipelineService now needs SchedulerManager
if TYPE_CHECKING:
from scheduler.manager import SchedulerManager


class PipelineService:
    """
    Service for creating, updating, deleting, and running pipelines.

    Optionally holds a reference to a SchedulerManager so that schedule
    changes can be pushed to the scheduler as pipelines change.
    """

    def __init__(
        self,
        store: PipelineStore,
        scheduler_manager: Optional["SchedulerManager"] = None,
    ):
        self.store = store
        # Keep a reference to the scheduler so CRUD operations can notify it.
        self.scheduler_manager: Optional["SchedulerManager"] = scheduler_manager
        logger.info(f"PipelineService initialized with store: {type(store).__name__}")
        if scheduler_manager:
            logger.info("PipelineService configured with SchedulerManager.")
        else:
            logger.warning(
                "PipelineService initialized without SchedulerManager. "
                "Scheduling notifications are disabled."
            )

    def set_scheduler_manager(self, scheduler_manager: "SchedulerManager") -> None:
        """
        Link the scheduler after construction (e.g., once both objects exist).
        """
        self.scheduler_manager = scheduler_manager
        logger.info("SchedulerManager linked to PipelineService.")
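
    # A minimal wiring sketch for the two-phase setup above. It assumes a
    # hypothetical InMemoryPipelineStore implementing PipelineStore, and that
    # SchedulerManager takes the service in its constructor:
    #
    #   store = InMemoryPipelineStore()
    #   service = PipelineService(store)           # scheduler not known yet
    #   scheduler = SchedulerManager(service)      # scheduler needs the service
    #   service.set_scheduler_manager(scheduler)   # close the cycle at runtime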

    async def create_pipeline(
        self,
        name: str,
        description: str,
        ingestor_config: IngestorInput,
        run_frequency: RunFrequency,
    ) -> Pipeline:
        """Create a new pipeline, save it, and notify the scheduler."""
        logger.info(
            f"Creating pipeline: name={name}, description={description}, "
            f"run_frequency={run_frequency}"
        )
        try:
            pipeline_id = uuid4()
            now = datetime.now(UTC)
            # Calculate the initial next_run time.
            initial_next_run = calculate_next_run(
                frequency=run_frequency,
                last_run=None,
                start_reference_time=now,
            )
            pipeline = Pipeline(
                id=pipeline_id,
                name=name,
                description=description,
                config=PipelineConfig(
                    ingestor_config=ingestor_config,
                    run_frequency=run_frequency,
                    last_run=None,
                    next_run=initial_next_run,
                ),
                status=PipelineStatus.INACTIVE,
                created_at=now,
                updated_at=now,
            )
            await self.store.save(pipeline)
            logger.info(
                f"Pipeline created and saved: id={pipeline.id}, next_run={initial_next_run}"
            )
            # Notify the scheduler to add the job immediately.
            if self.scheduler_manager:
                logger.debug(f"Notifying scheduler to schedule pipeline {pipeline.id}")
                # Fire-and-forget notification; creation does not block on scheduling.
                asyncio.create_task(self.scheduler_manager.schedule_pipeline(pipeline))
            else:
                logger.warning(
                    f"Scheduler not available; cannot schedule pipeline {pipeline.id} immediately."
                )
            return pipeline
        except Exception as e:
            logger.error(f"Failed to create pipeline: {e}", exc_info=True)
            raise  # Re-raise the exception after logging.
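
    # Note on the fire-and-forget asyncio.create_task calls used throughout
    # this class: the event loop holds only a weak reference to each task, so
    # a notification task can in principle be garbage-collected before it
    # runs. A common pattern (a sketch, not wired in here) keeps strong
    # references until the task completes:
    #
    #   _background_tasks: set[asyncio.Task] = set()
    #
    #   def _spawn(coro):
    #       task = asyncio.create_task(coro)
    #       _background_tasks.add(task)
    #       task.add_done_callback(_background_tasks.discard)
    #       return task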

    async def update_pipeline(
        self, pipeline_id: UUID, pipeline_in: PipelineCreate
    ) -> Optional[Pipeline]:
        """Update an existing pipeline, save it, and notify the scheduler."""
        logger.info(f"Updating pipeline: id={pipeline_id}")
        existing_pipeline = await self.store.get(pipeline_id)
        if not existing_pipeline:
            logger.warning(f"Pipeline not found for update: id={pipeline_id}")
            return None
        try:
            # 1. Create a deep copy to modify.
            updated_pipeline = existing_pipeline.model_copy(deep=True)
            # 2. Update top-level fields directly from the input model.
            updated_pipeline.name = pipeline_in.name
            updated_pipeline.description = pipeline_in.description
            # 3. Handle the config update carefully.
            config_changed = False
            frequency_changed = False
            # Store the frequency before any potential change.
            original_frequency = updated_pipeline.config.run_frequency
            # Check whether the input payload actually provided config data.
            if pipeline_in.config:
                config_changed = True
                # Update the fields *within* the existing config object.
                # Deep-copy the nested ingestor_config so the stored pipeline
                # does not share mutable state with the input payload.
                updated_pipeline.config.ingestor_config = (
                    pipeline_in.config.ingestor_config.model_copy(deep=True)
                )
                updated_pipeline.config.run_frequency = pipeline_in.config.run_frequency
                # Check whether the frequency actually changed.
                if updated_pipeline.config.run_frequency != original_frequency:
                    frequency_changed = True
            # 4. Recalculate next_run ONLY if the frequency changed.
            if frequency_changed:
                logger.info(
                    f"Run frequency changed for pipeline {pipeline_id} from "
                    f"{original_frequency} to {updated_pipeline.config.run_frequency}. "
                    f"Recalculating next run."
                )
                now = datetime.now(UTC)
                # Use the existing last_run from the copied object.
                updated_pipeline.config.next_run = calculate_next_run(
                    frequency=updated_pipeline.config.run_frequency,
                    last_run=updated_pipeline.config.last_run,
                    start_reference_time=now,
                )
                logger.info(
                    f"Recalculated next_run for {pipeline_id}: "
                    f"{updated_pipeline.config.next_run}"
                )
            # 5. Update the timestamp before saving.
            updated_pipeline.updated_at = datetime.now(UTC)
            # 6. Save the updated pipeline.
            await self.store.save(updated_pipeline)
            logger.info(f"Pipeline updated successfully: id={updated_pipeline.id}")
            # 7. Notify the scheduler if the config changed (including frequency).
            # The scheduler needs the *final* state of the pipeline to reschedule.
            if self.scheduler_manager and config_changed:
                logger.debug(
                    f"Notifying scheduler to reschedule pipeline {updated_pipeline.id} "
                    f"due to config change."
                )
                # Pass the fully updated pipeline object.
                asyncio.create_task(
                    self.scheduler_manager.reschedule_pipeline(updated_pipeline)
                )
            elif self.scheduler_manager:
                # NOTE: we might still want to reschedule if non-config updates
                # could affect execution, but with the current logic only
                # config changes trigger rescheduling.
                logger.debug(
                    f"Pipeline {updated_pipeline.id} updated (non-config fields only); "
                    f"no reschedule needed."
                )
            return updated_pipeline
        except Exception as e:
            logger.error(
                f"Failed to update pipeline id={pipeline_id}: {e}", exc_info=True
            )
            raise
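
    # Example update call (a sketch; field values are hypothetical, and the
    # PipelineCreate field names are inferred from their usage above):
    #
    #   updated = await service.update_pipeline(
    #       pipeline_id,
    #       PipelineCreate(
    #           name="nightly-ingest",
    #           description="Re-ingest sources nightly",
    #           config=...,  # optional; omit to leave the schedule untouched
    #       ),
    #   )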

    async def delete_pipeline(self, pipeline_id: UUID) -> bool:
        """Delete an existing pipeline and notify the scheduler."""
        logger.info(f"Attempting to delete pipeline: id={pipeline_id}")
        pipeline_exists = await self.store.get(pipeline_id) is not None
        if not pipeline_exists:
            logger.warning(f"Pipeline {pipeline_id} not found for deletion.")
            return False
        # Notify the scheduler *before* deleting from the store, in case deletion fails.
        if self.scheduler_manager:
            logger.debug(f"Notifying scheduler to unschedule pipeline {pipeline_id}")
            # Await here: the job must be removed before the DB record is gone.
            await self.scheduler_manager.unschedule_pipeline(pipeline_id)
        else:
            logger.warning(
                f"Scheduler not available; cannot unschedule pipeline {pipeline_id}."
            )
        # Proceed with deletion from the store.
        deleted = await self.store.delete(pipeline_id)
        if deleted:
            logger.info(f"Pipeline deleted successfully from store: id={pipeline_id}")
        else:
            # This can happen if the pipeline was already gone or the store errored.
            # The scheduler job was removed above regardless, if it existed.
            logger.warning(
                f"Pipeline {pipeline_id} not found in store for deletion, or delete failed."
            )
        return deleted

    async def get_pipeline(self, pipeline_id: UUID) -> Optional[Pipeline]:
        """Get a single pipeline by ID."""
        logger.debug(f"Getting pipeline: id={pipeline_id}")
        return await self.store.get(pipeline_id)

    async def list_pipelines(self) -> List[Pipeline]:
        """Get all pipelines."""
        logger.debug("Listing all pipelines")
        return await self.store.get_all()

    async def run_pipeline(self, pipeline_id: UUID) -> None:
        """
        Execute the pipeline logic, updating status and run times.
        Called by the scheduler job or by a manual trigger.
        """
        logger.info(f"Attempting run execution for pipeline: id={pipeline_id}")
        pipeline = await self.store.get(pipeline_id)
        if not pipeline:
            logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})")
            return
        # NOTE: the ACTIVE status acts as a best-effort lock against concurrent
        # runs. It is not atomic: two callers could both read a non-ACTIVE
        # status before either saves. A store-level compare-and-set would be
        # needed for a real guarantee (see the sketch after this method).
        if pipeline.status == PipelineStatus.ACTIVE:
            logger.warning(
                f"Pipeline id={pipeline_id} is already ACTIVE. Skipping run."
            )
            return

        # --- Mark as ACTIVE ---
        try:
            pipeline.status = PipelineStatus.ACTIVE
            pipeline.updated_at = datetime.now(UTC)
            await self.store.save(pipeline)
            logger.info(f"Pipeline {pipeline_id} marked as ACTIVE.")
        except Exception as e:
            logger.error(
                f"Failed to mark pipeline {pipeline_id} as ACTIVE: {e}. Aborting run.",
                exc_info=True,
            )
            # A status rollback could be attempted here, but whether that is
            # safe depends on the store's guarantees.
            return  # Abort the run.

        # --- Execute Pipeline Logic ---
        run_successful = False
        try:
            logger.info(f"Executing core logic for pipeline id={pipeline_id}...")
            await self._execute_ingestion(pipeline.config.ingestor_config)
            logger.info(f"Core logic finished successfully for id={pipeline_id}.")
            run_successful = True
        except Exception as e:
            # run_successful stays False; the failure is recorded below.
            logger.error(
                f"Core logic failed during pipeline run id={pipeline_id}: {e}",
                exc_info=True,
            )

        # --- Update Final State ---
        try:
            # Fetch the latest state again to minimize race conditions,
            # though the ACTIVE "lock" above already helps.
            final_pipeline_state = await self.store.get(pipeline_id)
            if not final_pipeline_state:
                # The pipeline may have been deleted externally while running;
                # the scheduler may need cleanup if the job still exists.
                logger.warning(
                    f"Pipeline {pipeline_id} disappeared during run. Cannot update final state."
                )
                if self.scheduler_manager:
                    logger.warning(
                        f"Attempting to unschedule potentially orphaned job for {pipeline_id}"
                    )
                    asyncio.create_task(
                        self.scheduler_manager.unschedule_pipeline(pipeline_id)
                    )
                return
            # Avoid mutating the object fetched directly, in case the store
            # uses caching or shared references.
            final_pipeline_state = final_pipeline_state.model_copy(deep=True)
            now = datetime.now(UTC)
            final_pipeline_state.status = (
                PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED
            )
            if run_successful:
                # Mark the completion time on success.
                final_pipeline_state.config.last_run = now
            # Calculate and store the *next* run time based on the outcome.
            # last_run is 'now' if the run succeeded, else the original value.
            final_pipeline_state.config.next_run = calculate_next_run(
                frequency=final_pipeline_state.config.run_frequency,
                last_run=final_pipeline_state.config.last_run,
                start_reference_time=now,
            )
            # Update the timestamp for this final save.
            final_pipeline_state.updated_at = now
            await self.store.save(final_pipeline_state)
            logger.info(
                f"Pipeline {pipeline_id} run finished. "
                f"Status: {final_pipeline_state.status}, "
                f"Last Run: {final_pipeline_state.config.last_run}, "
                f"Next Run: {final_pipeline_state.config.next_run}"
            )
            # Notify the scheduler about the *new* next run time so it can
            # reschedule accurately.
            if self.scheduler_manager:
                logger.debug(
                    f"Notifying scheduler to reschedule pipeline {pipeline_id} after "
                    f"run completion with next run {final_pipeline_state.config.next_run}."
                )
                asyncio.create_task(
                    self.scheduler_manager.reschedule_pipeline(final_pipeline_state)
                )
        except Exception as e:
            # The pipeline may be left ACTIVE or otherwise inconsistent here;
            # consider adding monitoring or retry logic.
            logger.error(
                f"Failed to update pipeline {pipeline_id} state after run execution: {e}",
                exc_info=True,
            )
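
    # Compare-and-set sketch for a real run lock. It assumes the store grew a
    # hypothetical conditional-update method such as:
    #
    #   acquired = await self.store.set_status_if(
    #       pipeline_id,
    #       expected=PipelineStatus.INACTIVE,
    #       new=PipelineStatus.ACTIVE,
    #   )
    #   if not acquired:
    #       return  # another runner got there first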

    async def _execute_ingestion(self, config: IngestorInput):
        """
        Execute the ingestion process for a pipeline using the provided
        IngestorInput config. Returns the ingestion results or raises on failure.
        """
        try:
            # Import locally to avoid a circular import.
            from ingestion.core import Ingestor

            logger.info(f"Executing ingestion with config: {config}")
            # NOTE: Ingestor.run is called synchronously here. If it blocks,
            # it stalls the event loop; if it is (or becomes) a coroutine,
            # it must be awaited instead.
            results = Ingestor.run(config.sources)
            logger.info(f"Ingestion completed successfully. Results: {results}")
            return results
        except ImportError:
            logger.error("Failed to import Ingestor. Cannot execute ingestion.")
            raise RuntimeError("Ingestion module not found")
        except Exception as e:
            logger.error(f"Ingestion execution failed: {e}", exc_info=True)
            raise
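
    # If Ingestor.run turns out to be synchronous and blocking, a sketch for
    # keeping the event loop responsive (requires Python 3.9+):
    #
    #   results = await asyncio.to_thread(Ingestor.run, config.sources)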