refactor: correct ingestor with new adapters - define module export list

Sosokker 2025-05-12 17:05:13 +07:00
parent eae46ac820
commit ff9de9f80a
8 changed files with 191 additions and 112 deletions

View File

@@ -0,0 +1,15 @@
from .core import Ingestor
from .ingestors import IngestionMethod, SimpleIngestionStrategy, MLIngestionStrategy
from .adapters.api_adapter import ApiAdapter
from .adapters.file_adapter import FileAdapter
from .adapters.web_scraper_adapter import WebScraperAdapter

__all__ = [
"Ingestor",
"ApiAdapter",
"FileAdapter",
"WebScraperAdapter",
"IngestionMethod",
"SimpleIngestionStrategy",
"MLIngestionStrategy",
]
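
For reference, a minimal consumption sketch (assuming the top-level package is importable as ingestion, which matches the absolute imports used elsewhere in this commit):

from ingestion import Ingestor, SimpleIngestionStrategy, MLIngestionStrategy

# Callers can now import the public names from the package root instead of
# reaching into submodules such as ingestion.core or ingestion.ingestors.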

View File

@@ -0,0 +1,24 @@
from .ingestors import IngestionMethod, SimpleIngestionStrategy, MLIngestionStrategy
from models.ingestion import IngestSourceConfig, OutputData


class Ingestor:
    """
    Ingestor for aggregating data using different strategies.

    Args:
        sources (list[IngestSourceConfig]): List of sources to ingest.
        strategy (str, optional): Strategy to use for ingestion ("simple" or "ml"). Defaults to "simple".
    """

    @staticmethod
    def run(sources: list[IngestSourceConfig], strategy: str = "simple") -> OutputData:
        strategies: dict[str, IngestionMethod] = {
            "simple": SimpleIngestionStrategy(),
            "ml": MLIngestionStrategy(),
        }

        if strategy not in strategies:
            raise ValueError(f"Unsupported strategy: {strategy}")

        return strategies[strategy].run(sources)
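
A usage sketch for the new entry point; the API source below is illustrative:

from ingestion import Ingestor
from models.ingestion import ApiConfig, IngestSourceConfig, SourceType

sources = [
    IngestSourceConfig(
        type=SourceType.API,
        config=ApiConfig(url="https://dummyjson.com/products"),
    )
]

output = Ingestor.run(sources)                    # defaults to "simple"
output_ml = Ingestor.run(sources, strategy="ml")  # placeholder ML strategy
# Ingestor.run(sources, strategy="bogus") raises ValueError("Unsupported strategy: bogus")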

View File

@@ -1,112 +0,0 @@
"""
Ingestor module to orchestrate data ingestion from multiple adapters.
"""
from typing import List, Dict, Any
from ingestion.adapters.api_adapter import ApiAdapter
from ingestion.adapters.file_adapter import FileAdapter
from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
from pydantic import BaseModel
from loguru import logger


class Ingestor:
"""
Ingestor for aggregating data from various sources.
"""
@staticmethod
def run(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Run ingestion for a list of sources.
Args:
sources: List of dicts, each with:
- type: 'api', 'scrape', or 'file'
- config: kwargs for the adapter constructor
Returns:
Flat list of all records fetched from sources.
Raises:
ValueError: For unknown source types.
RuntimeError: If an adapter fails during fetch.
"""
from log.logging_utils import pipeline_log
aggregated: List[Dict[str, Any]] = []
logger.info("Starting ingestion run for sources.")
for src in sources:
# accept Pydantic models or raw dicts
if isinstance(src, BaseModel):
src_item = src.dict()
else:
src_item = src
src_type = src_item.get("type")
config = src_item.get("config", {})
# convert BaseModel config to dict if needed
if not isinstance(config, dict) and hasattr(config, "dict"):
config = config.dict(exclude_unset=True)
pipeline_id = config.get("pipeline_id") or src_item.get("pipeline_id")
run_id = config.get("run_id") or src_item.get("run_id")
logger.info(f"Processing source type: {src_type} with config: {config}")
if src_type == "api":
adapter = ApiAdapter(**config)
elif src_type == "scrape":
adapter = WebScraperAdapter(**config)
elif src_type == "file":
adapter = FileAdapter(**config)
else:
logger.error(f"Unknown source type: {src_type}")
pipeline_log("ERROR", f"Unknown source type: {src_type}", pipeline_id, run_id, status="FAILED")
raise ValueError(f"Unknown source type: {src_type}")
try:
logger.info(f"Fetching records using {src_type} adapter.")
records = adapter.fetch()
logger.info(f"Fetched {len(records)} records from {src_type} source.")
aggregated.extend(records)
pipeline_log("SUCCESS", f"Fetched {len(records)} records.", pipeline_id, run_id, status="COMPLETED")
except Exception as e:
logger.error(f"Fetch failed for source {src_type}: {e}")
pipeline_log("ERROR", f"Fetch failed: {e}", pipeline_id, run_id, status="FAILED")
raise RuntimeError(f"Fetch failed for source {src_type}: {e}")
logger.info(f"Ingestion run completed. Total records aggregated: {len(aggregated)}")
return aggregated


if __name__ == "__main__":
# Example usage of the Ingestor.
example_sources = [
{
"type": "api",
"config": {
"url": "https://dummyjson.com/products",
"headers": {"Accept": "application/json"},
},
},
{
"type": "file",
"config": {"path": "data/sample.json"},
},
{
"type": "scrape",
"config": {
"urls": ["https://www.hipflat.co.th/en"],
"schema_file": None,
"prompt": "Extract all listings",
"llm_provider": "gemini/gemini-2.0-flash",
"api_key": "AIzaSyAGnER5on8a0bVXU7quXFMnNyOvCiC_ees",
"output_format": "json",
"verbose": False,
"cache_mode": "ENABLED",
},
},
]
records = Ingestor.run(example_sources)
print(f"Total records ingested: {len(records)}")
for record in records:
print(record)

View File

@@ -0,0 +1,5 @@
from .simple_ingest import SimpleIngestionStrategy
from .mapping_ingest import MLIngestionStrategy
from .base import IngestionMethod

__all__ = ["SimpleIngestionStrategy", "MLIngestionStrategy", "IngestionMethod"]

View File

@@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
from models.ingestion import IngestSourceConfig, OutputData


class IngestionMethod(ABC):
@abstractmethod
def run(self, sources: list[IngestSourceConfig]) -> OutputData:
pass
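
Since every strategy implements this interface, adding one means subclassing IngestionMethod and wiring the instance into the strategies dict in Ingestor.run. A minimal sketch; NoOpIngestionStrategy and the import path are hypothetical:

from ingestion.ingestors.base import IngestionMethod  # assumed module path
from models.ingestion import IngestSourceConfig, OutputData


class NoOpIngestionStrategy(IngestionMethod):
    """Hypothetical stub strategy for tests: ignores all sources."""

    def run(self, sources: list[IngestSourceConfig]) -> OutputData:
        # Return an empty but well-formed payload.
        return OutputData(
            records=[],
            unified=False,
            metadata={"source_count": len(sources), "record_count": 0},
        )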

View File

@@ -0,0 +1,12 @@
from .base import IngestionMethod
from models.ingestion import IngestSourceConfig, OutputData


class MLIngestionStrategy(IngestionMethod):
def run(self, sources: list[IngestSourceConfig]) -> OutputData:
# TODO: Add ML-based logic (e.g., deduplication, entity linking, classification)
return OutputData(
records=[], # Placeholder
unified=True,
metadata={"message": "ML strategy not implemented yet"},
)
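
Even as a stub, the strategy honors the OutputData contract, so callers can already select "ml" without special-casing. A quick check (the import path is assumed):

from ingestion.ingestors import MLIngestionStrategy

result = MLIngestionStrategy().run([])
assert result.records == []
assert result.metadata == {"message": "ML strategy not implemented yet"}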

View File

@@ -0,0 +1,65 @@
from ingestion.adapters.api_adapter import ApiAdapter
from ingestion.adapters.file_adapter import FileAdapter
from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
from .base import IngestionMethod
from models.ingestion import (
AdapterRecord,
IngestSourceConfig,
SourceType,
ApiConfig,
FileConfig,
ScrapeConfig,
OutputData,
)
from loguru import logger


class SimpleIngestionStrategy(IngestionMethod):
def run(self, sources: list[IngestSourceConfig]) -> OutputData:
results: list[AdapterRecord] = []
for source in sources:
try:
match source.type:
case SourceType.API:
config = source.config
assert isinstance(config, ApiConfig)
adapter = ApiAdapter(
url=config.url,
headers=config.headers,
timeout=config.timeout or 30,
token=config.token,
)
records = adapter.fetch()
case SourceType.FILE:
config = source.config
assert isinstance(config, FileConfig)
adapter = FileAdapter(upload=config.upload)
records = adapter.fetch()
case SourceType.SCRAPE:
config = source.config
assert isinstance(config, ScrapeConfig)
adapter = WebScraperAdapter(
urls=config.urls,
api_key=config.api_key,
schema_file=config.schema_file,
prompt=config.prompt or WebScraperAdapter.DEFAULT_PROMPT,
llm_provider=config.llm_provider or "openai/gpt-4o-mini",
output_format=config.output_format or "json",
verbose=config.verbose or False,
cache_mode=config.cache_mode or "ENABLED",
)
records = adapter.fetch()
results.extend(records)
            except Exception as e:
                # Log and move on: one failing source does not abort the run.
                logger.error(f"Failed to ingest from source {source.type}: {e}")
return OutputData(
records=results,
unified=False,
metadata={"source_count": len(sources), "record_count": len(results)},
)
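
Note the error handling: where the deleted Ingestor raised RuntimeError on the first failed fetch, this strategy logs the failure and continues with the remaining sources. A sketch of that behavior; the URLs and key placeholder are illustrative:

from ingestion.ingestors import SimpleIngestionStrategy
from models.ingestion import ApiConfig, IngestSourceConfig, ScrapeConfig, SourceType

sources = [
    IngestSourceConfig(
        type=SourceType.API,
        config=ApiConfig(url="https://dummyjson.com/products"),
    ),
    IngestSourceConfig(
        type=SourceType.SCRAPE,
        config=ScrapeConfig(urls=["https://example.com"], api_key="<SCRAPER_API_KEY>"),
    ),
]

output = SimpleIngestionStrategy().run(sources)
# If the scrape source fails, its error is logged and skipped; the API
# records are still returned, and metadata reports source_count=2.
print(output.metadata)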

View File

@@ -1,7 +1,12 @@
import enum
from typing import Any

from fastapi import UploadFile
from pydantic import BaseModel, Field


# ------ Adapter Model ------
class AdapterRecord(BaseModel):
"""
Record output from each adapter.
@@ -23,3 +28,60 @@ class OutputData(BaseModel):
metadata: dict[str, Any] | None = Field(
default=None, description="Metadata about the run"
)
# ------------------------------------


# ------ Ingestor Model ------
class SourceType(str, enum.Enum):
    API = "api"
    FILE = "file"
    SCRAPE = "scrape"


class ApiConfig(BaseModel):
    url: str
    headers: dict[str, str] | None = None
    timeout: int | None = None
    token: str | None = None


class FileConfig(BaseModel):
    upload: UploadFile


class ScrapeConfig(BaseModel):
    urls: list[str]
    api_key: str
    schema_file: str | None = None
    prompt: str | None = None
    llm_provider: str | None = None
    output_format: str | None = None
    verbose: bool | None = None
    cache_mode: str | None = None


class IngestSourceConfig(BaseModel):
    """
    Configuration for a single ingestion source, to be used by the Ingestor.

    The 'type' field selects the adapter ('api', 'file', or 'scrape').
    The 'config' field contains the adapter-specific configuration.
    """

    type: SourceType = Field(..., description="Source type: 'api', 'file', or 'scrape'")
    config: ApiConfig | FileConfig | ScrapeConfig = Field(
        ..., description="Configuration for the adapter"
    )


class IngestorInput(BaseModel):
    """
    Input for the ingestor.
    """

    sources: list[IngestSourceConfig]


# ------------------------------------
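
A sketch of a full request payload built from these models (values illustrative). Note that config is a plain union: pydantic validates it against the member models, and nothing ties it to type until the strategy's isinstance checks run:

from models.ingestion import (
    ApiConfig,
    IngestSourceConfig,
    IngestorInput,
    ScrapeConfig,
    SourceType,
)

payload = IngestorInput(
    sources=[
        IngestSourceConfig(
            type=SourceType.API,
            config=ApiConfig(url="https://dummyjson.com/products", timeout=10),
        ),
        IngestSourceConfig(
            type=SourceType.SCRAPE,
            config=ScrapeConfig(urls=["https://example.com"], api_key="<SCRAPER_API_KEY>"),
        ),
    ]
)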