From 5cd5fcfeb3da4035016ca7e91757c3bfb463e5cf Mon Sep 17 00:00:00 2001
From: Sosokker
Date: Mon, 12 May 2025 21:12:50 +0700
Subject: [PATCH] fix: add pipeline test, add FAILED status and fix pydantic
 schema error

---
 pipeline/models/pipeline.py             |   1 +
 pipeline/pyproject.toml                 |   4 +
 pipeline/services/pipeline_service.py   | 168 ++++---
 pipeline/tests/test_pipeline_service.py | 571 ++++++++++++++++++++++++
 pipeline/uv.lock                        |  28 ++
 5 files changed, 718 insertions(+), 54 deletions(-)
 create mode 100644 pipeline/tests/test_pipeline_service.py

diff --git a/pipeline/models/pipeline.py b/pipeline/models/pipeline.py
index 1b22618..d10709e 100644
--- a/pipeline/models/pipeline.py
+++ b/pipeline/models/pipeline.py
@@ -9,6 +9,7 @@ from models.ingestion import IngestorInput
 class PipelineStatus(str, enum.Enum):
     ACTIVE = "active"
     INACTIVE = "inactive"
+    FAILED = "failed"


 class RunFrequency(str, enum.Enum):
diff --git a/pipeline/pyproject.toml b/pipeline/pyproject.toml
index e67cc2e..2da7861 100644
--- a/pipeline/pyproject.toml
+++ b/pipeline/pyproject.toml
@@ -8,12 +8,14 @@ dependencies = [
     "apscheduler>=3.11.0",
     "crawl4ai>=0.5.0.post8",
     "fastapi[standard]>=0.115.12",
+    "freezegun>=1.5.1",
     "inquirer>=3.4.0",
     "loguru>=0.7.3",
     "pandas>=2.2.3",
     "pydantic-settings>=2.9.1",
     "pytest>=8.3.5",
     "pytest-asyncio>=0.26.0",
+    "pytest-mock>=3.14.0",
     "python-dotenv>=1.1.0",
     "responses>=0.25.7",
     "rich>=14.0.0",
@@ -22,3 +24,5 @@ dependencies = [
 [tool.pytest.ini_options]
 asyncio_mode = "auto"
 asyncio_default_fixture_loop_scope = "function"
+log_cli = true
+log_cli_level = "INFO"
\ No newline at end of file
diff --git a/pipeline/services/pipeline_service.py b/pipeline/services/pipeline_service.py
index a07409a..1e07eae 100644
--- a/pipeline/services/pipeline_service.py
+++ b/pipeline/services/pipeline_service.py
@@ -17,7 +17,7 @@ from models.pipeline import (
 )
 from models.ingestion import IngestorInput
 from stores.base import PipelineStore
-from scheduler.utils import calculate_next_run, UTC  # Import the utility and UTC
+from scheduler.utils import calculate_next_run, UTC

 # !use TYPE_CHECKING to avoid circular imports at runtime
 # the SchedulerManager needs PipelineService, and PipelineService now needs SchedulerManager
@@ -67,12 +67,12 @@ class PipelineService:
         )
         try:
             pipeline_id = uuid4()
-            now = datetime.now(UTC)  # Use UTC consistently
+            now = datetime.now(UTC)

             # Calculate the initial next_run time
             initial_next_run = calculate_next_run(
                 frequency=run_frequency,
-                last_run=None,  # No last run yet
+                last_run=None,
                 start_reference_time=now,
             )

@@ -84,9 +84,9 @@
                 ingestor_config=ingestor_config,
                 run_frequency=run_frequency,
                 last_run=None,
-                next_run=initial_next_run,  # Store the calculated next run
+                next_run=initial_next_run,
             ),
-            status=PipelineStatus.INACTIVE,  # Start as inactive
+            status=PipelineStatus.INACTIVE,
             created_at=now,
             updated_at=now,
         )
@@ -121,50 +121,72 @@ class PipelineService:
             return None

         try:
-            update_data = pipeline_in.model_dump(exclude_unset=True)
-            # Use model_copy for a cleaner update merge
-            updated_pipeline = existing_pipeline.model_copy(
-                deep=True, update=update_data
-            )
+            # 1. Create a deep copy to modify
+            updated_pipeline = existing_pipeline.model_copy(deep=True)

-            # Check if frequency changed, if so, recalculate next_run
-            config_changed = "config" in update_data
+            # 2. Update top-level fields directly from the input model
+            updated_pipeline.name = pipeline_in.name
+            updated_pipeline.description = pipeline_in.description
+
+            # 3. Handle config update carefully
+            config_changed = False
             frequency_changed = False
-            if (
-                config_changed
-                and updated_pipeline.config.run_frequency
-                != existing_pipeline.config.run_frequency
-            ):
-                frequency_changed = True
+            original_frequency = (
+                updated_pipeline.config.run_frequency
+            )  # Store before potential change
+
+            # Check if the input payload actually provided config data
+            if pipeline_in.config:
+                config_changed = True
+                # Update the fields *within* the existing config object.
+                # Deep-copy the nested ingestor_config so the update does not
+                # share mutable state with the input payload.
+                updated_pipeline.config.ingestor_config = (
+                    pipeline_in.config.ingestor_config.model_copy(deep=True)
+                )
+                updated_pipeline.config.run_frequency = pipeline_in.config.run_frequency
+
+                # Check if the frequency actually changed after the update
+                if updated_pipeline.config.run_frequency != original_frequency:
+                    frequency_changed = True
+
+            # 4. Recalculate next_run ONLY if frequency changed
+            if frequency_changed:
                 logger.info(
-                    f"Run frequency changed for pipeline {pipeline_id}. Recalculating next run."
+                    f"Run frequency changed for pipeline {pipeline_id} from {original_frequency} to {updated_pipeline.config.run_frequency}. Recalculating next run."
                 )
                 now = datetime.now(UTC)
+                # Use the existing last_run from the copied object
                 updated_pipeline.config.next_run = calculate_next_run(
                     frequency=updated_pipeline.config.run_frequency,
-                    last_run=existing_pipeline.config.last_run,  # Base on last run
+                    last_run=updated_pipeline.config.last_run,
                     start_reference_time=now,
                 )
                 logger.info(
                     f"Recalculated next_run for {pipeline_id}: {updated_pipeline.config.next_run}"
                 )

-            # Save the updated pipeline (store's save method handles updated_at)
-            await self.store.save(updated_pipeline)
-            logger.info(f"Pipeline updated: id={updated_pipeline.id}")
+            # 5. Update the timestamp before saving
+            updated_pipeline.updated_at = datetime.now(UTC)

-            # Notify the scheduler if relevant config changed
-            # We notify on any config change or if frequency specifically changed
-            if self.scheduler_manager and (config_changed or frequency_changed):
+            # 6. Save the updated pipeline
+            await self.store.save(updated_pipeline)
+            logger.info(f"Pipeline updated successfully: id={updated_pipeline.id}")
+
+            # 7. Notify the scheduler if config changed (including frequency).
+            # The scheduler needs the *final* state of the updated pipeline for rescheduling.
+            if self.scheduler_manager and config_changed:
                 logger.debug(
-                    f"Notifying scheduler to reschedule pipeline {updated_pipeline.id}"
+                    f"Notifying scheduler to reschedule pipeline {updated_pipeline.id} due to config change."
                 )
+                # Pass the fully updated pipeline object
                 asyncio.create_task(
                     self.scheduler_manager.reschedule_pipeline(updated_pipeline)
                 )
             elif self.scheduler_manager:
                 logger.debug(
-                    f"Pipeline {updated_pipeline.id} updated, but no schedule change needed."
+                    f"Pipeline {updated_pipeline.id} updated (non-config fields), no reschedule needed based on config."
+                    # NOTE: non-config updates might still warrant a reschedule if they
+                    # can affect execution, but currently only config changes trigger one.
                 )

             return updated_pipeline

@@ -226,7 +248,7 @@ class PipelineService:
         if not pipeline:
             logger.error(f"Cannot run pipeline: Pipeline not found (id={pipeline_id})")
             return
-        # Simple lock mechanism using status
+        # NOTE: simple lock mechanism; an ACTIVE status marks a run already in progress
         if pipeline.status == PipelineStatus.ACTIVE:
             logger.warning(
                 f"Pipeline id={pipeline_id} is already ACTIVE. Skipping run."
@@ -236,8 +258,7 @@ class PipelineService:
         # --- Mark as ACTIVE ---
         try:
             pipeline.status = PipelineStatus.ACTIVE
-            # Optionally mark start time here if needed, but last_run usually marks completion
-            # pipeline.config.last_run = datetime.now(UTC)
+            pipeline.updated_at = datetime.now(UTC)  # Update timestamp
             await self.store.save(pipeline)
             logger.info(f"Pipeline {pipeline_id} marked as ACTIVE.")
         except Exception as e:
@@ -245,20 +266,19 @@ class PipelineService:
                 f"Failed to mark pipeline {pipeline_id} as ACTIVE: {e}. Aborting run.",
                 exc_info=True,
             )
-            # Restore original status if possible? Depends on store implementation.
-            return  # Abort run if we can't even update status
+            # Restoring the previous status would need the pre-change value;
+            # whether that is safe depends on the store's guarantees.
+            return  # Abort run

         # --- Execute Pipeline Logic ---
         run_successful = False
         try:
             logger.info(f"Executing core logic for pipeline id={pipeline_id}...")
             # ---------------------------------------------------
-            # TODO: replace with actual pipeline execution call
-            # Example: await self._execute_ingestion(pipeline.config.ingestor_config)
-            # Example: await self._process_data(...)
-            await asyncio.sleep(5)  # Simulate work
-            logger.info(f"Core logic finished successfully for id={pipeline_id}.")
+            # _execute_ingestion is async; await it so failures propagate to this handler
+            await self._execute_ingestion(pipeline.config.ingestor_config)
             # ---------------------------------------------------
+            logger.info(f"Core logic finished successfully for id={pipeline_id}.")
             run_successful = True

         except Exception as e:
@@ -270,29 +290,49 @@ class PipelineService:

         # --- Update Final State ---
         try:
-            # Fetch the latest state again in case of external changes (though unlikely with ACTIVE status lock)
+            # Fetch the latest state again to minimize race conditions, though the ACTIVE lock helps
             final_pipeline_state = await self.store.get(pipeline_id)
             if not final_pipeline_state:
                 logger.warning(
                     f"Pipeline {pipeline_id} disappeared during run. Cannot update final state."
                 )
+                # The pipeline might have been deleted externally while running.
+                # Scheduler might need cleanup if the job still exists.
+                if self.scheduler_manager:
+                    logger.warning(
+                        f"Attempting to unschedule potentially orphaned job for {pipeline_id}"
+                    )
+                    asyncio.create_task(
+                        self.scheduler_manager.unschedule_pipeline(pipeline_id)
+                    )
                 return

+            # Avoid modifying the object fetched directly if store uses caching/references
+            final_pipeline_state = final_pipeline_state.model_copy(deep=True)
+
             now = datetime.now(UTC)
-            final_pipeline_state.status = PipelineStatus.INACTIVE  # Reset status
-            # TODO: Add a FAILED status?
-            # final_pipeline_state.status = PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED
+            final_pipeline_state.status = (
+                PipelineStatus.INACTIVE if run_successful else PipelineStatus.FAILED
+            )
             if run_successful:
                 final_pipeline_state.config.last_run = (
                     now  # Mark completion time on success
                 )

-            # Calculate and store the *next* run time after this one
+            # Calculate and store the *next* run time based on the outcome.
+            # Use the *updated* last_run if the run was successful.
+            current_last_run = (
+                final_pipeline_state.config.last_run
+            )  # This is 'now' if successful, else the original last_run
             final_pipeline_state.config.next_run = calculate_next_run(
                 frequency=final_pipeline_state.config.run_frequency,
-                last_run=final_pipeline_state.config.last_run,  # Use the updated last_run
-                start_reference_time=now,
+                last_run=current_last_run,  # Use the relevant last_run for calculation
+                start_reference_time=now,  # Use current time as reference for calculation
+            )
+
+            final_pipeline_state.updated_at = (
+                now  # Update timestamp for this final save
             )

             await self.store.save(final_pipeline_state)
@@ -300,10 +340,10 @@
                 f"Pipeline {pipeline_id} run finished. Status: {final_pipeline_state.status}, Last Run: {final_pipeline_state.config.last_run}, Next Run: {final_pipeline_state.config.next_run}"
             )

-            # Notify scheduler about the *new* next run time
+            # Notify scheduler about the *new* next run time so it can reschedule accurately
             if self.scheduler_manager:
                 logger.debug(
-                    f"Notifying scheduler to reschedule pipeline {pipeline_id} after run completion."
+                    f"Notifying scheduler to reschedule pipeline {pipeline_id} after run completion with next run {final_pipeline_state.config.next_run}."
                 )
                 asyncio.create_task(
                     self.scheduler_manager.reschedule_pipeline(final_pipeline_state)
                 )
@@ -314,12 +354,32 @@
                 f"Failed to update pipeline {pipeline_id} state after run execution: {e}",
                 exc_info=True,
             )
-            # The pipeline might be left in ACTIVE state if this fails. Requires manual intervention or recovery logic.
+            # The pipeline might be left in ACTIVE or an inconsistent state.
+            # Consider adding monitoring or retry logic here.

-    # TODO: Complete this method
-    # --- Placeholder for actual execution ---
     async def _execute_ingestion(self, config: IngestorInput):
-        # Replace with your actual ingestion logic
-        logger.info(f"Simulating ingestion with config: {config}")
-        await asyncio.sleep(2)  # Simulate I/O
-        logger.info("Ingestion simulation complete.")
+        """
+        Executes the ingestion process for a pipeline using the provided IngestorInput config.
+        Returns the ingestion results or raises an exception on failure.
+        """
+        try:
+            # Import here to avoid a circular import at module load time
+            from ingestion.core import Ingestor
+
+            logger.info(f"Executing ingestion with config: {config}")
+            # NOTE: Ingestor.run is synchronous here; await it instead if it becomes a coroutine
+            results = Ingestor.run(config.sources)
+            logger.info(f"Ingestion completed successfully. Results: {results}")
+            return results
+        except ImportError:
+            logger.error("Failed to import Ingestor. Cannot execute ingestion.")
+            raise RuntimeError("Ingestion module not found")
+        except Exception as e:
+            logger.error(f"Ingestion execution failed: {e}", exc_info=True)
+            raise
diff --git a/pipeline/tests/test_pipeline_service.py b/pipeline/tests/test_pipeline_service.py
new file mode 100644
index 0000000..7a88643
--- /dev/null
+++ b/pipeline/tests/test_pipeline_service.py
@@ -0,0 +1,571 @@
+import pytest
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+from uuid import UUID, uuid4
+from datetime import datetime, timedelta, timezone
+
+from freezegun import freeze_time
+
+from models.pipeline import (
+    Pipeline,
+    PipelineCreate,
+    PipelineConfig,
+    RunFrequency,
+    PipelineStatus,
+)
+from models.ingestion import (
+    IngestorInput,
+    IngestSourceConfig,
+    SourceType,
+    ApiConfig,
+)
+from services.pipeline_service import PipelineService
+from stores.base import PipelineStore
+from scheduler.manager import SchedulerManager
+from scheduler.utils import calculate_next_run
+
+pytestmark = pytest.mark.asyncio
+
+# --- Fixtures ---
+
+
+@pytest.fixture
+def mock_store(mocker) -> AsyncMock:
+    """Fixture for a mocked PipelineStore."""
+    mock = mocker.patch("stores.base.PipelineStore", spec=PipelineStore)
+    mock.save = AsyncMock(return_value=None)
+    mock.get = AsyncMock(return_value=None)
+    mock.get_all = AsyncMock(return_value=[])
+    mock.delete = AsyncMock(return_value=False)
+    return mock
+
+
+@pytest.fixture
+def mock_scheduler(mocker) -> AsyncMock:
+    """Fixture for a mocked SchedulerManager."""
+    mock = mocker.patch("scheduler.manager.SchedulerManager", spec=SchedulerManager)
+    mock.schedule_pipeline = AsyncMock(return_value=None)
+    mock.reschedule_pipeline = AsyncMock(return_value=None)
+    mock.unschedule_pipeline = AsyncMock(return_value=None)
+
+    # NOTE: add other methods if the service starts calling them
+    return mock
+
+
+@pytest.fixture
+def pipeline_service(mock_store, mock_scheduler) -> PipelineService:
+    """Fixture for PipelineService instance with mocked dependencies."""
+    service = PipelineService(store=mock_store, scheduler_manager=mock_scheduler)
+
+    # NOTE: if the service gains internal methods worth mocking (like the actual
+    # execution logic), patch them here or in specific tests via mocker.patch.object
+
+    return service
+
+
+@pytest.fixture
+def sample_ingestor_input() -> IngestorInput:
+    """Sample IngestorInput for creating pipelines."""
+    return IngestorInput(
+        sources=[
+            IngestSourceConfig(
+                type=SourceType.API, config=ApiConfig(url="http://example.com/api")
+            )
+        ]
+    )
+
+
+@pytest.fixture
+def sample_pipeline_config(sample_ingestor_input) -> PipelineConfig:
+    """Sample PipelineConfig."""
+    return PipelineConfig(
+        ingestor_config=sample_ingestor_input,
+        run_frequency=RunFrequency.DAILY,
+        last_run=None,
+        next_run=None,
+    )
+
+
+@pytest.fixture
+def sample_pipeline(sample_pipeline_config) -> Pipeline:
+    """Sample Pipeline object."""
+    now = datetime.now(timezone.utc)
+    pid = uuid4()
+    next_run = calculate_next_run(sample_pipeline_config.run_frequency, None, now)
+    return Pipeline(
+        id=pid,
+        name="Test Pipeline",
+        description="A pipeline for testing",
+        config=sample_pipeline_config.model_copy(
+            update={"next_run": next_run}
+        ),  # Use updated config
+        status=PipelineStatus.INACTIVE,
+        created_at=now - timedelta(hours=1),
+        updated_at=now - timedelta(minutes=30),
+    )
+
+
+@pytest.fixture
+def sample_pipeline_create(sample_pipeline_config) -> PipelineCreate:
+    """Sample PipelineCreate object for updates/creations."""
+    return PipelineCreate(
+        
name="New Test Pipeline", + description="Creating a pipeline", + config=sample_pipeline_config, + ) + + +# --- Test Cases --- + +FROZEN_TIME = datetime(2025, 5, 12, 12, 30, 0, tzinfo=timezone.utc) # NOTE: 7:30 PM +07 + + +@freeze_time(FROZEN_TIME) +async def test_create_pipeline_success( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_ingestor_input: IngestorInput, +): + """Test successful pipeline creation.""" + name = "My New Pipeline" + description = "Test description" + frequency = RunFrequency.WEEKLY + + expected_next_run = calculate_next_run(frequency, None, FROZEN_TIME) + + created_pipeline = await pipeline_service.create_pipeline( + name=name, + description=description, + ingestor_config=sample_ingestor_input, + run_frequency=frequency, + ) + + assert created_pipeline is not None + assert created_pipeline.name == name + assert created_pipeline.description == description + assert created_pipeline.config.run_frequency == frequency + assert created_pipeline.config.ingestor_config == sample_ingestor_input + assert created_pipeline.status == PipelineStatus.INACTIVE + assert created_pipeline.created_at == FROZEN_TIME + assert created_pipeline.updated_at == FROZEN_TIME + assert created_pipeline.config.last_run is None + assert created_pipeline.config.next_run == expected_next_run + assert isinstance(created_pipeline.id, UUID) + + mock_store.save.assert_awaited_once() + saved_pipeline_arg = mock_store.save.call_args[0][0] + assert isinstance(saved_pipeline_arg, Pipeline) + assert saved_pipeline_arg.id == created_pipeline.id + assert saved_pipeline_arg.config.next_run == expected_next_run + + # Verify scheduler interaction (using create_task, so check if called) + # We check if schedule_pipeline was called with the created pipeline object + await asyncio.sleep(0) # Allow create_task to potentially run + mock_scheduler.schedule_pipeline.assert_awaited_once_with(created_pipeline) + + +async def test_create_pipeline_store_error( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_ingestor_input: IngestorInput, +): + """Test pipeline creation when store save fails.""" + mock_store.save.side_effect = Exception("Database connection error") + + with pytest.raises(Exception, match="Database connection error"): + await pipeline_service.create_pipeline( + name="Fail Pipeline", + description="This should fail", + ingestor_config=sample_ingestor_input, + run_frequency=RunFrequency.DAILY, + ) + + # Ensure scheduler was NOT called if store save failed + mock_scheduler.schedule_pipeline.assert_not_awaited() + + +@freeze_time(FROZEN_TIME) +async def test_update_pipeline_success_no_freq_change( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline: Pipeline, +): + """Test successful update without changing run frequency.""" + mock_store.get.return_value = sample_pipeline + + update_payload = PipelineCreate( + name="Updated Name", + description="Updated Description", + config=sample_pipeline.config, + ) + + updated_pipeline = await pipeline_service.update_pipeline( + sample_pipeline.id, update_payload + ) + + assert updated_pipeline is not None + assert updated_pipeline.id == sample_pipeline.id + assert updated_pipeline.name == "Updated Name" + assert updated_pipeline.description == "Updated Description" + assert updated_pipeline.config.run_frequency == sample_pipeline.config.run_frequency + assert updated_pipeline.config.next_run == 
sample_pipeline.config.next_run + assert updated_pipeline.created_at == sample_pipeline.created_at + + mock_store.get.assert_awaited_once_with(sample_pipeline.id) + mock_store.save.assert_awaited_once() + saved_pipeline_arg = mock_store.save.call_args[0][0] + assert saved_pipeline_arg.id == sample_pipeline.id + assert saved_pipeline_arg.name == "Updated Name" + assert saved_pipeline_arg.config.next_run == sample_pipeline.config.next_run + + # Verify scheduler interaction (should still be called if config changes, even if freq doesn't) + await asyncio.sleep(0) + mock_scheduler.reschedule_pipeline.assert_awaited_once_with(updated_pipeline) + + +@freeze_time(FROZEN_TIME) +async def test_update_pipeline_success_with_freq_change( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline: Pipeline, # Existing pipeline (Daily frequency) + sample_ingestor_input: IngestorInput, +): + """Test successful update changing run frequency.""" + mock_store.get.return_value = sample_pipeline + original_next_run = sample_pipeline.config.next_run + + new_config = PipelineConfig( + ingestor_config=sample_ingestor_input, + run_frequency=RunFrequency.MONTHLY, + last_run=sample_pipeline.config.last_run, + ) + update_payload = PipelineCreate( + name=sample_pipeline.name, + description=sample_pipeline.description, + config=new_config, + ) + + # Calculate expected next run for MONTHLY based on FROZEN_TIME and no last_run + expected_new_next_run = calculate_next_run( + RunFrequency.MONTHLY, sample_pipeline.config.last_run, FROZEN_TIME + ) + + updated_pipeline = await pipeline_service.update_pipeline( + sample_pipeline.id, update_payload + ) + + assert updated_pipeline is not None + assert updated_pipeline.id == sample_pipeline.id + assert updated_pipeline.config.run_frequency == RunFrequency.MONTHLY + assert updated_pipeline.config.next_run != original_next_run + assert updated_pipeline.config.next_run == expected_new_next_run + + mock_store.get.assert_awaited_once_with(sample_pipeline.id) + mock_store.save.assert_awaited_once() + saved_pipeline_arg = mock_store.save.call_args[0][0] + assert saved_pipeline_arg.config.run_frequency == RunFrequency.MONTHLY + assert saved_pipeline_arg.config.next_run == expected_new_next_run + + await asyncio.sleep(0) + mock_scheduler.reschedule_pipeline.assert_awaited_once_with(updated_pipeline) + + +async def test_update_pipeline_not_found( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline_create: PipelineCreate, +): + """Test updating a pipeline that doesn't exist.""" + non_existent_id = uuid4() + mock_store.get.return_value = None + + updated_pipeline = await pipeline_service.update_pipeline( + non_existent_id, sample_pipeline_create + ) + + assert updated_pipeline is None + mock_store.get.assert_awaited_once_with(non_existent_id) + mock_store.save.assert_not_awaited() + mock_scheduler.reschedule_pipeline.assert_not_awaited() + + +async def test_delete_pipeline_success( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline: Pipeline, +): + """Test successful pipeline deletion.""" + mock_store.delete.return_value = True + mock_store.get.return_value = sample_pipeline + + pipeline_id = sample_pipeline.id + deleted = await pipeline_service.delete_pipeline(pipeline_id) + + assert deleted is True + call_order = MagicMock() + call_order.attach_mock(mock_scheduler.unschedule_pipeline, "scheduler_unschedule") + 
call_order.attach_mock(mock_store.delete, "store_delete")
+
+    mock_scheduler.unschedule_pipeline.assert_awaited_once_with(pipeline_id)
+    mock_store.delete.assert_awaited_once_with(pipeline_id)
+
+    # NOTE: call order could be checked here, but attach_mock only records calls
+    # made *after* attaching, so the mocks would need to be attached before
+    # calling delete_pipeline for this assertion to pass:
+    # assert call_order.mock_calls == [
+    #     mocker.call.scheduler_unschedule(pipeline_id),
+    #     mocker.call.store_delete(pipeline_id),
+    # ]
+
+
+async def test_delete_pipeline_not_found(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+    mock_scheduler: AsyncMock,
+):
+    """Test deleting a pipeline that doesn't exist."""
+    non_existent_id = uuid4()
+    mock_store.get.return_value = None
+    mock_store.delete.return_value = False
+
+    deleted = await pipeline_service.delete_pipeline(non_existent_id)
+
+    assert deleted is False
+
+    # Verify interactions:
+    mock_store.get.assert_awaited_once_with(non_existent_id)  # Verify get was checked
+    # Scheduler should NOT be called if pipeline wasn't found by get
+    mock_scheduler.unschedule_pipeline.assert_not_awaited()
+    # Store delete should NOT be called if pipeline wasn't found by get
+    mock_store.delete.assert_not_awaited()
+
+
+async def test_get_pipeline_success(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+    sample_pipeline: Pipeline,
+):
+    """Test getting an existing pipeline."""
+    mock_store.get.return_value = sample_pipeline
+    pipeline_id = sample_pipeline.id
+
+    result = await pipeline_service.get_pipeline(pipeline_id)
+
+    assert result == sample_pipeline
+    mock_store.get.assert_awaited_once_with(pipeline_id)
+
+
+async def test_get_pipeline_not_found(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+):
+    """Test getting a non-existent pipeline."""
+    mock_store.get.return_value = None
+    pipeline_id = uuid4()
+
+    result = await pipeline_service.get_pipeline(pipeline_id)
+
+    assert result is None
+    mock_store.get.assert_awaited_once_with(pipeline_id)
+
+
+async def test_list_pipelines(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+    sample_pipeline: Pipeline,
+):
+    """Test listing pipelines."""
+    pipelines_list = [
+        sample_pipeline,
+        sample_pipeline.model_copy(update={"id": uuid4(), "name": "Another Pipeline"}),
+    ]
+    mock_store.get_all.return_value = pipelines_list
+
+    result = await pipeline_service.list_pipelines()
+
+    assert result == pipelines_list
+    mock_store.get_all.assert_awaited_once()
+
+
+# --- Tests for run_pipeline ---
+
+
+async def test_run_pipeline_not_found(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+):
+    """Test running a pipeline that doesn't exist."""
+    mock_store.get.return_value = None
+    pipeline_id = uuid4()
+
+    # Patch the internal execution method just in case, although it shouldn't be reached
+    with patch.object(
+        pipeline_service, "_execute_ingestion", new_callable=AsyncMock
+    ) as mock_exec:
+        await pipeline_service.run_pipeline(pipeline_id)
+
+    mock_store.get.assert_awaited_once_with(pipeline_id)
+    mock_store.save.assert_not_awaited()
+    mock_exec.assert_not_awaited()
+
+
+async def test_run_pipeline_already_active(
+    pipeline_service: PipelineService,
+    mock_store: AsyncMock,
+    sample_pipeline: Pipeline,
+):
+    """Test running a pipeline that is already in ACTIVE status."""
+    active_pipeline = sample_pipeline.model_copy(
+        update={"status": PipelineStatus.ACTIVE}
+    )
+    mock_store.get.return_value = active_pipeline
+    pipeline_id = active_pipeline.id
+
+    with patch.object(
+        pipeline_service, "_execute_ingestion", new_callable=AsyncMock
+    ) as mock_exec:
+        await pipeline_service.run_pipeline(pipeline_id)
+
+    
mock_store.get.assert_awaited_once_with(pipeline_id) + mock_store.save.assert_not_awaited() + mock_exec.assert_not_awaited() + + +@freeze_time(FROZEN_TIME) +async def test_run_pipeline_success( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline: Pipeline, + mocker, +): + """Test a successful pipeline run.""" + pipeline_id = sample_pipeline.id + + # --- Setup Mock Responses --- + # 1. Initial get returns the inactive pipeline + # 2. Second get (after execution) returns the pipeline again (simulate no external changes) + mock_store.get.side_effect = [ + sample_pipeline, + sample_pipeline.model_copy(update={"status": PipelineStatus.ACTIVE}), + ] + + # --- Patch Internal Execution Logic --- + mock_execute_ingestion = mocker.patch.object( + pipeline_service, "_execute_ingestion", new_callable=AsyncMock + ) + mock_execute_ingestion.return_value = None + + # --- Execute --- + await pipeline_service.run_pipeline(pipeline_id) + + # --- Assertions --- + + # Verify get calls + assert mock_store.get.await_count == 2 + mock_store.get.assert_any_await(pipeline_id) + + # Verify execution logic was called + mock_execute_ingestion.assert_awaited_once() # Check if the core logic ran + + # Verify save calls (should be 2: one for ACTIVE, one for INACTIVE+updates) + assert mock_store.save.await_count == 2 + + # Check the first save call (setting status to ACTIVE) + call1_args = mock_store.save.await_args_list[0][0] + saved_pipeline_active: Pipeline = call1_args[0] + assert saved_pipeline_active.id == pipeline_id + assert saved_pipeline_active.status == PipelineStatus.ACTIVE + + # Check the second save call (setting status to INACTIVE and updating times) + call2_args = mock_store.save.await_args_list[1][0] + saved_pipeline_final: Pipeline = call2_args[0] + assert saved_pipeline_final.id == pipeline_id + assert saved_pipeline_final.status == PipelineStatus.INACTIVE + assert ( + saved_pipeline_final.config.last_run == FROZEN_TIME + ) # Should be updated to now + # Verify next_run was recalculated based on the new last_run time + expected_next_run_after_success = calculate_next_run( + saved_pipeline_final.config.run_frequency, + FROZEN_TIME, # The new last_run + FROZEN_TIME, + ) + assert saved_pipeline_final.config.next_run == expected_next_run_after_success + # assert ( + # saved_pipeline_final.config.next_run != original_next_run + # ) # Ensure it changed + + # Verify scheduler notification for reschedule after completion + await asyncio.sleep(0) # Allow create_task to run + mock_scheduler.reschedule_pipeline.assert_awaited_once_with(saved_pipeline_final) + + +@freeze_time(FROZEN_TIME) +async def test_run_pipeline_execution_fails( + pipeline_service: PipelineService, + mock_store: AsyncMock, + mock_scheduler: AsyncMock, + sample_pipeline: Pipeline, + mocker, +): + """Test a pipeline run where the internal execution logic fails.""" + pipeline_id = sample_pipeline.id + + # --- Setup Mock Responses --- + mock_store.get.side_effect = [ + sample_pipeline, + sample_pipeline.model_copy(update={"status": PipelineStatus.ACTIVE}), + ] + + # --- Patch Internal Execution Logic to Raise Error --- + mock_execute_ingestion = mocker.patch.object( + pipeline_service, "_execute_ingestion", new_callable=AsyncMock + ) + execution_error = ValueError("Something went wrong during ingestion") + mock_execute_ingestion.side_effect = execution_error + + # --- Execute --- + # The service currently catches the exception and logs it, but doesn't re-raise. 
+    # If it re-raised, we'd wrap this call in pytest.raises.
+    await pipeline_service.run_pipeline(pipeline_id)
+
+    # --- Assertions ---
+
+    # Verify get calls
+    assert mock_store.get.await_count == 2
+    mock_store.get.assert_any_await(pipeline_id)
+
+    # Verify the execution logic was called and raised the error
+    mock_execute_ingestion.assert_awaited_once()
+
+    # Verify save calls (should be 2: one for ACTIVE, one for FAILED after the failure)
+    assert mock_store.save.await_count == 2
+
+    # Check the first save call (setting status to ACTIVE)
+    call1_args = mock_store.save.await_args_list[0][0]
+    saved_pipeline_active: Pipeline = call1_args[0]
+    assert saved_pipeline_active.status == PipelineStatus.ACTIVE
+
+    # Check the second save call (setting status to FAILED, with NO last_run update)
+    call2_args = mock_store.save.await_args_list[1][0]
+    saved_pipeline_final: Pipeline = call2_args[0]
+    assert saved_pipeline_final.id == pipeline_id
+    assert saved_pipeline_final.status == PipelineStatus.FAILED
+    # ! IMPORTANT: last_run should NOT be updated on failure
+    assert saved_pipeline_final.config.last_run == sample_pipeline.config.last_run
+    # Next run should be recalculated based on the *original* last_run
+    expected_next_run_after_fail = calculate_next_run(
+        saved_pipeline_final.config.run_frequency,
+        sample_pipeline.config.last_run,  # Use original last_run
+        FROZEN_TIME,
+    )
+    assert saved_pipeline_final.config.next_run == expected_next_run_after_fail
+
+    # Verify scheduler notification for reschedule even after failure
+    await asyncio.sleep(0)
+    mock_scheduler.reschedule_pipeline.assert_awaited_once_with(saved_pipeline_final)
diff --git a/pipeline/uv.lock b/pipeline/uv.lock
index f30f602..02dbdd4 100644
--- a/pipeline/uv.lock
+++ b/pipeline/uv.lock
@@ -317,12 +317,14 @@ dependencies = [
     { name = "apscheduler" },
     { name = "crawl4ai" },
     { name = "fastapi", extra = ["standard"] },
+    { name = "freezegun" },
     { name = "inquirer" },
     { name = "loguru" },
     { name = "pandas" },
     { name = "pydantic-settings" },
     { name = "pytest" },
     { name = "pytest-asyncio" },
+    { name = "pytest-mock" },
     { name = "python-dotenv" },
     { name = "responses" },
     { name = "rich" },
@@ -333,12 +335,14 @@ requires-dist = [
     { name = "apscheduler", specifier = ">=3.11.0" },
     { name = "crawl4ai", specifier = ">=0.5.0.post8" },
     { name = "fastapi", extras = ["standard"], specifier = ">=0.115.12" },
+    { name = "freezegun", specifier = ">=1.5.1" },
     { name = "inquirer", specifier = ">=3.4.0" },
     { name = "loguru", specifier = ">=0.7.3" },
     { name = "pandas", specifier = ">=2.2.3" },
     { name = "pydantic-settings", specifier = ">=2.9.1" },
     { name = "pytest", specifier = ">=8.3.5" },
     { name = "pytest-asyncio", specifier = ">=0.26.0" },
+    { name = "pytest-mock", specifier = ">=3.14.0" },
     { name = "python-dotenv", specifier = ">=1.1.0" },
     { name = "responses", specifier = ">=0.25.7" },
     { name = "rich", specifier = ">=14.0.0" },
@@ -518,6 +522,18 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/4d/36/2a115987e2d8c300a974597416d9de88f2444426de9571f4b59b2cca3acc/filelock-3.18.0-py3-none-any.whl", hash = "sha256:c401f4f8377c4464e6db25fff06205fd89bdd83b65eb0488ed1b160f780e21de", size = 16215 },
 ]

+[[package]]
+name = "freezegun"
+version = "1.5.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "python-dateutil" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/2c/ef/722b8d71ddf4d48f25f6d78aa2533d505bf3eec000a7cacb8ccc8de61f2f/freezegun-1.5.1.tar.gz", hash = 
"sha256:b29dedfcda6d5e8e083ce71b2b542753ad48cfec44037b3fc79702e2980a89e9", size = 33697 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/51/0b/0d7fee5919bccc1fdc1c2a7528b98f65c6f69b223a3fd8f809918c142c36/freezegun-1.5.1-py3-none-any.whl", hash = "sha256:bf111d7138a8abe55ab48a71755673dbaa4ab87f4cff5634a4442dfec34c15f1", size = 17569 }, +] + [[package]] name = "frozenlist" version = "1.5.0" @@ -1411,6 +1427,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/20/7f/338843f449ace853647ace35870874f69a764d251872ed1b4de9f234822c/pytest_asyncio-0.26.0-py3-none-any.whl", hash = "sha256:7b51ed894f4fbea1340262bdae5135797ebbe21d8638978e35d31c6d19f72fb0", size = 19694 }, ] +[[package]] +name = "pytest-mock" +version = "3.14.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/90/a955c3ab35ccd41ad4de556596fa86685bf4fc5ffcc62d22d856cfd4e29a/pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0", size = 32814 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f2/3b/b26f90f74e2986a82df6e7ac7e319b8ea7ccece1caec9f8ab6104dc70603/pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f", size = 9863 }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"