From e632ee05112d8fd3b253dc8c1f8ba721cdbdcc5e Mon Sep 17 00:00:00 2001 From: Sosokker Date: Mon, 7 Apr 2025 22:53:09 +0700 Subject: [PATCH] refactor: initial backend code --- backend/.gitignore | 10 + backend/.python-version | 1 + backend/alembic.ini | 119 ++++++++++++ backend/alembic/README | 1 + backend/alembic/env.py | 78 ++++++++ backend/alembic/script.py.mako | 28 +++ backend/app/__init__.py | 0 backend/app/api/__init__.py | 0 backend/app/api/v1/endpoints/pipelines.py | 123 ++++++++++++ backend/app/config.py | 34 ++++ backend/app/core/__init__.py | 0 backend/app/core/config.py | 42 ++++ backend/app/core/db.py | 66 +++++++ backend/app/crud/__init__.py | 3 + backend/app/crud/crud_pipeline.py | 122 ++++++++++++ backend/app/main.py | 100 ++++++++++ backend/app/models/__init__.py | 8 + backend/app/models/pipeline.py | 82 ++++++++ backend/app/schemas/__init__.py | 18 ++ backend/app/schemas/pipeline.py | 120 ++++++++++++ backend/app/services/__init__.py | 0 backend/app/services/processing_service.py | 124 ++++++++++++ backend/app/workers/__init__.py | 0 backend/app/workers/tasks.py | 214 +++++++++++++++++++++ backend/pyproject.toml | 34 ++++ 25 files changed, 1327 insertions(+) create mode 100644 backend/.gitignore create mode 100644 backend/.python-version create mode 100644 backend/alembic.ini create mode 100644 backend/alembic/README create mode 100644 backend/alembic/env.py create mode 100644 backend/alembic/script.py.mako create mode 100644 backend/app/__init__.py create mode 100644 backend/app/api/__init__.py create mode 100644 backend/app/api/v1/endpoints/pipelines.py create mode 100644 backend/app/config.py create mode 100644 backend/app/core/__init__.py create mode 100644 backend/app/core/config.py create mode 100644 backend/app/core/db.py create mode 100644 backend/app/crud/__init__.py create mode 100644 backend/app/crud/crud_pipeline.py create mode 100644 backend/app/main.py create mode 100644 backend/app/models/__init__.py create mode 100644 backend/app/models/pipeline.py create mode 100644 backend/app/schemas/__init__.py create mode 100644 backend/app/schemas/pipeline.py create mode 100644 backend/app/services/__init__.py create mode 100644 backend/app/services/processing_service.py create mode 100644 backend/app/workers/__init__.py create mode 100644 backend/app/workers/tasks.py create mode 100644 backend/pyproject.toml diff --git a/backend/.gitignore b/backend/.gitignore new file mode 100644 index 0000000..505a3b1 --- /dev/null +++ b/backend/.gitignore @@ -0,0 +1,10 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/backend/.python-version b/backend/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/backend/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/backend/alembic.ini b/backend/alembic.ini new file mode 100644 index 0000000..0fbe2c2 --- /dev/null +++ b/backend/alembic.ini @@ -0,0 +1,119 @@ +# A generic, single database configuration. 
+ +[alembic] +# path to migration scripts +# Use forward slashes (/) also on windows to provide an os agnostic path +script_location = alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. +# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +# version_path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +version_path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/backend/alembic/README b/backend/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/backend/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/backend/alembic/env.py b/backend/alembic/env.py new file mode 100644 index 0000000..36112a3 --- /dev/null +++ b/backend/alembic/env.py @@ -0,0 +1,78 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/backend/alembic/script.py.mako b/backend/alembic/script.py.mako new file mode 100644 index 0000000..480b130 --- /dev/null +++ b/backend/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/v1/endpoints/pipelines.py b/backend/app/api/v1/endpoints/pipelines.py new file mode 100644 index 0000000..06ffb08 --- /dev/null +++ b/backend/app/api/v1/endpoints/pipelines.py @@ -0,0 +1,123 @@ +import logging +import random +from typing import List + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app import crud, models, schemas # Import local schemas and crud +from app.core.db import get_db # Import DB dependency +from app.services.processing_service import ProcessingService # Import services +from app.workers.tasks import run_pipeline_task # Import Celery task + +logger = logging.getLogger(__name__) +router = APIRouter() + +# --- API Endpoint Definitions --- + + +@router.post("/", response_model=schemas.PipelineRead, status_code=201) +async def create_pipeline( + pipeline_in: schemas.PipelineCreate, + db: AsyncSession = Depends(get_db), +): + """ + DUMMY: Creates a new pipeline configuration. + """ + logger.info("Endpoint: create_pipeline called") + # In real implementation: Add checks, call services if needed. + # Here, directly call (dummy) CRUD. + created_pipeline = await crud.pipeline.create(db=db, obj_in=pipeline_in) + # No need to check for existence as dummy create always returns something + return created_pipeline + + +@router.get("/", response_model=List[schemas.PipelineRead]) +async def read_pipelines(db: AsyncSession = Depends(get_db), skip: int = 0, limit: int = Query(100, le=200)): + """ + DUMMY: Retrieves a list of pipelines. + """ + logger.info("Endpoint: read_pipelines called") + # Call (dummy) CRUD + pipelines = await crud.pipeline.get_multi(db, skip=skip, limit=limit) + return pipelines + + +@router.get("/{pipeline_id}", response_model=schemas.PipelineReadWithDetails) +async def read_pipeline(pipeline_id: int, db: AsyncSession = Depends(get_db)): + """ + DUMMY: Retrieves details for a specific pipeline, including sources and recent runs. 
+ """ + logger.info(f"Endpoint: read_pipeline called for id={pipeline_id}") + # Call (dummy) CRUD that includes related data loading simulation + db_pipeline = await crud.pipeline.get_with_details(db, id=pipeline_id) + if db_pipeline is None: + # Raise standard FastAPI exception for not found + raise HTTPException(status_code=404, detail="Pipeline not found (simulated)") + return db_pipeline + + +@router.put("/{pipeline_id}", response_model=schemas.PipelineRead) +async def update_pipeline( + pipeline_id: int, + pipeline_in: schemas.PipelineUpdate, + db: AsyncSession = Depends(get_db), +): + """ + DUMMY: Updates an existing pipeline configuration. + """ + logger.info(f"Endpoint: update_pipeline called for id={pipeline_id}") + # First, get the existing object (dummy) + db_pipeline = await crud.pipeline.get(db, id=pipeline_id) + if not db_pipeline: + raise HTTPException(status_code=404, detail="Pipeline not found (simulated)") + + # Call (dummy) CRUD update + updated_pipeline = await crud.pipeline.update(db=db, db_obj=db_pipeline, obj_in=pipeline_in) + return updated_pipeline + + +@router.delete("/{pipeline_id}", response_model=schemas.PipelineRead) +async def delete_pipeline(pipeline_id: int, db: AsyncSession = Depends(get_db)): + """ + DUMMY: Deletes a pipeline configuration. + Returns the deleted object representation. + """ + logger.info(f"Endpoint: delete_pipeline called for id={pipeline_id}") + # Call (dummy) CRUD remove + deleted_pipeline = await crud.pipeline.remove(db=db, id=pipeline_id) + if deleted_pipeline is None: + raise HTTPException(status_code=404, detail="Pipeline not found (simulated)") + return deleted_pipeline # Return the object that was 'deleted' + + +@router.post("/{pipeline_id}/run", status_code=202, response_model=dict) +async def trigger_pipeline_run(pipeline_id: int, db: AsyncSession = Depends(get_db)): + """ + DUMMY: Simulates triggering an asynchronous pipeline run via Celery. 
+ """ + logger.info(f"Endpoint: trigger_pipeline_run called for id={pipeline_id}") + # Check pipeline status using dummy CRUD + db_pipeline = await crud.pipeline.get(db, id=pipeline_id) + if not db_pipeline: + raise HTTPException(status_code=404, detail="Pipeline not found (simulated)") + if db_pipeline.status == schemas.PipelineStatus.PAUSED: + raise HTTPException(status_code=400, detail="Pipeline is paused (simulated)") + if db_pipeline.status == schemas.PipelineStatus.RUNNING: + raise HTTPException(status_code=409, detail="Pipeline is already running (simulated)") + + # Simulate scheduling the Celery task + logger.info(f"Endpoint: Simulating run_pipeline_task.delay({pipeline_id})") + # In real code: task = run_pipeline_task.delay(pipeline_id) + # task_id = task.id + # For dummy: + dummy_task_id = f"dummy-celery-task-{random.randint(10000, 99999)}" + logger.info(f"Endpoint: Simulated task scheduling, got dummy task ID: {dummy_task_id}") + + # Optionally update pipeline status immediately (using dummy crud) + # await crud.pipeline.update_pipeline_status(db, pipeline_id=pipeline_id, status=schemas.PipelineStatus.RUNNING) + + return {"message": "Pipeline run simulated successfully", "job_id": dummy_task_id} + + +# --- Add dummy endpoints for pause/resume if needed, similar to trigger_pipeline_run --- diff --git a/backend/app/config.py b/backend/app/config.py new file mode 100644 index 0000000..3a1c38a --- /dev/null +++ b/backend/app/config.py @@ -0,0 +1,34 @@ +from pydantic_settings import BaseSettings +from pathlib import Path + +# Define a base directory for uploads, ensure it exists +UPLOAD_DIR = Path("./uploads") +UPLOAD_DIR.mkdir(exist_ok=True) + + +class Settings(BaseSettings): + PROJECT_NAME: str = "Data Integration Pipeline API" + API_V1_STR: str = "/api/v1" + + DATABASE_URL: str = "postgresql+asyncpg://user:password@db/data_pipeline_db" + CELERY_BROKER_URL: str = "redis://redis:6379/0" + CELERY_RESULT_BACKEND: str = "redis://redis:6379/0" + + OPENAI_API_KEY: str = "your_openai_key_here" # Replace in .env or secrets manager + NEWS_API_KEY: str | None = None # Replace if using Bing etc. 
+ + # Example Thai RSS feeds - load from config file or DB ideally + NEWS_SOURCES_RSS: list[str] = [ + "https://www.bangkokpost.com/rss/data/most-recent.xml", + "https://www.nationthailand.com/rss/feed.xml", + ] + + UPLOAD_DIR: Path = UPLOAD_DIR # Make upload dir accessible via settings + + class Config: + env_file = ".env" + case_sensitive = True + extra = "ignore" + + +settings = Settings() diff --git a/backend/app/core/__init__.py b/backend/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/core/config.py b/backend/app/core/config.py new file mode 100644 index 0000000..83d1d74 --- /dev/null +++ b/backend/app/core/config.py @@ -0,0 +1,42 @@ +import logging +from pathlib import Path +from pydantic_settings import BaseSettings, SettingsConfigDict + +logger = logging.getLogger(__name__) + +# Define a base directory for uploads relative to this config file's location +# Recommended: Define UPLOAD_DIR based on an environment variable or absolute path in production +_BASE_DIR = Path(__file__).resolve().parent.parent.parent # Moves up from core -> app -> backend +UPLOAD_DIR_DEFAULT = _BASE_DIR / "uploads" + +class Settings(BaseSettings): + """Application configuration settings.""" + + PROJECT_NAME: str = "Borbann Backend API" + API_V1_STR: str = "/api/v1" + LOG_LEVEL: str = "INFO" + + # Database configuration (sensitive, use secrets management in production) + DATABASE_URL: str = "postgresql+asyncpg://user:password@db:5432/borbann_db" + + # Celery configuration (sensitive, use secrets management in production) + CELERY_BROKER_URL: str = "redis://redis:6379/0" + CELERY_RESULT_BACKEND: str = "redis://redis:6379/1" # Use different DB for results + + # Example external API key (sensitive) + # SOME_EXTERNAL_API_KEY: str | None = None + + # File Uploads + UPLOAD_DIR: Path = UPLOAD_DIR_DEFAULT + + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", extra="ignore", case_sensitive=True + ) + +settings = Settings() + +# Ensure upload directory exists (can be done on startup as well) +try: + settings.UPLOAD_DIR.mkdir(parents=True, exist_ok=True) +except OSError as e: + logger.error(f"Could not create upload directory: {settings.UPLOAD_DIR} - {e}") \ No newline at end of file diff --git a/backend/app/core/db.py b/backend/app/core/db.py new file mode 100644 index 0000000..259b53c --- /dev/null +++ b/backend/app/core/db.py @@ -0,0 +1,66 @@ +import logging +from typing import AsyncGenerator +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import declarative_base +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Define a base for declarative models +Base = declarative_base() + +# Create the async engine +try: + engine = create_async_engine( + settings.DATABASE_URL, + pool_pre_ping=True, + # echo=True, # Uncomment for debugging SQL statements + ) + logger.info("Database engine created successfully.") +except Exception as e: + logger.error(f"Failed to create database engine: {e}", exc_info=True) + # Depending on the application, you might want to exit here + # sys.exit(1) + engine = None # Ensure engine is None if creation failed + +# Create a sessionmaker +if engine: + AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) +else: + AsyncSessionFactory = None # No factory if engine failed + + +async def get_db() -> AsyncGenerator[AsyncSession, None]: + """FastAPI dependency to get an async database session.""" + 
if not AsyncSessionFactory: + logger.error("Database session factory not configured.") + raise RuntimeError("Database not configured.") + + async with AsyncSessionFactory() as session: + # Optional: Start transaction (though commit/rollback might happen elsewhere) + # async with session.begin(): + try: + yield session + # If not using 'async with session.begin()', you might commit here + # await session.commit() + except Exception: + logger.exception("Session rollback due to exception") + await session.rollback() + raise + finally: + # Close is handled by the context manager 'async with AsyncSessionFactory()' + pass + + +async def check_db_connection() -> bool: + """Optional function to check DB connection on startup.""" + if not engine: + return False + try: + async with engine.connect() as connection: + # You can execute a simple query like "SELECT 1" if needed + logger.info("Database connection verified.") + return True + except Exception as e: + logger.error(f"Database connection failed: {e}", exc_info=True) + return False diff --git a/backend/app/crud/__init__.py b/backend/app/crud/__init__.py new file mode 100644 index 0000000..6571f23 --- /dev/null +++ b/backend/app/crud/__init__.py @@ -0,0 +1,3 @@ +# Make CRUD functions easily accessible +from .crud_pipeline import pipeline, data_source, pipeline_run +from .crud_news_article import news_article diff --git a/backend/app/crud/crud_pipeline.py b/backend/app/crud/crud_pipeline.py new file mode 100644 index 0000000..845930e --- /dev/null +++ b/backend/app/crud/crud_pipeline.py @@ -0,0 +1,122 @@ +import logging +import random +from datetime import datetime, timedelta +from typing import List, Optional, Sequence + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm import selectinload + +from app import models, schemas + +logger = logging.getLogger(__name__) + + +class CRUDPipeline: + """CRUD operations for Pipeline models (Dummy Implementation).""" + + async def get(self, db: AsyncSession, id: int) -> Optional[models.Pipeline]: + """DUMMY: Get a Pipeline by ID.""" + logger.info(f"DUMMY CRUD: Simulating get Pipeline with id={id}") + # In real code: result = await db.execute(select(models.Pipeline).filter(models.Pipeline.id == id)) + # return result.scalars().first() + if id == 999: # Simulate not found + return None + # Return a dummy model instance + return models.Pipeline(id=id, name=f"Dummy Pipeline {id}", status=schemas.PipelineStatus.IDLE) + + async def get_multi(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> Sequence[models.Pipeline]: + """DUMMY: Get multiple Pipelines.""" + logger.info(f"DUMMY CRUD: Simulating get_multi Pipeline skip={skip}, limit={limit}") + # In real code: result = await db.execute(select(models.Pipeline).offset(skip).limit(limit)) + # return result.scalars().all() + return [ + models.Pipeline(id=i, name=f"Dummy Pipeline {i}", status=schemas.PipelineStatus.IDLE) + for i in range(skip + 1, skip + limit + 1) + ] + + async def get_with_details(self, db: AsyncSession, id: int) -> Optional[models.Pipeline]: + """DUMMY: Get a Pipeline with related sources and runs.""" + logger.info(f"DUMMY CRUD: Simulating get_with_details Pipeline id={id}") + # In real code: Use eager loading + # stmt = select(models.Pipeline).options( + # selectinload(models.Pipeline.data_sources), + # selectinload(models.Pipeline.runs).order_by(models.PipelineRun.started_at.desc()).limit(5) # Example limit + # ).filter(models.Pipeline.id == id) + # result = await db.execute(stmt) + 
# return result.scalars().first() + pipeline = await self.get(db, id) + if pipeline: + pipeline.data_sources = [ + models.DataSource( + id=id * 10 + 1, + pipeline_id=id, + type=schemas.DataSourceType.URL, + config={"url": "http://dummy.example.com"}, + name="Dummy URL", + ), + models.DataSource( + id=id * 10 + 2, + pipeline_id=id, + type=schemas.DataSourceType.API, + config={"url": "http://dummy.api/data", "method": "GET"}, + name="Dummy API", + ), + ] + pipeline.runs = [ + models.PipelineRun( + id=id * 100 + 1, + pipeline_id=id, + status=schemas.PipelineStatus.COMPLETED, + started_at=datetime.utcnow(), + ), + models.PipelineRun( + id=id * 100 + 2, + pipeline_id=id, + status=schemas.PipelineStatus.FAILED, + started_at=datetime.utcnow() - timedelta(hours=1), + ), + ] + return pipeline + + async def create(self, db: AsyncSession, *, obj_in: schemas.PipelineCreate) -> models.Pipeline: + """DUMMY: Create a Pipeline.""" + logger.info(f"DUMMY CRUD: Simulating create Pipeline with data: {obj_in.model_dump()}") + # In real code: db_obj = models.Pipeline(**obj_in.model_dump()) + # db.add(db_obj); await db.flush(); await db.refresh(db_obj) + new_id = random.randint(100, 1000) + db_obj = models.Pipeline(id=new_id, status=schemas.PipelineStatus.IDLE, **obj_in.model_dump()) + logger.info(f"DUMMY CRUD: Simulated creation with id={new_id}") + return db_obj + + async def update( + self, db: AsyncSession, *, db_obj: models.Pipeline, obj_in: schemas.PipelineUpdate | dict + ) -> models.Pipeline: + """DUMMY: Update a Pipeline.""" + if isinstance(obj_in, dict): + update_data = obj_in + else: + update_data = obj_in.model_dump(exclude_unset=True) + logger.info(f"DUMMY CRUD: Simulating update Pipeline id={db_obj.id} with data: {update_data}") + # In real code: update object fields, add, flush, refresh + for field, value in update_data.items(): + if value is not None: # Apply updates + setattr(db_obj, field, value) + db_obj.updated_at = datetime.utcnow() # Simulate timestamp update + return db_obj + + async def remove(self, db: AsyncSession, *, id: int) -> Optional[models.Pipeline]: + """DUMMY: Remove a Pipeline.""" + logger.info(f"DUMMY CRUD: Simulating remove Pipeline id={id}") + obj = await self.get(db=db, id=id) + if obj: + logger.info(f"DUMMY CRUD: Found pipeline {id} to remove.") + # In real code: await db.delete(obj); await db.flush() + return obj # Return the simulated object to be deleted + logger.warning(f"DUMMY CRUD: Pipeline {id} not found for removal.") + return None + + +# Instantiate CRUD objects for pipelines, datasources, runs etc. +pipeline = CRUDPipeline() +# ... 
add dummy crud_datasource and crud_pipelinerun similar to above diff --git a/backend/app/main.py b/backend/app/main.py new file mode 100644 index 0000000..c26b5d2 --- /dev/null +++ b/backend/app/main.py @@ -0,0 +1,100 @@ +import logging +from contextlib import asynccontextmanager +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from app.core.config import settings +from app.api.v1.endpoints import api_router +from app.core.db import check_db_connection # Optional DB check + +# --- Logging Configuration --- +# Basic config, consider more advanced setup (JSON, handlers) for production +logging.basicConfig(level=settings.LOG_LEVEL.upper(), format='%(levelname)s: %(name)s - %(message)s') +logger = logging.getLogger(__name__) + + +# --- Lifespan Management --- +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup logic + logger.info("Application startup...") + # Example: Check DB connection (using dummy core.db function) + # if not await check_db_connection(): + # logger.critical("Database connection failed on startup. Check config/connections.") + # Decide if this is fatal. In dummy mode, we probably continue. + # sys.exit(1) # Or raise RuntimeError + + # Example: Placeholder for loading ML models, external resources, etc. + # app.state.ml_model = load_my_model() + logger.info("Dummy startup tasks complete.") + + yield # Application runs here + + # Shutdown logic + logger.info("Application shutdown...") + # Example: Clean up resources + # if hasattr(app.state, 'ml_model'): + # app.state.ml_model.cleanup() + # Optional: Dispose DB engine explicitly if needed (often handled by context managers) + # from app.core.db import engine + # if engine: await engine.dispose() + logger.info("Dummy shutdown tasks complete.") + + +# --- FastAPI Application Instance --- +app = FastAPI( + title=settings.PROJECT_NAME, + openapi_url=f"{settings.API_V1_STR}/openapi.json", + version="0.1.0", # Example version + description="Dummy API for Borbann Data Pipeline", + lifespan=lifespan, +) + + +# --- Global Exception Handler Example --- +@app.exception_handler(Exception) +async def generic_exception_handler(request: Request, exc: Exception): + # Log the full error internally + logger.error(f"Unhandled exception for request {request.url}: {exc}", exc_info=True) + # Return a generic error response to the client + return JSONResponse( + status_code=500, + content={"detail": "An internal server error occurred."}, + ) + + +@app.exception_handler(HTTPException) +async def http_exception_handler(request: Request, exc: HTTPException): + # Default handler for FastAPI's own HTTPExceptions + # You might want to log these as well, depending on the status code + logger.warning(f"HTTP Exception: Status={exc.status_code}, Detail={exc.detail}") + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail}, + headers=exc.headers, + ) + + +# --- Mount API Router --- +app.include_router(api_router, prefix=settings.API_V1_STR) + + +# --- Root Endpoint --- +@app.get("/", tags=["Root"], summary="Root endpoint") +async def read_root(): + """Simple root endpoint providing basic info.""" + return {"message": f"Welcome to {settings.PROJECT_NAME} (Dummy Version)"} + + +# --- Middleware (Example: CORS) --- +# from fastapi.middleware.cors import CORSMiddleware +# origins = [ +# "http://localhost:3000", # Allow frontend dev server +# # Add production frontend URL here +# ] +# app.add_middleware( +# CORSMiddleware, +# allow_origins=origins, +# 
allow_credentials=True, +# allow_methods=["*"], +# allow_headers=["*"], +# ) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py new file mode 100644 index 0000000..275ef75 --- /dev/null +++ b/backend/app/models/__init__.py @@ -0,0 +1,8 @@ +from app.core.db import Base + +# Import all models here to ensure they are registered with Base +from .pipeline import Pipeline, DataSource, PipelineRun, PipelineRunResult +from .news_article import NewsArticle + +# You can optionally define __all__ if needed +# __all__ = ["Base", "Pipeline", "DataSource", "NewsArticle", "PipelineRun", "PipelineRunResult"] diff --git a/backend/app/models/pipeline.py b/backend/app/models/pipeline.py new file mode 100644 index 0000000..2bfe4a4 --- /dev/null +++ b/backend/app/models/pipeline.py @@ -0,0 +1,82 @@ +import enum +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, Enum, ForeignKey, Text +from sqlalchemy.orm import relationship +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.sql import func +from app.core.db import Base # Import Base from your core db setup + + +# Define Enums directly here or import from schemas if preferred +# If defined here, ensure schemas.py uses these or compatible definitions +class PipelineStatusEnum(str, enum.Enum): + IDLE = "idle" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + PAUSED = "paused" + + +class DataSourceTypeEnum(str, enum.Enum): + URL = "url" + API = "api" + FILE = "file" + + +class Pipeline(Base): + __tablename__ = "pipelines" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String, index=True, nullable=False) + description = Column(String, nullable=True) + status = Column(Enum(PipelineStatusEnum), default=PipelineStatusEnum.IDLE, nullable=False) + schedule = Column(String, nullable=True, comment="Cron-like schedule format") + configuration = Column( + JSONB, nullable=True, default=dict, comment="Pipeline-specific config, e.g., processing rules" + ) + created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + # Relationships + data_sources = relationship("DataSource", back_populates="pipeline", cascade="all, delete-orphan", lazy="selectin") + runs = relationship("PipelineRun", back_populates="pipeline", cascade="all, delete-orphan", lazy="selectin") + + def __repr__(self) -> str: + return f"" + + +class DataSource(Base): + __tablename__ = "data_sources" + + id = Column(Integer, primary_key=True, index=True) + pipeline_id = Column(Integer, ForeignKey("pipelines.id"), nullable=False) + type = Column(Enum(DataSourceTypeEnum), nullable=False) + name = Column(String, nullable=True, comment="User-friendly name for the source") + config = Column(JSONB, nullable=False, comment="Source-specific config (url, api details, file path/info)") + created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + # Relationship + pipeline = relationship("Pipeline", back_populates="data_sources") + + def __repr__(self) -> str: + return f"" + + +class PipelineRun(Base): + __tablename__ = "pipeline_runs" + + id = Column(Integer, primary_key=True, index=True) + pipeline_id = Column(Integer, ForeignKey("pipelines.id"), nullable=False) + celery_task_id = Column(String, nullable=True, index=True, comment="Celery task ID for the main pipeline run") + status = 
Column(Enum(PipelineStatusEnum), default=PipelineStatusEnum.RUNNING, nullable=False) + started_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) + finished_at = Column(DateTime(timezone=True), nullable=True) + output_location = Column(String, nullable=True, comment="Path to results file or data store reference") + run_log = Column(Text, nullable=True, comment="Execution logs or error details") + + # Relationship + pipeline = relationship("Pipeline", back_populates="runs") + + def __repr__(self) -> str: + return f"" diff --git a/backend/app/schemas/__init__.py b/backend/app/schemas/__init__.py new file mode 100644 index 0000000..197f5b5 --- /dev/null +++ b/backend/app/schemas/__init__.py @@ -0,0 +1,18 @@ +# Import schemas for easier access +from .pipeline import ( + PipelineCreate, + PipelineUpdate, + PipelineRead, + PipelineReadWithSources, + PipelineRunRead, + DataSourceCreate, + DataSourceUpdate, + DataSourceRead, + DataSourceType, + PipelineStatus, +) +from .news_article import NewsArticleCreate, NewsArticleRead +from .job import JobStatus + +# Define __all__ for clarity if desired +# __all__ = [...] diff --git a/backend/app/schemas/pipeline.py b/backend/app/schemas/pipeline.py new file mode 100644 index 0000000..f0dc199 --- /dev/null +++ b/backend/app/schemas/pipeline.py @@ -0,0 +1,120 @@ +import enum +from datetime import datetime +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, Field, HttpUrl, model_validator, ConfigDict + + +# Re-export enums from models or define compatible ones here +# Importing avoids definition duplication but creates dependency. Define here for clarity: +class PipelineStatus(str, enum.Enum): + IDLE = "idle" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + PAUSED = "paused" + + +class DataSourceType(str, enum.Enum): + URL = "url" + API = "api" + FILE = "file" + + +# --- Data Source Schemas --- +class DataSourceBase(BaseModel): + type: DataSourceType + name: Optional[str] = Field(None, max_length=255) + config: Dict[str, Any] = Field(..., description="Source-specific config") + + # Example validator based on type + @model_validator(mode='after') + def check_config_based_on_type(self) -> 'DataSourceBase': + config = self.config + type = self.type + if type == DataSourceType.URL: + if not config or 'url' not in config: + raise ValueError("URL config must contain 'url' key") + # Could add URL validation here + elif type == DataSourceType.API: + if not config or 'url' not in config or 'method' not in config: + raise ValueError("API config must contain 'url' and 'method' keys") + elif type == DataSourceType.FILE: + if not config or 'file_path' not in config: + raise ValueError("File config must contain 'file_path' key") + return self + + +class DataSourceCreate(DataSourceBase): + pipeline_id: int # Must be provided when creating standalone + + +class DataSourceUpdate(DataSourceBase): + # Make all fields optional for update + type: Optional[DataSourceType] = None + config: Optional[Dict[str, Any]] = None + + @model_validator(mode='before') + def prevent_type_change(cls, values: Dict[str, Any]) -> Dict[str, Any]: + # Example: Prevent changing the type during update + if 'type' in values and values['type'] is not None: + # In a real scenario, you'd compare against the existing db_obj type + # Here, we just disallow setting it in the update payload if not None + # raise ValueError("Changing data source type is not allowed.") + pass # Allow for dummy code, but keep validator structure + return 
values + + +class DataSourceRead(DataSourceBase): + id: int + pipeline_id: int + created_at: datetime + updated_at: Optional[datetime] = None + model_config = ConfigDict(from_attributes=True) + + +# --- Pipeline Run Schemas --- +class PipelineRunRead(BaseModel): + id: int + pipeline_id: int + celery_task_id: Optional[str] = None + status: PipelineStatus + started_at: datetime + finished_at: Optional[datetime] = None + output_location: Optional[str] = None + run_log: Optional[str] = Field(None, description="Execution logs, truncated if long") + model_config = ConfigDict(from_attributes=True) + + +# --- Pipeline Schemas --- +class PipelineBase(BaseModel): + name: str = Field(..., min_length=3, max_length=100) + description: Optional[str] = None + schedule: Optional[str] = Field(None, description="Cron-like schedule format, e.g., '0 * * * *'") + configuration: Optional[Dict[str, Any]] = Field(None, description="Pipeline-wide config") + + +class PipelineCreate(PipelineBase): + pass # Inherits all fields + + +class PipelineUpdate(BaseModel): + # Make all fields optional for update + name: Optional[str] = Field(None, min_length=3, max_length=100) + description: Optional[str] = None + schedule: Optional[str] = None + status: Optional[PipelineStatus] = None # Allow pausing/resuming via update + configuration: Optional[Dict[str, Any]] = None + + +class PipelineRead(PipelineBase): + id: int + status: PipelineStatus + # last_run_at: Optional[datetime] = None # Needs calculation/join, omit for simplicity + created_at: datetime + updated_at: Optional[datetime] = None + model_config = ConfigDict(from_attributes=True) + + +class PipelineReadWithDetails(PipelineRead): + data_sources: List[DataSourceRead] = [] + runs: List[PipelineRunRead] = Field([], description="Most recent runs") diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/services/processing_service.py b/backend/app/services/processing_service.py new file mode 100644 index 0000000..a8bf6da --- /dev/null +++ b/backend/app/services/processing_service.py @@ -0,0 +1,124 @@ +from datetime import datetime +import logging +from typing import Any, Dict, List, Optional +import pandas as pd +import asyncio +import random +from sqlalchemy.ext.asyncio import AsyncSession + +from app import crud, models, schemas # Import necessary types + +logger = logging.getLogger(__name__) + + +class ProcessingError(Exception): + """Custom exception for processing errors.""" + + pass + + +class ProcessingService: + """Service layer for handling data processing logic within a pipeline run.""" + + def __init__(self, db_session: Optional[AsyncSession] = None): + # Allow injecting session for testing or specific use cases, + # but typically services might not interact directly with db session. + # They usually call CRUD functions. + pass + + async def process_pipeline_results( + self, raw_results: List[List[Dict] | Dict | Exception], pipeline_config: Dict[str, Any] + ) -> pd.DataFrame: + """ + DUMMY: Orchestrates the processing of raw results from source tasks. + + Args: + raw_results: A list containing results (dicts or lists of dicts) + or Exceptions from individual source tasks. + pipeline_config: Configuration for the pipeline affecting processing. + + Returns: + A processed Pandas DataFrame. + + Raises: + ProcessingError: If a critical processing step fails. + """ + logger.info("DUMMY Service: Starting process_pipeline_results simulation") + + # 1. 
Aggregate valid data (handle errors from tasks) + valid_data = [] + errors_encountered = 0 + for i, result in enumerate(raw_results): + if isinstance(result, Exception): + errors_encountered += 1 + logger.warning(f"Task {i} resulted in error: {result}") + # Decide if errors should halt processing or just be logged + elif isinstance(result, list): + valid_data.extend(result) + elif isinstance(result, dict) and 'error' not in result: + valid_data.append(result) + # Ignore 'error' dicts or None results silently for this dummy example + + if not valid_data and errors_encountered > 0: + raise ProcessingError("No valid data received, only errors from source tasks.") + if not valid_data: + logger.warning("No valid data found to process.") + return pd.DataFrame() # Return empty DataFrame + + logger.info(f"Aggregated {len(valid_data)} raw records. Encountered {errors_encountered} errors.") + + # 2. Convert to DataFrame (simulate potential error) + try: + df = pd.DataFrame(valid_data).fillna("") # Handle potential missing keys + logger.info(f"Created initial DataFrame with shape: {df.shape}") + except Exception as e: + logger.error(f"Error creating DataFrame: {e}", exc_info=True) + raise ProcessingError("Failed to create DataFrame from aggregated results.") from e + + # 3. Simulate Processing Steps (using dummy functions) + await asyncio.sleep(random.uniform(0.01, 0.05)) # Simulate time + df = self._dummy_normalize(df, pipeline_config) + await asyncio.sleep(random.uniform(0.01, 0.05)) + df = self._dummy_deduplicate(df, pipeline_config) + await asyncio.sleep(random.uniform(0.01, 0.05)) + df = self._dummy_transform(df, pipeline_config) + + logger.info(f"DUMMY Service: Finished processing. Final DataFrame shape: {df.shape}") + return df + + def _dummy_normalize(self, df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame: + """DUMMY: Simulate schema normalization.""" + logger.debug(f"Simulating schema normalization on shape {df.shape}") + # Rename 'api_field' if it exists + if 'api_field' in df.columns: + df = df.rename(columns={'api_field': 'normalized_field'}) + return df.copy() + + def _dummy_deduplicate(self, df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame: + """DUMMY: Simulate deduplication.""" + logger.debug(f"Simulating deduplication on shape {df.shape}") + if df.empty: + return df + # Deduplicate based on a dummy 'source_url' or 'url' if present + key = 'source_url' if 'source_url' in df.columns else ('url' if 'url' in df.columns else None) + if key: + initial_count = len(df) + df_dedup = df.drop_duplicates(subset=[key], keep='first') + logger.debug(f"Dropped {initial_count - len(df_dedup)} duplicates based on '{key}'") + return df_dedup + return df.drop_duplicates(keep='first') # Fallback + + def _dummy_transform(self, df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame: + """DUMMY: Simulate data transformation.""" + logger.debug(f"Simulating transformations on shape {df.shape}") + # Add a dummy derived column if possible + if 'value' in df.columns: + # Ensure 'value' is numeric, coercing errors to NaN, then fillna + df['value'] = pd.to_numeric(df['value'], errors='coerce').fillna(0) + df['transformed_value'] = df['value'] * random.uniform(1.1, 1.5) + df['processing_timestamp'] = datetime.utcnow().isoformat() + return df + + +# Instantiate the service for potential use elsewhere (dependency injection) +processing_service = ProcessingService() diff --git a/backend/app/workers/__init__.py b/backend/app/workers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py new file mode 100644 index 0000000..bb364a1 --- /dev/null +++ b/backend/app/workers/tasks.py @@ -0,0 +1,214 @@ +import asyncio +from datetime import datetime +import logging +import random +import time + +from celery import chord, group, shared_task + +from app import crud, models, schemas # Keep imports for structure +from app.core.db import AsyncSessionFactory # Use session factory directly in tasks +from app.services.processing_service import ProcessingService # Import dummy service + +logger = logging.getLogger(__name__) + + +# --- Helper to run async code from sync Celery tasks --- +def async_to_sync(awaitable): + """Runs an awaitable in a new event loop.""" + return asyncio.run(awaitable) + + +# --- Dummy Sub-Tasks --- +@shared_task(bind=True, max_retries=1, default_retry_delay=5) +def dummy_source_task(self, source_id: int, source_type: str): + """DUMMY: Simulates processing any data source type.""" + task_id = self.request.id + logger.info(f"DUMMY TASK dummy_source_task[ID:{task_id}]: Start DS:{source_id} Type:{source_type}") + await_time = random.uniform(0.05, 0.2) + time.sleep(await_time) # Simulate work + + # Simulate occasional failure + if random.random() < 0.08: + error_msg = f"Simulated failure processing source {source_id}" + logger.warning(f"DUMMY TASK dummy_source_task[ID:{task_id}]: {error_msg}") + raise ValueError(error_msg) # Raise exception for Celery retry/failure + + # Simulate successful result (list of dicts) + num_records = random.randint(1, 3) + result = [{f"data_{source_id}_{i}": random.random(), "source_type": source_type} for i in range(num_records)] + logger.info(f"DUMMY TASK dummy_source_task[ID:{task_id}]: Finish DS:{source_id}, generated {num_records} records.") + return result + + +# --- Dummy Aggregation Task (Callback) --- +@shared_task(bind=True) +def dummy_aggregate_task(self, results: list, pipeline_id: int, run_id: int): + """DUMMY: Simulates aggregating results and saving.""" + task_id = self.request.id + logger.info( + f"DUMMY TASK dummy_aggregate_task[ID:{task_id}]: Start Aggregation for RunID:{run_id}, PipelineID:{pipeline_id}. Received {len(results)} results." 
+ ) + log_messages = [f"Aggregation simulation started at {datetime.utcnow()}"] + final_status = schemas.PipelineStatus.COMPLETED + output_location = None + errors_encountered = sum(1 for r in results if isinstance(r, Exception)) + + # Instantiate dummy service + service = ProcessingService() + + async def process_and_save(): + nonlocal output_location, final_status # Allow modification + try: + # Call dummy processing service + processed_df = await service.process_pipeline_results(results, {"dummy_pipeline_cfg": True}) + + if not processed_df.empty: + # Simulate saving (no actual file handler needed here for dummy) + await asyncio.sleep(0.1) # Simulate save time + output_location = f"dummy_outputs/run_{run_id}_output_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}.csv" + log_messages.append(f"Simulated saving results to {output_location}, shape: {processed_df.shape}") + logger.info(f"DUMMY AGGREGATION: Simulated save complete to {output_location}") + else: + log_messages.append("No data processed after aggregation/filtering.") + # Keep COMPLETED status if no errors, otherwise FAILED was set below + + except Exception as e: + logger.error(f"DUMMY AGGREGATION: Error during dummy processing: {e}", exc_info=True) + log_messages.append(f"ERROR during processing: {e}") + final_status = schemas.PipelineStatus.FAILED + output_location = None + + if errors_encountered > 0 and final_status != schemas.PipelineStatus.FAILED: + log_messages.append("Pipeline simulation completed with source task errors.") + # Optional: Set a specific status like COMPLETED_WITH_ERRORS if needed + elif errors_encountered > 0 and not processed_df.empty: + final_status = schemas.PipelineStatus.FAILED # Fail if errors and no data + + # Simulate DB Update + final_log = "\n".join(log_messages) + logger.info(f"DUMMY AGGREGATION: Simulating final DB update for RunID:{run_id} to status {final_status}") + if AsyncSessionFactory: # Check if DB is configured + async with AsyncSessionFactory() as session: + try: + # Call dummy CRUD functions + await crud.pipeline_run.update_run_status( + db=session, + run_id=run_id, + status=final_status, + output_location=output_location, + run_log=final_log, + ) + await crud.pipeline.update( # Use generic update for status + db=session, + db_obj=models.Pipeline(id=pipeline_id), # Need a dummy obj for update + obj_in={"status": schemas.PipelineStatus.IDLE}, + ) + logger.info(f"DUMMY AGGREGATION: DB update simulation successful for RunID:{run_id}.") + except Exception as db_exc: + logger.error( + f"DUMMY AGGREGATION: Failed DB update simulation for RunID:{run_id}: {db_exc}", exc_info=True + ) + else: + logger.warning("DUMMY AGGREGATION: Skipping DB update simulation as DB is not configured.") + + async_to_sync(process_and_save()) + logger.info(f"DUMMY TASK dummy_aggregate_task[ID:{task_id}]: Finish Aggregation Simulation for RunID:{run_id}") + + +# --- Dummy Pipeline Orchestrator Task --- +@shared_task(bind=True) +def run_pipeline_task(self, pipeline_id: int): + """DUMMY: Simulates fetching pipeline details and scheduling sub-tasks.""" + task_id = self.request.id + logger.info( + f"DUMMY TASK run_pipeline_task[ID:{task_id}]: Start Orchestration Simulation for PipelineID:{pipeline_id}" + ) + run_id = None + + async def setup_and_dispatch(): + nonlocal run_id + if not AsyncSessionFactory: + logger.error("Cannot simulate pipeline run: Database not configured.") + return None, "Database not configured" + + async with AsyncSessionFactory() as session: + # 1. 
Get Pipeline (dummy) + pipeline = await crud.pipeline.get_with_details(session, id=pipeline_id) + if not pipeline: + logger.error(f"Pipeline {pipeline_id} not found (simulated).") + return None, "Pipeline not found" + if pipeline.status != schemas.PipelineStatus.IDLE: + logger.warning(f"Pipeline {pipeline_id} not idle (status: {pipeline.status}), skipping run simulation.") + return None, f"Pipeline status is {pipeline.status}" + + # 2. Create Run Record (dummy) + run = await crud.pipeline_run.create( + session, pipeline_id=pipeline_id, celery_task_id=task_id, status=schemas.PipelineStatus.RUNNING + ) + run_id = run.id + logger.info(f"Created dummy PipelineRun record with RunID:{run_id}") + + # 3. Update Pipeline Status (dummy) + await crud.pipeline.update(session, db_obj=pipeline, obj_in={"status": schemas.PipelineStatus.RUNNING}) + logger.info(f"Set dummy Pipeline {pipeline_id} status to RUNNING") + + # 4. Prepare sub-tasks (using dummy sources from get_with_details) + if not pipeline.data_sources: + logger.warning(f"No data sources found for pipeline {pipeline_id}. Finishing run.") + await crud.pipeline_run.update_run_status( + session, run_id=run_id, status=schemas.PipelineStatus.COMPLETED, run_log="No data sources found." + ) + await crud.pipeline.update(session, db_obj=pipeline, obj_in={"status": schemas.PipelineStatus.IDLE}) + return [], None # No tasks to run + + sub_tasks = [dummy_source_task.s(ds.id, ds.type.value) for ds in pipeline.data_sources] + logger.info(f"Prepared {len(sub_tasks)} dummy sub-tasks for RunID:{run_id}") + return sub_tasks, None + + async def fail_run(error_message: str): + """Helper to mark run as failed if setup simulation fails.""" + if run_id and AsyncSessionFactory: + logger.error(f"Simulating run failure for RunID:{run_id} - {error_message}") + async with AsyncSessionFactory() as session: + await crud.pipeline_run.update_run_status( + db=session, + run_id=run_id, + status=schemas.PipelineStatus.FAILED, + run_log=f"Orchestration failed: {error_message}", + ) + await crud.pipeline.update( + db=session, db_obj=models.Pipeline(id=pipeline_id), obj_in={"status": schemas.PipelineStatus.IDLE} + ) + + try: + sub_task_signatures, error = async_to_sync(setup_and_dispatch()) + + if error: + logger.error(f"Orchestration setup simulation failed: {error}") + # fail_run should have been called if run_id was set + return + + if not sub_task_signatures: + logger.info("No sub-tasks to execute.") + return # Setup marked run as completed/failed + + # Define the workflow chord + workflow = chord( + header=group(sub_task_signatures), + body=dummy_aggregate_task.s(pipeline_id=pipeline_id, run_id=run_id), # Ensure run_id is passed + ) + + # Simulate applying the workflow + logger.info( + f"DUMMY TASK run_pipeline_task[ID:{task_id}]: Simulating Celery chord apply_async() for RunID:{run_id}" + ) + # In a real test you might call workflow() directly to execute synchronously + # For this dummy structure, just log the intent. 
+ logger.info(f"DUMMY TASK run_pipeline_task[ID:{task_id}]: Workflow simulation scheduled for RunID:{run_id}") + + except Exception as exc: + logger.error( + f"DUMMY TASK run_pipeline_task[ID:{task_id}]: Orchestration Simulation FAILED: {exc}", exc_info=True + ) + async_to_sync(fail_run(f"Orchestration simulation exception: {type(exc).__name__}")) diff --git a/backend/pyproject.toml b/backend/pyproject.toml new file mode 100644 index 0000000..105bbf0 --- /dev/null +++ b/backend/pyproject.toml @@ -0,0 +1,34 @@ +# pyproject.toml +[project] +name = "backend" +version = "0.1.0" +description = "Customizable Automated Data Integration Pipeline Backend" +requires-python = ">=3.11" # Playwright and modern libraries benefit from newer Python +dependencies = [ + "fastapi", + "uvicorn[standard]", # Includes performance extras + "pydantic", + "pydantic-settings", + "sqlalchemy", # ORM + "psycopg2-binary", # Postgres driver (or asyncpg for async) + "asyncpg", # Async Postgres driver + "alembic", # Database migrations + "celery", # Background tasks + "redis", # Celery broker/backend + "playwright", # For self-hosted browser automation + "beautifulsoup4", # HTML parsing + "python-readability", # Clean HTML content extraction + "openai", # Or anthropic, google-generativeai for LLM + "pandas", # Data manipulation, file reading, export + "httpx", # Async HTTP requests (for APIs, LLM calls) + "python-multipart", # For FastAPI file uploads + "PyYAML", # For YAML export (if needed later) + "feedparser", # For parsing RSS/Atom feeds (News) + # Add other specific news API client libraries if needed +] + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.ruff]