From f24886afdc215b7d56c61ce08db93083e82b50ed Mon Sep 17 00:00:00 2001 From: Sosokker Date: Mon, 12 May 2025 19:16:47 +0700 Subject: [PATCH] feat: add scheduler to help manage pipeline --- pipeline/pyproject.toml | 4 +- pipeline/scheduler/__init__.py | 0 pipeline/scheduler/jobs.py | 37 +++ pipeline/scheduler/manager.py | 314 +++++++++++++++++++++++ pipeline/scheduler/utils.py | 174 +++++++++++++ pipeline/services/pipeline_service.py | 342 ++++++++++++++++++-------- pipeline/uv.lock | 42 ++++ 7 files changed, 811 insertions(+), 102 deletions(-) create mode 100644 pipeline/scheduler/__init__.py create mode 100644 pipeline/scheduler/jobs.py create mode 100644 pipeline/scheduler/manager.py create mode 100644 pipeline/scheduler/utils.py diff --git a/pipeline/pyproject.toml b/pipeline/pyproject.toml index da36760..e67cc2e 100644 --- a/pipeline/pyproject.toml +++ b/pipeline/pyproject.toml @@ -5,11 +5,13 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.12" dependencies = [ + "apscheduler>=3.11.0", "crawl4ai>=0.5.0.post8", "fastapi[standard]>=0.115.12", "inquirer>=3.4.0", "loguru>=0.7.3", "pandas>=2.2.3", + "pydantic-settings>=2.9.1", "pytest>=8.3.5", "pytest-asyncio>=0.26.0", "python-dotenv>=1.1.0", @@ -19,4 +21,4 @@ dependencies = [ [tool.pytest.ini_options] asyncio_mode = "auto" -asyncio_default_fixture_loop_scope = "function" \ No newline at end of file +asyncio_default_fixture_loop_scope = "function" diff --git a/pipeline/scheduler/__init__.py b/pipeline/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/scheduler/jobs.py b/pipeline/scheduler/jobs.py new file mode 100644 index 0000000..5f6c08b --- /dev/null +++ b/pipeline/scheduler/jobs.py @@ -0,0 +1,37 @@ +""" +Contains the job function that APScheduler executes to run a pipeline +""" + +# scheduler/jobs.py +from uuid import UUID +from loguru import logger + +# Avoid direct dependency on PipelineService here if possible. 
+# Instead, the manager will hold the service and pass necessary info. + + +async def execute_pipeline_job(pipeline_id: UUID, pipeline_service): + """ + Job function executed by APScheduler. Calls the PipelineService to run the pipeline. + + Args: + pipeline_id: The ID of the pipeline to run. + pipeline_service: The instance of PipelineService (passed via scheduler setup). + NOTE: Passing complex objects directly might have issues depending + on the job store/executor. Consider alternatives if problems arise. + """ + logger.info(f"Scheduler job started for pipeline_id: {pipeline_id}") + try: + # The run_pipeline method should handle its own internal state updates + # (like setting status to ACTIVE/INACTIVE and updating last_run) + await pipeline_service.run_pipeline(pipeline_id) + logger.info( + f"Scheduler job finished successfully for pipeline_id: {pipeline_id}" + ) + except Exception as e: + # The run_pipeline method should ideally handle its own errors, + # but we catch exceptions here as a fallback for logging. + logger.error( + f"Scheduler job failed for pipeline_id {pipeline_id}: {e}", exc_info=True + ) + # Consider adding retry logic here or within APScheduler config if needed diff --git a/pipeline/scheduler/manager.py b/pipeline/scheduler/manager.py new file mode 100644 index 0000000..41fc929 --- /dev/null +++ b/pipeline/scheduler/manager.py @@ -0,0 +1,314 @@ +""" +Manages pipeline job scheduling using APScheduler. 
+""" + +from datetime import datetime +from uuid import UUID +import asyncio + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.date import DateTrigger +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.executors.asyncio import AsyncIOExecutor +from apscheduler.job import Job +from apscheduler.jobstores.base import JobLookupError + +from loguru import logger + +from models.pipeline import Pipeline, PipelineStatus +from services.pipeline_service import PipelineService +from .jobs import execute_pipeline_job +from .utils import UTC + + +class SchedulerManager: + """Manages pipeline job scheduling using APScheduler.""" + + def __init__( + self, + pipeline_service: PipelineService, + check_interval_seconds: int = 60, + max_concurrent_runs: int = 5, + misfire_grace_sec: int = 300, + ): + self.pipeline_service = pipeline_service + self.check_interval_seconds = check_interval_seconds + self._scheduler: AsyncIOScheduler | None = None + self._running = False + self._discovery_job_id = "pipeline_discovery_job" + self.misfire_grace_sec = misfire_grace_sec + + # Configure APScheduler (same as before) + jobstores = {"default": MemoryJobStore()} + executors = {"default": AsyncIOExecutor()} + job_defaults = { + "coalesce": True, + "max_instances": 1, + "misfire_grace_time": self.misfire_grace_sec, + } + self._scheduler = AsyncIOScheduler( + jobstores=jobstores, + executors=executors, + job_defaults=job_defaults, + timezone=UTC, + ) + logger.info("APScheduler configured.") + # Link the scheduler back to the service *after* both are initialized + # This is often done in the main application setup + # self.pipeline_service.set_scheduler_manager(self) + + async def schedule_pipeline(self, pipeline: Pipeline): + """Adds or updates a job for a specific pipeline based on its next_run time.""" + if not self._running: + logger.warning( + f"Scheduler not running. 
Cannot schedule pipeline {pipeline.id}" + ) + return + + if not self._scheduler: + logger.error("Scheduler not initialized. Cannot schedule pipeline.") + return + + job_id = str(pipeline.id) + next_run_time = pipeline.config.next_run + + if not next_run_time: + logger.warning( + f"Pipeline {pipeline.id} has no next_run time. Cannot schedule." + ) + # Ensure any existing job is removed if next_run becomes None + await self.unschedule_pipeline(pipeline.id) + return + + # Ensure next_run_time is timezone-aware (UTC) + if next_run_time.tzinfo is None: + logger.warning( + f"Pipeline {pipeline.id} next_run time is naive. Assuming UTC." + ) + next_run_time = UTC.localize(next_run_time) + else: + next_run_time = next_run_time.astimezone(UTC) + + # Check if pipeline should be scheduled (e.g., is INACTIVE) + if pipeline.status != PipelineStatus.INACTIVE: + logger.info( + f"Pipeline {pipeline.id} is not INACTIVE (status: {pipeline.status}). Removing any existing schedule." + ) + await self.unschedule_pipeline(pipeline.id) + return + + try: + existing_job: Job | None = self._scheduler.get_job( + job_id, jobstore="default" + ) + trigger = DateTrigger(run_date=next_run_time) + + if existing_job: + # Job exists, check if trigger needs update + if existing_job.trigger != trigger: + logger.info( + f"Rescheduling pipeline {job_id} to run at {next_run_time}" + ) + self._scheduler.reschedule_job( + job_id, jobstore="default", trigger=trigger + ) + else: + logger.debug( + f"Pipeline {job_id} schedule already up-to-date for {next_run_time}." 
+ ) + else: + # Add new job + logger.info( + f"Adding new schedule for pipeline {job_id} at {next_run_time}" + ) + self._scheduler.add_job( + execute_pipeline_job, + trigger=trigger, + args=[pipeline.id, self.pipeline_service], # Pass service instance + id=job_id, + name=f"Run Pipeline {pipeline.name} ({job_id})", + replace_existing=True, # Important to handle race conditions + jobstore="default", + ) + except Exception as e: + logger.error( + f"Failed to schedule/reschedule job for pipeline {job_id}: {e}", + exc_info=True, + ) + + async def reschedule_pipeline(self, pipeline: Pipeline): + """Alias for schedule_pipeline, as the logic is the same (add or update).""" + await self.schedule_pipeline(pipeline) + + async def unschedule_pipeline(self, pipeline_id: UUID): + """Removes the scheduled job for a specific pipeline.""" + if not self._running: + logger.warning( + f"Scheduler not running. Cannot unschedule pipeline {pipeline_id}" + ) + return + + if not self._scheduler: + logger.error("Scheduler not initialized. Cannot unschedule pipeline.") + return + + job_id = str(pipeline_id) + try: + existing_job = self._scheduler.get_job(job_id, jobstore="default") + if existing_job: + self._scheduler.remove_job(job_id, jobstore="default") + logger.info(f"Removed scheduled job for pipeline {job_id}") + else: + logger.debug(f"No scheduled job found to remove for pipeline {job_id}") + except JobLookupError: + logger.debug( + f"Job {job_id} not found during unschedule attempt (likely already removed)." + ) + except Exception as e: + logger.error( + f"Failed to remove job for pipeline {job_id}: {e}", exc_info=True + ) + + async def _discover_and_schedule_pipelines(self): + """ + Periodically checks all pipelines and ensures scheduler state matches. + Acts as a reconciliation loop. + """ + if not self._running: + return + + if not self._scheduler: + logger.error( + "Scheduler not initialized. Cannot perform discovery and reconciliation." 
+ ) + return + + logger.debug("Running periodic pipeline discovery and reconciliation...") + try: + pipelines = await self.pipeline_service.list_pipelines() + scheduled_job_ids = { + job.id + for job in self._scheduler.get_jobs() + if job.id != self._discovery_job_id + } + active_pipeline_ids = set() + + # Ensure all active/schedulable pipelines have correct jobs + for pipeline in pipelines: + pipeline_id_str = str(pipeline.id) + active_pipeline_ids.add(pipeline_id_str) + # Use the central schedule_pipeline method for consistency + # This will handle adding, updating, or removing based on status and next_run + await self.schedule_pipeline(pipeline) + + # Clean up jobs for pipelines that no longer exist in the store + jobs_to_remove = scheduled_job_ids - active_pipeline_ids + for job_id_to_remove in jobs_to_remove: + logger.info( + f"Reconciliation: Removing job for deleted pipeline: {job_id_to_remove}" + ) + await self.unschedule_pipeline( + UUID(job_id_to_remove) + ) # Convert back to UUID + + logger.debug("Pipeline discovery and reconciliation finished.") + + except Exception as e: + logger.error( + f"Error during pipeline discovery/reconciliation: {e}", exc_info=True + ) + + # start() and stop() methods remain largely the same as before + def start(self): + """Starts the scheduler and the discovery job.""" + if not self._running and self._scheduler: + logger.info("Starting SchedulerManager...") + self._scheduler.start() + # Add the recurring reconciliation job + self._scheduler.add_job( + self._discover_and_schedule_pipelines, + trigger="interval", + seconds=self.check_interval_seconds, + id=self._discovery_job_id, + name="Reconcile Pipeline Schedules", + replace_existing=True, + jobstore="default", + misfire_grace_time=None, + ) + self._running = True + logger.info( + f"SchedulerManager started. 
Reconciliation interval: {self.check_interval_seconds}s" + ) + # Run discovery once immediately on start + logger.info("Performing initial pipeline schedule reconciliation...") + asyncio.create_task(self._discover_and_schedule_pipelines()) + elif self._running: + logger.warning("SchedulerManager is already running.") + else: + logger.error("Scheduler object not initialized. Cannot start.") + + def stop(self): + """Stops the scheduler gracefully.""" + if self._running and self._scheduler: + logger.info("Stopping SchedulerManager...") + try: + self._scheduler.remove_job(self._discovery_job_id, jobstore="default") + except JobLookupError: + logger.debug("Discovery job already removed or never added.") + except Exception as e: + logger.warning(f"Could not remove discovery job during shutdown: {e}") + + self._scheduler.shutdown() # Waits for running jobs + self._running = False + logger.info("SchedulerManager stopped.") + elif not self._scheduler: + logger.error("Scheduler object not initialized. Cannot stop.") + else: + logger.info("SchedulerManager is not running.") + + # trigger_manual_run remains the same conceptually + async def trigger_manual_run(self, pipeline_id: UUID): + """Manually triggers a pipeline run immediately via the scheduler.""" + if not self._running or not self._scheduler: + logger.error( + "Scheduler not running or not initialized. Cannot trigger manual run." + ) + return False + + logger.info(f"Manual run requested for pipeline {pipeline_id}") + # Use a unique ID to allow multiple manual runs without conflicting + job_id = f"manual_run_{pipeline_id}_{datetime.now(UTC).isoformat()}" + try: + pipeline = await self.pipeline_service.get_pipeline(pipeline_id) + if not pipeline: + logger.error( + f"Cannot trigger manual run: Pipeline {pipeline_id} not found." 
+ ) + return False + # Ensure pipeline is not already running before adding manual job + if pipeline.status == PipelineStatus.ACTIVE: + logger.warning( + f"Cannot trigger manual run: Pipeline {pipeline_id} is already ACTIVE." + ) + return False + + self._scheduler.add_job( + execute_pipeline_job, + trigger=DateTrigger(run_date=datetime.now(UTC)), # Run ASAP + args=[pipeline.id, self.pipeline_service], + id=job_id, + name=f"Manual Run Pipeline {pipeline.name} ({pipeline.id})", + replace_existing=False, + jobstore="default", + misfire_grace_time=10, # Short grace time for manual runs + ) + logger.info( + f"Manual run job added for pipeline {pipeline.id} with job_id {job_id}" + ) + return True + except Exception as e: + logger.error( + f"Failed to add manual run job for pipeline {pipeline_id}: {e}", + exc_info=True, + ) + return False diff --git a/pipeline/scheduler/utils.py b/pipeline/scheduler/utils.py new file mode 100644 index 0000000..b129658 --- /dev/null +++ b/pipeline/scheduler/utils.py @@ -0,0 +1,174 @@ +""" +Helper for calculating next run times +""" + +from datetime import datetime, timedelta + +from loguru import logger +import pytz + +from models.pipeline import RunFrequency + +UTC = pytz.utc + + +def calculate_next_run( + frequency: RunFrequency, + last_run: datetime | None = None, + start_reference_time: datetime | None = None, +) -> datetime | None: + """ + Calculates the next scheduled run time based on frequency and last run. + + Args: + frequency: The desired run frequency (DAILY, WEEKLY, MONTHLY). + last_run: The timestamp of the last successful run (must be timezone-aware, preferably UTC). + start_reference_time: The time to calculate from if last_run is None (timezone-aware, UTC). + + Returns: + A timezone-aware datetime object (UTC) for the next run, or None if frequency is invalid. 
+ """ + if start_reference_time is None: + start_reference_time = datetime.now(UTC) + elif start_reference_time.tzinfo is None: + logger.warning( + "calculate_next_run received naive start_reference_time, assuming UTC." + ) + start_reference_time = UTC.localize(start_reference_time) + else: + start_reference_time = start_reference_time.astimezone(UTC) + + # Ensure last_run is timezone-aware (UTC) if provided + if last_run: + if last_run.tzinfo is None: + logger.warning( + f"calculate_next_run received naive last_run ({last_run}), assuming UTC." + ) + base_time = UTC.localize(last_run) + else: + base_time = last_run.astimezone(UTC) + else: + # If never run, calculate the *first* run time relative to now + base_time = start_reference_time + + try: + next_run_time: datetime | None = None + if frequency == RunFrequency.DAILY: + # If last run was today (UTC), schedule for tomorrow. Otherwise, schedule for today (or next occurrence). + target_date = base_time.date() + if ( + last_run + and last_run.astimezone(UTC).date() >= start_reference_time.date() + ): + target_date += timedelta(days=1) + # Schedule for midnight UTC of the target date + next_run_time = datetime( + target_date.year, + target_date.month, + target_date.day, + 0, + 0, + 0, + tzinfo=UTC, + ) + + elif frequency == RunFrequency.WEEKLY: + # Schedule for start of the next week (e.g., Monday 00:00 UTC) + days_until_next_monday = (7 - base_time.weekday()) % 7 + # If today is Monday and we haven't run yet this week OR last run was before this Monday + run_this_week = True + if last_run: + last_run_monday = last_run.astimezone(UTC) - timedelta( + days=last_run.weekday() + ) + this_monday = start_reference_time - timedelta( + days=start_reference_time.weekday() + ) + if last_run_monday.date() >= this_monday.date(): + run_this_week = False + + if ( + days_until_next_monday == 0 and not run_this_week + ): # It's Monday, but we ran >= this Monday + days_until_next_monday = 7 # Schedule for next week + + target_date = 
(base_time + timedelta(days=days_until_next_monday)).date() + next_run_time = datetime( + target_date.year, + target_date.month, + target_date.day, + 0, + 0, + 0, + tzinfo=UTC, + ) + + elif frequency == RunFrequency.MONTHLY: + # Schedule for start of the next month (1st day, 00:00 UTC) + current_year = base_time.year + current_month = base_time.month + run_this_month = True + + if last_run: + last_run_start_of_month = last_run.astimezone(UTC).replace( + day=1, hour=0, minute=0, second=0, microsecond=0 + ) + this_start_of_month = start_reference_time.replace( + day=1, hour=0, minute=0, second=0, microsecond=0 + ) + if last_run_start_of_month.date() >= this_start_of_month.date(): + run_this_month = False + + if run_this_month: + # Schedule for the 1st of the *current* month if not already past/run + target_date = base_time.replace(day=1).date() + target_dt = datetime( + target_date.year, + target_date.month, + target_date.day, + 0, + 0, + 0, + tzinfo=UTC, + ) + # If the 1st of this month is in the future, or it's today and we haven't run this month + if target_dt >= start_reference_time: + next_run_time = target_dt + else: # The 1st has passed this month, schedule for next month + run_this_month = False # Force calculation for next month + + if not run_this_month: + # Calculate 1st of next month + next_month = current_month + 1 + next_year = current_year + if next_month > 12: + next_month = 1 + next_year += 1 + target_date = datetime(next_year, next_month, 1).date() + next_run_time = datetime( + target_date.year, + target_date.month, + target_date.day, + 0, + 0, + 0, + tzinfo=UTC, + ) + + # Ensure calculated time is in the future relative to 'now' if last_run wasn't provided + if last_run is None and next_run_time and next_run_time <= start_reference_time: + # If calculated time is in the past based on 'now', recalculate as if last run just happened + logger.debug( + f"Initial calculated next_run {next_run_time} is in the past/present for new schedule. 
Recalculating." + ) + return calculate_next_run( + frequency, start_reference_time, start_reference_time + ) + + return next_run_time + + except Exception as e: + logger.error( + f"Error calculating next run for frequency {frequency}, last_run {last_run}: {e}" + ) + return None diff --git a/pipeline/services/pipeline_service.py b/pipeline/services/pipeline_service.py index 0f3416f..a07409a 100644 --- a/pipeline/services/pipeline_service.py +++ b/pipeline/services/pipeline_service.py @@ -1,5 +1,11 @@ -from datetime import datetime, timezone +""" +Pipeline service to help do pipeline CRUD +""" + +import asyncio +from datetime import datetime from uuid import UUID, uuid4 +from typing import Optional, List, TYPE_CHECKING from loguru import logger from models.pipeline import ( @@ -11,12 +17,42 @@ from models.pipeline import ( ) from models.ingestion import IngestorInput from stores.base import PipelineStore +from scheduler.utils import calculate_next_run, UTC # Import the utility and UTC + +# !use TYPE_CHECKING to avoid circular imports at runtime +# the SchedulerManager needs PipelineService, and PipelineService now needs SchedulerManager +if TYPE_CHECKING: + from scheduler.manager import SchedulerManager class PipelineService: - def __init__(self, store: PipelineStore): + """ + Pipeline service to help do pipeline CRUD + """ + + def __init__( + self, + store: PipelineStore, + scheduler_manager: Optional["SchedulerManager"] = None, + ): self.store = store + self.scheduler_manager: Optional["SchedulerManager"] = ( + scheduler_manager # Store the scheduler instance + ) logger.info(f"PipelineService initialized with store: {type(store).__name__}") + if scheduler_manager: + logger.info("PipelineService configured with SchedulerManager.") + else: + logger.warning( + "PipelineService initialized without SchedulerManager. Scheduling notifications disabled." 
+ ) + + def set_scheduler_manager(self, scheduler_manager: "SchedulerManager"): + """ + Method to link the scheduler later if needed (e.g., after both are created) + """ + self.scheduler_manager = scheduler_manager + logger.info("SchedulerManager linked to PipelineService.") async def create_pipeline( self, @@ -25,13 +61,21 @@ class PipelineService: ingestor_config: IngestorInput, run_frequency: RunFrequency, ) -> Pipeline: - """Create a new pipeline using the store""" + """Create a new pipeline, save it, and notify the scheduler.""" logger.info( - f"Creating pipeline: name={name}, description={description}, ingestor_config=..., run_frequency={run_frequency}" + f"Creating pipeline: name={name}, description={description}, run_frequency={run_frequency}" ) try: pipeline_id = uuid4() - now = datetime.now(timezone.utc) + now = datetime.now(UTC) # Use UTC consistently + + # Calculate the initial next_run time + initial_next_run = calculate_next_run( + frequency=run_frequency, + last_run=None, # No last run yet + start_reference_time=now, + ) + pipeline = Pipeline( id=pipeline_id, name=name, @@ -39,147 +83,243 @@ class PipelineService: config=PipelineConfig( ingestor_config=ingestor_config, run_frequency=run_frequency, - # will be set by scheduler last_run=None, - next_run=None, + next_run=initial_next_run, # Store the calculated next run ), - status=PipelineStatus.INACTIVE, + status=PipelineStatus.INACTIVE, # Start as inactive created_at=now, updated_at=now, ) await self.store.save(pipeline) - logger.info(f"Pipeline created and saved: id={pipeline.id}") + logger.info( + f"Pipeline created and saved: id={pipeline.id}, next_run={initial_next_run}" + ) + + # Notify the scheduler to add the job immediately + if self.scheduler_manager: + logger.debug(f"Notifying scheduler to schedule pipeline {pipeline.id}") + # Use asyncio.create_task for fire-and-forget notification + asyncio.create_task(self.scheduler_manager.schedule_pipeline(pipeline)) + else: + logger.warning( + 
f"Scheduler not available, cannot schedule pipeline {pipeline.id} immediately." + ) + return pipeline except Exception as e: - logger.error(f"Failed to create pipeline: {e}") - raise e + logger.error(f"Failed to create pipeline: {e}", exc_info=True) + raise # Re-raise the exception after logging async def update_pipeline( self, pipeline_id: UUID, pipeline_in: PipelineCreate - ) -> Pipeline | None: - """Update an existing pipeline using the store""" - logger.info(f"Updating pipeline: id={pipeline_id}, update_data=...") + ) -> Optional[Pipeline]: + """Update an existing pipeline, save it, and notify the scheduler.""" + logger.info(f"Updating pipeline: id={pipeline_id}") existing_pipeline = await self.store.get(pipeline_id) if not existing_pipeline: logger.warning(f"Pipeline not found for update: id={pipeline_id}") return None try: - # TODO: review code here - # Create an updated pipeline object based on existing one and input - # Be careful about mutable defaults if not using Pydantic properly - update_data = pipeline_in.model_dump( - exclude_unset=True - ) # Get only provided fields + update_data = pipeline_in.model_dump(exclude_unset=True) + # Use model_copy for a cleaner update merge + updated_pipeline = existing_pipeline.model_copy( + deep=True, update=update_data + ) - # Merge update_data into a dict representation of the existing pipeline - # Note: This simplistic merge might need refinement based on how partial updates should behave, - # especially for nested structures like 'config'. Pydantic's model_update might be useful here if applicable. - updated_pipeline_data = existing_pipeline.model_dump() + # Check if frequency changed, if so, recalculate next_run + config_changed = "config" in update_data + frequency_changed = False + if ( + config_changed + and updated_pipeline.config.run_frequency + != existing_pipeline.config.run_frequency + ): + frequency_changed = True + logger.info( + f"Run frequency changed for pipeline {pipeline_id}. 
Recalculating next run." + ) + now = datetime.now(UTC) + updated_pipeline.config.next_run = calculate_next_run( + frequency=updated_pipeline.config.run_frequency, + last_run=existing_pipeline.config.last_run, # Base on last run + start_reference_time=now, + ) + logger.info( + f"Recalculated next_run for {pipeline_id}: {updated_pipeline.config.next_run}" + ) - if "name" in update_data: - updated_pipeline_data["name"] = update_data["name"] - if "description" in update_data: - updated_pipeline_data["description"] = update_data["description"] - - # Merge config - replace the whole config if provided in input - if "config" in update_data: - # Ensure the input config is a PipelineConfig object before assignment - if pipeline_in.config: - updated_pipeline_data["config"] = pipeline_in.config.model_dump() - else: - # Handle cases where config might be passed differently if needed - logger.error("Invalid config format for update") - raise ValueError("Invalid config format for update") - - # Re-validate the merged data into a Pipeline object - # Preserve original ID and created_at - updated_pipeline = Pipeline.model_validate(updated_pipeline_data) - # updated_at is handled by the store's save method - - await self.store.save(updated_pipeline) # Save the updated object + # Save the updated pipeline (store's save method handles updated_at) + await self.store.save(updated_pipeline) logger.info(f"Pipeline updated: id={updated_pipeline.id}") + + # Notify the scheduler if relevant config changed + # We notify on any config change or if frequency specifically changed + if self.scheduler_manager and (config_changed or frequency_changed): + logger.debug( + f"Notifying scheduler to reschedule pipeline {updated_pipeline.id}" + ) + asyncio.create_task( + self.scheduler_manager.reschedule_pipeline(updated_pipeline) + ) + elif self.scheduler_manager: + logger.debug( + f"Pipeline {updated_pipeline.id} updated, but no schedule change needed." 
+ ) + return updated_pipeline except Exception as e: - logger.error(f"Failed to update pipeline id={pipeline_id}: {e}") - raise e + logger.error( + f"Failed to update pipeline id={pipeline_id}: {e}", exc_info=True + ) + raise async def delete_pipeline(self, pipeline_id: UUID) -> bool: - """Delete an existing pipeline using the store""" - logger.info(f"Deleting pipeline: id={pipeline_id}") + """Delete an existing pipeline and notify the scheduler.""" + logger.info(f"Attempting to delete pipeline: id={pipeline_id}") + + pipeline_exists = await self.store.get(pipeline_id) is not None + if not pipeline_exists: + logger.warning(f"Pipeline {pipeline_id} not found for deletion.") + return False + + # Notify scheduler *before* deleting from store, in case deletion fails + if self.scheduler_manager: + logger.debug(f"Notifying scheduler to unschedule pipeline {pipeline_id}") + # We need to wait for this to ensure the job is removed before DB record gone + await self.scheduler_manager.unschedule_pipeline(pipeline_id) + else: + logger.warning( + f"Scheduler not available, cannot unschedule pipeline {pipeline_id}." + ) + + # Proceed with deletion from store deleted = await self.store.delete(pipeline_id) - if not deleted: - logger.warning(f"Pipeline not found for deletion: id={pipeline_id}") + if deleted: + logger.info(f"Pipeline deleted successfully from store: id={pipeline_id}") + else: + # This might happen if pipeline was already gone, or store error + logger.warning( + f"Pipeline {pipeline_id} not found in store for deletion, or delete failed." + ) + # Scheduler job should have been removed anyway if it existed. 
return deleted - async def get_pipeline(self, pipeline_id: UUID) -> Pipeline | None: - """Get a single pipeline by ID""" - logger.info(f"Getting pipeline: id={pipeline_id}") + async def get_pipeline(self, pipeline_id: UUID) -> Optional[Pipeline]: + """Get a single pipeline by ID.""" + logger.debug(f"Getting pipeline: id={pipeline_id}") return await self.store.get(pipeline_id) - async def list_pipelines(self) -> list[Pipeline]: - """Get all pipelines""" - logger.info("Listing all pipelines") + async def list_pipelines(self) -> List[Pipeline]: + """Get all pipelines.""" + logger.debug("Listing all pipelines") return await self.store.get_all() async def run_pipeline(self, pipeline_id: UUID) -> None: - """Run an existing pipeline""" + """ + Executes the pipeline logic, updating status and run times. + This is called by the scheduler job or manual trigger. + """ + logger.info(f"Attempting run execution for pipeline: id={pipeline_id}") pipeline = await self.store.get(pipeline_id) + if not pipeline: logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})") return + # Simple lock mechanism using status if pipeline.status == PipelineStatus.ACTIVE: logger.warning( - f"Pipeline id={pipeline_id} is already active/running (status={pipeline.status}). Skipping run." + f"Pipeline id={pipeline_id} is already ACTIVE. Skipping run." ) return - logger.info( - f"Attempting to run pipeline: id={pipeline_id}, name='{pipeline.name}'" - ) - + # --- Mark as ACTIVE --- try: - # 2. 
Update status to ACTIVE (optional, depends on desired state mgmt) - # pipeline.status = PipelineStatus.ACTIVE - # pipeline.config.last_run = datetime.now(timezone.utc) # Mark start time - # await self.store.save(pipeline) + pipeline.status = PipelineStatus.ACTIVE + # Optionally mark start time here if needed, but last_run usually marks completion + # pipeline.config.last_run = datetime.now(UTC) + await self.store.save(pipeline) + logger.info(f"Pipeline {pipeline_id} marked as ACTIVE.") + except Exception as e: + logger.error( + f"Failed to mark pipeline {pipeline_id} as ACTIVE: {e}. Aborting run.", + exc_info=True, + ) + # Restore original status if possible? Depends on store implementation. + return # Abort run if we can't even update status - # 3. Execute the actual pipeline logic (ingestion, processing, etc.) - # This part depends heavily on your pipeline runner implementation - # It would use pipeline.config.ingestor_config, etc. - logger.info(f"Executing pipeline logic for id={pipeline_id}...") - # ... - # result = await actual_pipeline_runner(pipeline.config) - # ... - logger.info(f"Pipeline logic finished for id={pipeline_id}.") + # --- Execute Pipeline Logic --- + run_successful = False + try: + logger.info(f"Executing core logic for pipeline id={pipeline_id}...") + # --------------------------------------------------- + # TODO: replace with actual pipeline execution call + # Example: await self._execute_ingestion(pipeline.config.ingestor_config) + # Example: await self._process_data(...) + await asyncio.sleep(5) # Simulate work + logger.info(f"Core logic finished successfully for id={pipeline_id}.") + # --------------------------------------------------- + run_successful = True - # 4. 
Update status (e.g., back to INACTIVE) and run times on completion - # Fetch the latest state in case it changed during run - current_pipeline_state = await self.store.get(pipeline_id) - if current_pipeline_state: - current_pipeline_state.status = ( - PipelineStatus.INACTIVE - ) # Or COMPLETED if you have that state - current_pipeline_state.config.last_run = datetime.now( - timezone.utc - ) # Mark end time - # TODO: Calculate next_run based on current_pipeline_state.config.run_frequency - # current_pipeline_state.config.next_run = calculate_next_run(...) - await self.store.save(current_pipeline_state) - logger.info( - f"Pipeline run finished and state updated for id={pipeline_id}" - ) - else: + except Exception as e: + logger.error( + f"Core logic failed during pipeline run id={pipeline_id}: {e}", + exc_info=True, + ) + # run_successful remains False + + # --- Update Final State --- + try: + # Fetch the latest state again in case of external changes (though unlikely with ACTIVE status lock) + final_pipeline_state = await self.store.get(pipeline_id) + if not final_pipeline_state: logger.warning( - f"Pipeline id={pipeline_id} disappeared during run. Cannot update final state." + f"Pipeline {pipeline_id} disappeared during run. Cannot update final state." + ) + return + + now = datetime.now(UTC) + final_pipeline_state.status = PipelineStatus.INACTIVE # Reset status + # TODO: Add a FAILED status? 
+ # final_pipeline_state.status = PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED + + if run_successful: + final_pipeline_state.config.last_run = ( + now # Mark completion time on success + ) + + # Calculate and store the *next* run time after this one + final_pipeline_state.config.next_run = calculate_next_run( + frequency=final_pipeline_state.config.run_frequency, + last_run=final_pipeline_state.config.last_run, # Use the updated last_run + start_reference_time=now, + ) + + await self.store.save(final_pipeline_state) + logger.info( + f"Pipeline {pipeline_id} run finished. Status: {final_pipeline_state.status}, Last Run: {final_pipeline_state.config.last_run}, Next Run: {final_pipeline_state.config.next_run}" + ) + + # Notify scheduler about the *new* next run time + if self.scheduler_manager: + logger.debug( + f"Notifying scheduler to reschedule pipeline {pipeline_id} after run completion." + ) + asyncio.create_task( + self.scheduler_manager.reschedule_pipeline(final_pipeline_state) ) except Exception as e: - logger.error(f"Failed during pipeline run id={pipeline_id}: {e}") - # Optionally update status to FAILED - current_pipeline_state = await self.store.get(pipeline_id) - if current_pipeline_state: - # current_pipeline_state.status = PipelineStatus.FAILED # Add a FAILED state if desired - await self.store.save(current_pipeline_state) - # Handle/log the error appropriately - # raise # Optionally re-raise + logger.error( + f"Failed to update pipeline {pipeline_id} state after run execution: {e}", + exc_info=True, + ) + # The pipeline might be left in ACTIVE state if this fails. Requires manual intervention or recovery logic. 
+ + # TODO: Complete this method + # --- Placeholder for actual execution --- + async def _execute_ingestion(self, config: IngestorInput): + # Replace with your actual ingestion logic + logger.info(f"Simulating ingestion with config: {config}") + await asyncio.sleep(2) # Simulate I/O + logger.info("Ingestion simulation complete.") diff --git a/pipeline/uv.lock b/pipeline/uv.lock index 89a02c1..f30f602 100644 --- a/pipeline/uv.lock +++ b/pipeline/uv.lock @@ -124,6 +124,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916 }, ] +[[package]] +name = "apscheduler" +version = "3.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/00/6d6814ddc19be2df62c8c898c4df6b5b1914f3bd024b780028caa392d186/apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133", size = 107347 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/ae/9a053dd9229c0fde6b1f1f33f609ccff1ee79ddda364c756a924c6d8563b/APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da", size = 64004 }, +] + [[package]] name = "attrs" version = "25.3.0" @@ -302,11 +314,13 @@ name = "crawler-ai" version = "0.1.0" source = { virtual = "." 
} dependencies = [ + { name = "apscheduler" }, { name = "crawl4ai" }, { name = "fastapi", extra = ["standard"] }, { name = "inquirer" }, { name = "loguru" }, { name = "pandas" }, + { name = "pydantic-settings" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "python-dotenv" }, @@ -316,11 +330,13 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "apscheduler", specifier = ">=3.11.0" }, { name = "crawl4ai", specifier = ">=0.5.0.post8" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.115.12" }, { name = "inquirer", specifier = ">=3.4.0" }, { name = "loguru", specifier = ">=0.7.3" }, { name = "pandas", specifier = ">=2.2.3" }, + { name = "pydantic-settings", specifier = ">=2.9.1" }, { name = "pytest", specifier = ">=8.3.5" }, { name = "pytest-asyncio", specifier = ">=0.26.0" }, { name = "python-dotenv", specifier = ">=1.1.0" }, @@ -1314,6 +1330,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/71/ae/fe31e7f4a62431222d8f65a3bd02e3fa7e6026d154a00818e6d30520ea77/pydantic_core-2.33.1-cp313-cp313t-win_amd64.whl", hash = "sha256:338ea9b73e6e109f15ab439e62cb3b78aa752c7fd9536794112e14bee02c8d18", size = 1931810 }, ] +[[package]] +name = "pydantic-settings" +version = "2.9.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, + { name = "python-dotenv" }, + { name = "typing-inspection" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/1d/42628a2c33e93f8e9acbde0d5d735fa0850f3e6a2f8cb1eb6c40b9a732ac/pydantic_settings-2.9.1.tar.gz", hash = "sha256:c509bf79d27563add44e8446233359004ed85066cd096d8b510f715e6ef5d268", size = 163234 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b6/5f/d6d641b490fd3ec2c4c13b4244d68deea3a1b970a97be64f34fb5504ff72/pydantic_settings-2.9.1-py3-none-any.whl", hash = "sha256:59b4f431b1defb26fe620c71a7d3968a710d719f5f4cdbbdb7926edeb770f6ef", size = 44356 }, +] + [[package]] name = "pyee" version = "12.1.1" @@ -1810,6 
+1840,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839 }, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/2e/c14812d3d4d9cd1773c6be938f89e5735a1f11a9f184ac3639b93cef35d5/tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd", size = 30761 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c2/14/e2a54fabd4f08cd7af1c07030603c3356b74da07f7cc056e600436edfa17/tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d", size = 18026 }, +] + [[package]] name = "urllib3" version = "2.4.0"