diff --git a/pipeline/ingestion/__init__.py b/pipeline/ingestion/__init__.py
new file mode 100644
index 0000000..de20b6a
--- /dev/null
+++ b/pipeline/ingestion/__init__.py
@@ -0,0 +1,15 @@
+from .core import Ingestor
+from .ingestors import IngestionMethod, SimpleIngestionStrategy, MLIngestionStrategy
+from .adapters.api_adapter import ApiAdapter
+from .adapters.file_adapter import FileAdapter
+from .adapters.web_scraper_adapter import WebScraperAdapter
+
+__all__ = [
+    "Ingestor",
+    "ApiAdapter",
+    "FileAdapter",
+    "WebScraperAdapter",
+    "IngestionMethod",
+    "SimpleIngestionStrategy",
+    "MLIngestionStrategy",
+]
diff --git a/pipeline/ingestion/core.py b/pipeline/ingestion/core.py
new file mode 100644
index 0000000..e7cc6fd
--- /dev/null
+++ b/pipeline/ingestion/core.py
@@ -0,0 +1,24 @@
+from .ingestors import IngestionMethod, SimpleIngestionStrategy, MLIngestionStrategy
+from models.ingestion import IngestSourceConfig, OutputData
+
+
+class Ingestor:
+    """
+    Ingestor for aggregating data using different strategies.
+
+    Args:
+        sources (list[IngestSourceConfig]): List of sources to ingest.
+        strategy (str, optional): Strategy to use for ingestion ("simple" or "ml"). Defaults to "simple".
+    """
+
+    @staticmethod
+    def run(sources: list[IngestSourceConfig], strategy: str = "simple") -> OutputData:
+        strategies: dict[str, IngestionMethod] = {
+            "simple": SimpleIngestionStrategy(),
+            "ml": MLIngestionStrategy(),
+        }
+
+        if strategy not in strategies:
+            raise ValueError(f"Unsupported strategy: {strategy}")
+
+        return strategies[strategy].run(sources)
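For reviewers, a minimal usage sketch of the new facade (assuming pipeline/ is on sys.path so models.ingestion resolves; the config models appear later in this diff, and all values are illustrative):

    from ingestion import Ingestor
    from models.ingestion import ApiConfig, IngestSourceConfig, SourceType

    sources = [
        IngestSourceConfig(
            type=SourceType.API,
            config=ApiConfig(url="https://dummyjson.com/products", timeout=10),
        )
    ]

    output = Ingestor.run(sources)  # defaults to the "simple" strategy
    ml_output = Ingestor.run(sources, strategy="ml")  # placeholder strategy for now
    # Ingestor.run(sources, strategy="nope") -> ValueError: Unsupported strategy: nope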
- """ - from log.logging_utils import pipeline_log - aggregated: List[Dict[str, Any]] = [] - logger.info("Starting ingestion run for sources.") - - for src in sources: - # accept Pydantic models or raw dicts - if isinstance(src, BaseModel): - src_item = src.dict() - else: - src_item = src - src_type = src_item.get("type") - config = src_item.get("config", {}) - # convert BaseModel config to dict if needed - if not isinstance(config, dict) and hasattr(config, "dict"): - config = config.dict(exclude_unset=True) - pipeline_id = config.get("pipeline_id") or src_item.get("pipeline_id") - run_id = config.get("run_id") or src_item.get("run_id") - logger.info(f"Processing source type: {src_type} with config: {config}") - if src_type == "api": - adapter = ApiAdapter(**config) - elif src_type == "scrape": - adapter = WebScraperAdapter(**config) - elif src_type == "file": - adapter = FileAdapter(**config) - else: - logger.error(f"Unknown source type: {src_type}") - pipeline_log("ERROR", f"Unknown source type: {src_type}", pipeline_id, run_id, status="FAILED") - raise ValueError(f"Unknown source type: {src_type}") - - try: - logger.info(f"Fetching records using {src_type} adapter.") - records = adapter.fetch() - logger.info(f"Fetched {len(records)} records from {src_type} source.") - aggregated.extend(records) - pipeline_log("SUCCESS", f"Fetched {len(records)} records.", pipeline_id, run_id, status="COMPLETED") - except Exception as e: - logger.error(f"Fetch failed for source {src_type}: {e}") - pipeline_log("ERROR", f"Fetch failed: {e}", pipeline_id, run_id, status="FAILED") - raise RuntimeError(f"Fetch failed for source {src_type}: {e}") - - logger.info(f"Ingestion run completed. Total records aggregated: {len(aggregated)}") - return aggregated - - -if __name__ == "__main__": - # Example usage of the Ingestor. 
diff --git a/pipeline/ingestion/ingestors/__init__.py b/pipeline/ingestion/ingestors/__init__.py
new file mode 100644
index 0000000..2793947
--- /dev/null
+++ b/pipeline/ingestion/ingestors/__init__.py
@@ -0,0 +1,5 @@
+from .simple_ingest import SimpleIngestionStrategy
+from .mapping_ingest import MLIngestionStrategy
+from .base import IngestionMethod
+
+__all__ = ["SimpleIngestionStrategy", "MLIngestionStrategy", "IngestionMethod"]
diff --git a/pipeline/ingestion/ingestors/base.py b/pipeline/ingestion/ingestors/base.py
new file mode 100644
index 0000000..c4f5ff3
--- /dev/null
+++ b/pipeline/ingestion/ingestors/base.py
@@ -0,0 +1,8 @@
+from abc import ABC, abstractmethod
+from models.ingestion import IngestSourceConfig, OutputData
+
+
+class IngestionMethod(ABC):
+    @abstractmethod
+    def run(self, sources: list[IngestSourceConfig]) -> OutputData:
+        pass
diff --git a/pipeline/ingestion/ingestors/mapping_ingest.py b/pipeline/ingestion/ingestors/mapping_ingest.py
new file mode 100644
index 0000000..e67bac1
--- /dev/null
+++ b/pipeline/ingestion/ingestors/mapping_ingest.py
@@ -0,0 +1,12 @@
+from .base import IngestionMethod
+from models.ingestion import IngestSourceConfig, OutputData
+
+
+class MLIngestionStrategy(IngestionMethod):
+    def run(self, sources: list[IngestSourceConfig]) -> OutputData:
+        # TODO: Add ML-based logic (e.g., deduplication, entity linking, classification)
+        return OutputData(
+            records=[],  # Placeholder
+            unified=True,
+            metadata={"message": "ML strategy not implemented yet"},
+        )
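The IngestionMethod ABC keeps strategies pluggable: a new strategy only has to implement run. A hypothetical example (DryRunStrategy is not part of this diff); to reach it through the facade it would also need an entry in the hard-coded strategies dict in core.py:

    from ingestion.ingestors.base import IngestionMethod
    from models.ingestion import IngestSourceConfig, OutputData


    class DryRunStrategy(IngestionMethod):
        """Hypothetical strategy: counts sources without fetching anything."""

        def run(self, sources: list[IngestSourceConfig]) -> OutputData:
            return OutputData(
                records=[],
                unified=False,
                metadata={"source_count": len(sources), "dry_run": True},
            )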
diff --git a/pipeline/ingestion/ingestors/simple_ingest.py b/pipeline/ingestion/ingestors/simple_ingest.py
new file mode 100644
index 0000000..9e361fb
--- /dev/null
+++ b/pipeline/ingestion/ingestors/simple_ingest.py
@@ -0,0 +1,68 @@
+from ingestion.adapters.api_adapter import ApiAdapter
+from ingestion.adapters.file_adapter import FileAdapter
+from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
+from .base import IngestionMethod
+from models.ingestion import (
+    AdapterRecord,
+    IngestSourceConfig,
+    SourceType,
+    ApiConfig,
+    FileConfig,
+    ScrapeConfig,
+    OutputData,
+)
+from loguru import logger
+
+
+class SimpleIngestionStrategy(IngestionMethod):
+    def run(self, sources: list[IngestSourceConfig]) -> OutputData:
+        results: list[AdapterRecord] = []
+
+        for source in sources:
+            try:
+                match source.type:
+                    case SourceType.API:
+                        config = source.config
+                        assert isinstance(config, ApiConfig)
+                        adapter = ApiAdapter(
+                            url=config.url,
+                            headers=config.headers,
+                            timeout=config.timeout or 30,
+                            token=config.token,
+                        )
+                        records = adapter.fetch()
+
+                    case SourceType.FILE:
+                        config = source.config
+                        assert isinstance(config, FileConfig)
+                        adapter = FileAdapter(upload=config.upload)
+                        records = adapter.fetch()
+
+                    case SourceType.SCRAPE:
+                        config = source.config
+                        assert isinstance(config, ScrapeConfig)
+                        adapter = WebScraperAdapter(
+                            urls=config.urls,
+                            api_key=config.api_key,
+                            schema_file=config.schema_file,
+                            prompt=config.prompt or WebScraperAdapter.DEFAULT_PROMPT,
+                            llm_provider=config.llm_provider or "openai/gpt-4o-mini",
+                            output_format=config.output_format or "json",
+                            verbose=config.verbose or False,
+                            cache_mode=config.cache_mode or "ENABLED",
+                        )
+                        records = adapter.fetch()
+
+                    case _:  # keeps records from being unbound if SourceType grows
+                        raise ValueError(f"Unsupported source type: {source.type}")
+
+                results.extend(records)
+
+            except Exception as e:
+                logger.error(f"Failed to ingest from source {source.type}: {e}")
+
+        return OutputData(
+            records=results,
+            unified=False,
+            metadata={"source_count": len(sources), "record_count": len(results)},
+        )
diff --git a/pipeline/models/ingestion.py b/pipeline/models/ingestion.py
index 1f2c8dd..8fbf4e1 100644
--- a/pipeline/models/ingestion.py
+++ b/pipeline/models/ingestion.py
@@ -1,7 +1,12 @@
+import enum
 from typing import Any
+from fastapi import UploadFile
 from pydantic import BaseModel, Field
 
+# ------ Adapter Model ------
+
+
 class AdapterRecord(BaseModel):
     """
     Record output from each adapter.
     """
@@ -23,3 +28,60 @@ class OutputData(BaseModel):
     metadata: dict[str, Any] | None = Field(
         default=None, description="Metadata about the run"
     )
+
+
+# ------------------------------------
+
+# ------ Ingestor Model ------
+
+
+class SourceType(str, enum.Enum):
+    API = "api"
+    FILE = "file"
+    SCRAPE = "scrape"
+
+
+class ApiConfig(BaseModel):
+    url: str
+    headers: dict[str, str] | None = None
+    timeout: int | None = None
+    token: str | None = None
+
+
+class FileConfig(BaseModel):
+    upload: UploadFile
+
+
+class ScrapeConfig(BaseModel):
+    urls: list[str]
+    api_key: str
+    schema_file: str | None = None
+    prompt: str | None = None
+    llm_provider: str | None = None
+    output_format: str | None = None
+    verbose: bool | None = None
+    cache_mode: str | None = None
+
+
+class IngestSourceConfig(BaseModel):
+    """
+    Configuration for a single ingestion source, to be used by the Ingestor.
+    The 'type' field selects the adapter ('api', 'file', or 'scrape').
+    The 'config' field contains the adapter-specific configuration.
+    """
+
+    type: SourceType = Field(..., description="Source type: 'api', 'file', or 'scrape'")
+    config: ApiConfig | FileConfig | ScrapeConfig = Field(
+        ..., description="Configuration for the adapter"
+    )
+
+
+class IngestorInput(BaseModel):
+    """
+    Input for the ingestor.
+    """
+
+    sources: list[IngestSourceConfig]
+
+
+# ------------------------------------
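Since IngestorInput is a plain Pydantic model, an API layer can validate a raw payload before handing it to the ingestor. A sketch assuming Pydantic v2 (model_validate; parse_obj under v1). The config union is undiscriminated, but the three config models have disjoint required fields (url vs. upload vs. urls plus api_key), so Pydantic can resolve it from the payload shape; note that file sources would arrive as multipart uploads (UploadFile) rather than JSON:

    from ingestion import Ingestor
    from models.ingestion import IngestorInput

    payload = {
        "sources": [
            {"type": "api", "config": {"url": "https://dummyjson.com/products"}},
        ]
    }

    parsed = IngestorInput.model_validate(payload)  # Pydantic v2; parse_obj() in v1
    output = Ingestor.run(parsed.sources)
    print(output.metadata)  # e.g. {"source_count": 1, "record_count": ...}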