initial commit

Sosokker 2025-04-20 19:46:54 +07:00
commit 10856f6cdf
20 changed files with 3105 additions and 0 deletions

14
.gitignore vendored Normal file

@ -0,0 +1,14 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.env
/ingestion/data

1
.python-version Normal file

@ -0,0 +1 @@
3.12

0
README.md Normal file

85
config.py Normal file

@ -0,0 +1,85 @@
# config.py
import os
from dotenv import load_dotenv
from pathlib import Path
# Load environment variables from .env file located in the script's directory
# Make sure .env is in the *same directory* as config.py
dotenv_path = Path(__file__).parent / '.env'
if dotenv_path.is_file():
load_dotenv(dotenv_path=dotenv_path)
else:
print(f"Warning: .env file not found at {dotenv_path}")
# --- Default Settings ---
DEFAULT_OUTPUT_FILE = "extracted_data.json"
DEFAULT_OUTPUT_FORMAT = "json" # csv, json, sqlite
DEFAULT_CACHE_MODE = "ENABLED" # ENABLED, BYPASS, DISABLED, READ_ONLY, WRITE_ONLY
DEFAULT_VERBOSE = False
DEFAULT_LLM_PROVIDER = "openai/gpt-4o-mini" # Default LLM
# --- LLM Provider Configuration ---
PROVIDER_ENV_MAP = {
"openai": "OPENAI_API_KEY",
"gemini": "GEMINI_API_KEY",
"groq": "GROQ_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"ollama": None, # Ollama typically doesn't require an API key
# Add other providers and their corresponding env variable names here
}
def get_api_key_env_name(provider: str) -> str | None:
"""Gets the expected environment variable name for the given provider."""
provider_prefix = provider.split('/')[0].lower()
return PROVIDER_ENV_MAP.get(provider_prefix)
def get_api_key(provider: str, direct_key: str | None = None, env_var_name: str | None = None) -> str | None:
"""
Retrieves the API key for a given provider.
Priority: direct_key > env_var_name > default env var from PROVIDER_ENV_MAP.
"""
if direct_key:
print(f"INFO: Using direct API key provided via --api-key for provider '{provider}'.")
return direct_key
if env_var_name:
key = os.getenv(env_var_name)
if key:
print(
f"INFO: Using API key from specified environment variable '{env_var_name}' for provider '{provider}'."
)
return key
else:
print(f"Warning: Specified environment variable '{env_var_name}' not found.")
default_env_name = get_api_key_env_name(provider)
if default_env_name:
key = os.getenv(default_env_name)
if key:
print(
f"INFO: Using API key from default environment variable '{default_env_name}' for provider '{provider}'."
)
return key
        else:
            print(
                f"Warning: Default environment variable '{default_env_name}' for provider '{provider}' not found."
            )
            return None
# If provider is not in map and no key was provided
# Allow providers like 'ollama' to proceed without a key
if provider.split('/')[0].lower() != "ollama":
print(f"Warning: No API key found or specified for provider '{provider}'. LLM features might fail.")
return None
# --- Exportable Configuration Variables ---
LLM_PROVIDER = os.getenv("DEFAULT_LLM_PROVIDER", DEFAULT_LLM_PROVIDER)
OUTPUT_FILE = os.getenv("DEFAULT_OUTPUT_FILE", DEFAULT_OUTPUT_FILE)
OUTPUT_FORMAT = os.getenv("DEFAULT_OUTPUT_FORMAT", DEFAULT_OUTPUT_FORMAT)
CACHE_MODE = os.getenv("DEFAULT_CACHE_MODE", DEFAULT_CACHE_MODE)
VERBOSE = os.getenv("DEFAULT_VERBOSE", str(DEFAULT_VERBOSE)).lower() in ('true', '1', 't')
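A quick sketch of how the resolution order in get_api_key plays out (not part of the commit; the key value and the MY_CUSTOM_OPENAI_KEY name are made up for illustration, and a .env next to config.py defining OPENAI_API_KEY is assumed):

from config import get_api_key

# 1. A key passed directly (e.g. via --api-key) always wins.
assert get_api_key("openai/gpt-4o-mini", direct_key="sk-example") == "sk-example"

# 2. Otherwise an explicitly named env var is tried, then the provider's
#    default from PROVIDER_ENV_MAP (OPENAI_API_KEY for "openai/...").
key = get_api_key("openai/gpt-4o-mini", env_var_name="MY_CUSTOM_OPENAI_KEY")
print("resolved a key:", key is not None)

# 3. Ollama has no default env var and needs no key, so None is expected.
assert get_api_key("ollama/llama3") is None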

3
ingestion/adapters/__init__.py Normal file

@ -0,0 +1,3 @@
"""
Adapters package for the ingestion layer.
"""

81
ingestion/adapters/api_adapter.py Normal file

@ -0,0 +1,81 @@
"""
API adapter to fetch JSON data from HTTP endpoints.
"""
from typing import List, Dict, Any, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import DataSourceAdapter
class ApiAdapter(DataSourceAdapter):
"""
Adapter for fetching data from a REST API endpoint.
"""
def __init__(
self,
url: str,
headers: Optional[Dict[str, str]] = None,
timeout: float = 30
):
"""
Initialize the API adapter.
Args:
url: Endpoint URL to fetch.
headers: Optional HTTP headers.
timeout: Timeout in seconds for the request.
"""
self.url = url
self.headers = headers or {}
self.timeout = timeout
self.session = self._init_session()
def _init_session(self) -> requests.Session:
"""
Initialize a requests.Session with retry logic.
"""
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def fetch(self) -> List[Dict[str, Any]]:
"""
Perform a GET request and return JSON data as a list of records.
Returns:
List of dicts from the JSON response.
Raises:
RuntimeError: On network error, HTTP error, or JSON parse error.
"""
try:
response = self.session.get(
self.url, headers=self.headers, timeout=self.timeout
)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise RuntimeError(f"API request failed: {e}")
try:
data = response.json()
except ValueError as e:
raise RuntimeError(f"Failed to parse JSON response: {e}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
raise RuntimeError("Unexpected JSON structure: expected list or dict.")
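A usage sketch (not part of the commit): the adapter can be pointed at the same dummyjson endpoint used in the ingestor example further down; the retrying session and JSON handling are internal to fetch().

from ingestion.adapters.api_adapter import ApiAdapter

adapter = ApiAdapter(
    url="https://dummyjson.com/products",
    headers={"Accept": "application/json"},
    timeout=10,
)
records = adapter.fetch()  # a list response stays a list; a single JSON object becomes [obj]
print(f"fetched {len(records)} record(s)")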

20
ingestion/adapters/base.py Normal file

@ -0,0 +1,20 @@
"""
Define the DataSourceAdapter protocol for ingestion adapters.
"""
from typing import Protocol, List, Dict, Any
class DataSourceAdapter(Protocol):
"""
Protocol for data source adapters.
"""
def fetch(self) -> List[Dict[str, Any]]:
"""
Fetch data from the source.
Returns:
A list of records, each represented as a dict.
"""
...
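Because DataSourceAdapter is a Protocol, conformance is structural: any class with a matching fetch() works, which is handy for tests. A minimal sketch (StaticAdapter is a made-up name, not part of the commit):

from typing import Any, Dict, List

from ingestion.adapters.base import DataSourceAdapter


class StaticAdapter:
    """Serves a fixed list of records; useful as a stand-in during tests."""

    def __init__(self, records: List[Dict[str, Any]]):
        self._records = records

    def fetch(self) -> List[Dict[str, Any]]:
        return list(self._records)


source: DataSourceAdapter = StaticAdapter([{"id": 1, "title": "demo"}])
assert source.fetch() == [{"id": 1, "title": "demo"}]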

61
ingestion/adapters/file_adapter.py Normal file

@ -0,0 +1,61 @@
"""
File adapter to load data from CSV or JSON files.
"""
from typing import List, Dict, Any
import json
import pandas as pd
from .base import DataSourceAdapter
class FileAdapter(DataSourceAdapter):
"""
Adapter for reading data from local files (CSV or JSON).
"""
def __init__(self, path: str):
"""
Initialize the file adapter.
Args:
path: Path to the input file (.csv or .json).
"""
self.path = path
def fetch(self) -> List[Dict[str, Any]]:
"""
Read and parse the file, returning a list of records.
Returns:
List of dicts from the file contents.
Raises:
RuntimeError: On read or parse errors.
ValueError: If file extension is unsupported.
"""
p = self.path.lower()
if p.endswith(".csv"):
try:
df = pd.read_csv(self.path)
return df.to_dict(orient="records")
except Exception as e:
raise RuntimeError(f"Failed to read CSV '{self.path}': {e}")
if p.endswith(".json"):
try:
with open(self.path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
raise RuntimeError(
f"JSON file '{self.path}' does not contain a list or dict."
)
except Exception as e:
raise RuntimeError(f"Failed to read JSON '{self.path}': {e}")
raise ValueError(
f"Unsupported file extension for '{self.path}'. "
"Only .csv and .json are supported."
)
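Usage sketch (not part of the commit); data/sample.json is the path reused in the ingestor example below, and data/products.csv is an illustrative name:

from ingestion.adapters.file_adapter import FileAdapter

json_records = FileAdapter("data/sample.json").fetch()   # JSON object or array -> list of dicts
csv_records = FileAdapter("data/products.csv").fetch()   # CSV rows -> list of dicts via pandas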

154
ingestion/adapters/web_scraper_adapter.py Normal file

@ -0,0 +1,154 @@
"""
Web scraper adapter using crawl4ai to extract structured data.
"""
import asyncio
import json
from typing import List, Dict, Any, Optional
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
LLMConfig,
CrawlResult,
)
from crawl4ai.extraction_strategy import (
JsonCssExtractionStrategy,
LLMExtractionStrategy,
ExtractionStrategy,
)
from .base import DataSourceAdapter
class WebScraperAdapter(DataSourceAdapter):
"""
Adapter for web scraping using crawl4ai.
"""
def __init__(
self,
urls: List[str],
schema_file: Optional[str] = None,
prompt: Optional[str] = None,
llm_provider: str = "openai/gpt-4",
api_key: Optional[str] = None,
output_format: str = "json",
verbose: bool = False,
cache_mode: str = "ENABLED",
):
"""
Initialize the scraper adapter.
Args:
urls: List of URLs to scrape.
schema_file: Path to a JSON file with CSS extraction schema.
prompt: Prompt for LLM-based extraction.
llm_provider: LLM provider identifier.
api_key: API key for the LLM provider.
output_format: Desired format for the extracted data.
verbose: Enable verbose logging.
cache_mode: Crawl cache mode (e.g., 'ENABLED').
"""
self.urls = urls
self.schema_file = schema_file
self.prompt = prompt
self.llm_provider = llm_provider
self.api_key = api_key
self.output_format = output_format
self.verbose = verbose
self.cache_mode = cache_mode
def fetch(self) -> List[Dict[str, Any]]:
"""
Synchronously fetch data by running the async crawler.
Returns:
List of extracted records.
Raises:
RuntimeError: On failure during crawling or extraction.
"""
try:
return asyncio.run(self._fetch_async())
except Exception as e:
raise RuntimeError(f"Web scraping failed: {e}")
async def _fetch_async(self) -> List[Dict[str, Any]]:
"""
Internal async method to perform crawling and extraction.
"""
# Initialize crawler
browser_cfg = BrowserConfig(headless=True, verbose=self.verbose)
crawler = AsyncWebCrawler(config=browser_cfg)
await crawler.start()
# Prepare extraction strategy
llm_cfg = LLMConfig(provider=self.llm_provider, api_token=self.api_key)
extraction_strategy: Optional[ExtractionStrategy] = None
if self.schema_file:
try:
with open(self.schema_file, "r", encoding="utf-8") as f:
schema = json.load(f)
extraction_strategy = JsonCssExtractionStrategy(
schema=schema, verbose=self.verbose
)
except Exception as e:
await crawler.close()
raise RuntimeError(
f"Failed to load schema file '{self.schema_file}': {e}"
)
elif self.prompt:
extraction_strategy = LLMExtractionStrategy(
llm_config=llm_cfg,
instruction=self.prompt,
extraction_type="schema",
apply_chunking=True,
verbose=self.verbose,
)
else:
await crawler.close()
raise ValueError("Either 'schema_file' or 'prompt' must be provided.")
# Configure cache mode
try:
cache_enum = getattr(CacheMode, self.cache_mode.upper())
except AttributeError:
cache_enum = CacheMode.ENABLED
run_cfg = CrawlerRunConfig(
cache_mode=cache_enum,
extraction_strategy=extraction_strategy,
verbose=self.verbose,
)
# Execute crawl
try:
results: List[CrawlResult] = await crawler.arun_many(
urls=self.urls, config=run_cfg
)
finally:
await crawler.close()
# Process crawl results
records: List[Dict[str, Any]] = []
for res in results:
if not res.success or not res.extracted_content:
continue
try:
content = json.loads(res.extracted_content)
except Exception:
continue
            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict):
                        item["source_url"] = res.url
                        records.append(item)  # keep only dict records, matching the declared return type
elif isinstance(content, dict):
content["source_url"] = res.url
records.append(content)
return records
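A usage sketch for the prompt-based (LLM) extraction path, not part of the commit; the URL and prompt are illustrative, and the key is read from the GEMINI_API_KEY variable that config.py already maps for the gemini provider:

import os

from ingestion.adapters.web_scraper_adapter import WebScraperAdapter

scraper = WebScraperAdapter(
    urls=["https://example.com/listings"],
    prompt="Extract every listing with its title and price",
    llm_provider="gemini/gemini-2.0-flash",
    api_key=os.getenv("GEMINI_API_KEY"),
    cache_mode="BYPASS",  # skip the cache for this run; unknown names fall back to ENABLED
    verbose=True,
)
records = scraper.fetch()  # each extracted dict gains a 'source_url' field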

91
ingestion/ingestor.py Normal file

@ -0,0 +1,91 @@
"""
Ingestor module to orchestrate data ingestion from multiple adapters.
"""
import os
from typing import List, Dict, Any
from ingestion.adapters.api_adapter import ApiAdapter
from ingestion.adapters.file_adapter import FileAdapter
from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
class Ingestor:
"""
Ingestor for aggregating data from various sources.
"""
@staticmethod
def run(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Run ingestion for a list of sources.
Args:
sources: List of dicts, each with:
- type: 'api', 'scrape', or 'file'
- config: kwargs for the adapter constructor
Returns:
Flat list of all records fetched from sources.
Raises:
ValueError: For unknown source types.
RuntimeError: If an adapter fails during fetch.
"""
aggregated: List[Dict[str, Any]] = []
for src in sources:
src_type = src.get("type")
config = src.get("config", {})
if src_type == "api":
adapter = ApiAdapter(**config)
elif src_type == "scrape":
adapter = WebScraperAdapter(**config)
elif src_type == "file":
adapter = FileAdapter(**config)
else:
raise ValueError(f"Unknown source type: {src_type}")
try:
data = adapter.fetch()
aggregated.extend(data)
except Exception as e:
raise RuntimeError(
f"Ingestion failed for source '{src_type}' with config {config}: {e}"
)
return aggregated
if __name__ == "__main__":
# Example usage of the Ingestor.
example_sources = [
{
"type": "api",
"config": {
"url": "https://dummyjson.com/products",
"headers": {"Accept": "application/json"},
},
},
{
"type": "file",
"config": {"path": "data/sample.json"},
},
{
"type": "scrape",
"config": {
"urls": ["https://www.hipflat.co.th/en"],
"schema_file": None,
"prompt": "Extract all listings",
"llm_provider": "gemini/gemini-2.0-flash",
"api_key": "AIzaSyAGnER5on8a0bVXU7quXFMnNyOvCiC_ees",
"output_format": "json",
"verbose": False,
"cache_mode": "ENABLED",
},
},
]
records = Ingestor.run(example_sources)
print(f"Total records ingested: {len(records)}")
for record in records:
print(record)

140
main.py Normal file

@ -0,0 +1,140 @@
"""
FastAPI service for managing and running data integration pipelines.
"""
from typing import List, Dict, Any
from uuid import UUID
from fastapi import FastAPI, HTTPException, BackgroundTasks
import models
import stores
import services
app = FastAPI(title="Data Integration Pipeline API")
@app.post(
"/pipelines",
response_model=models.Pipeline,
status_code=201,
summary="Create a new pipeline"
)
def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
"""
Register a new pipeline with sources configuration.
"""
return stores.create_pipeline(pipeline_in)
@app.get(
"/pipelines",
response_model=List[models.Pipeline],
summary="List all pipelines"
)
def list_pipelines() -> List[models.Pipeline]:
"""
Retrieve all registered pipelines.
"""
return stores.list_pipelines()
@app.get(
"/pipelines/{pipeline_id}",
response_model=models.Pipeline,
summary="Get a pipeline by ID"
)
def get_pipeline(pipeline_id: UUID) -> models.Pipeline:
"""
Fetch details of a specific pipeline.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
return pipeline
@app.post(
"/pipelines/{pipeline_id}/run",
response_model=models.Run,
status_code=201,
summary="Trigger a pipeline run"
)
def run_pipeline(
pipeline_id: UUID,
background_tasks: BackgroundTasks
) -> models.Run:
"""
Start a new run for the given pipeline. Runs asynchronously.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.create_run(pipeline_id)
background_tasks.add_task(services.execute_pipeline, pipeline, run.id)
return run
@app.get(
"/pipelines/{pipeline_id}/runs",
response_model=List[models.Run],
summary="List runs for a pipeline"
)
def list_runs(pipeline_id: UUID) -> List[models.Run]:
"""
List all runs associated with a pipeline.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
runs = stores.list_runs_for_pipeline(pipeline_id)
# Return only the Run fields (omit results/error)
return [models.Run(**r.dict()) for r in runs]
@app.get(
"/pipelines/{pipeline_id}/runs/{run_id}",
response_model=models.Run,
summary="Get run status"
)
def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run:
"""
Retrieve the status of a specific run.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.get_run(run_id)
if not run or run.pipeline_id != pipeline_id:
raise HTTPException(status_code=404, detail="Run not found")
return models.Run(**run.dict())
@app.get(
"/pipelines/{pipeline_id}/runs/{run_id}/results",
response_model=List[Dict[str, Any]],
summary="Get run results"
)
def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]:
"""
Retrieve normalized results of a completed run.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.get_run(run_id)
if not run or run.pipeline_id != pipeline_id:
raise HTTPException(status_code=404, detail="Run not found")
if run.status != 'COMPLETED':
raise HTTPException(
status_code=409,
detail="Run not completed or has failed"
)
return run.results or []
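A client-side sketch using FastAPI's TestClient (not part of the commit; payload values are illustrative). With TestClient the background task runs inside the request, so the run usually reaches a terminal state by the time it is polled:

from fastapi.testclient import TestClient

from main import app

client = TestClient(app)

pipeline = client.post("/pipelines", json={
    "name": "demo pipeline",
    "sources": [
        {"type": "api", "config": {"url": "https://dummyjson.com/products"}},
        {"type": "file", "config": {"path": "data/sample.json", "format": "json"}},
    ],
}).json()

run = client.post(f"/pipelines/{pipeline['id']}/run").json()
status = client.get(f"/pipelines/{pipeline['id']}/runs/{run['id']}").json()
print(status["status"])  # PENDING / RUNNING / COMPLETED / FAILED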

167
models.py Normal file

@ -0,0 +1,167 @@
"""
Pydantic models for pipelines and runs.
"""
from typing import List, Union, Annotated, Optional, Literal, Dict, Any
from uuid import UUID
from datetime import datetime
from pydantic import BaseModel, Field, HttpUrl, model_validator
class RunCreate(BaseModel):
"""
Model for creating a new run. (Empty)
"""
pass
class Run(BaseModel):
"""
Status of a pipeline run.
"""
id: UUID
pipeline_id: UUID
status: Literal['PENDING', 'RUNNING', 'COMPLETED', 'FAILED']
started_at: datetime
finished_at: Optional[datetime] = None
class RunResult(Run):
"""
Extended run model including results or error.
"""
results: Optional[List[Dict[str, Any]]] = None
error: Optional[str] = None
class ApiConfig(BaseModel):
"""
Configuration for an API source.
"""
url: HttpUrl = Field(
...,
description="API endpoint URL",
example="https://api.example.com/data"
)
token: Optional[str] = Field(
None,
description="Optional bearer token for API authentication",
example="abcdef123456"
)
class ScrapeConfig(BaseModel):
"""
Configuration for a web-scraping source.
"""
urls: List[HttpUrl] = Field(
...,
description="List of URLs to scrape",
example=["https://example.com/page1", "https://example.com/page2"]
)
schema_file: Optional[str] = Field(
None,
description="Path to a JSON file containing CSS extraction schema",
example="schemas/page_schema.json"
)
prompt: Optional[str] = Field(
None,
description="Prompt string for LLM-based extraction",
example="Extract product titles and prices"
)
class FileConfig(BaseModel):
"""
Configuration for a file-based source.
"""
path: str = Field(
...,
description="Path to the input file",
example="/data/myfile.json"
)
format: Literal["csv", "json", "sqlite"] = Field(
"json",
description="Format of the file",
example="csv"
)
@field_validator("path")
def path_extension_matches_format(cls, v: str, values):
fmt = values.get("format")
if fmt and not v.lower().endswith(f".{fmt}"):
raise ValueError(f"File extension must match format '{fmt}'")
return v
class ApiSource(BaseModel):
"""
An API-based data source.
"""
type: Literal["api"] = Field(
"api", description="Discriminator for API source" # Removed const=True
)
config: ApiConfig
class ScrapeSource(BaseModel):
"""
A web-scraping data source.
"""
type: Literal["scrape"] = Field(
"scrape", description="Discriminator for scrape source" # Removed const=True
)
config: ScrapeConfig
class FileSource(BaseModel):
"""
A file-based data source.
"""
type: Literal["file"] = Field(
"file", description="Discriminator for file source" # Removed const=True
)
config: FileConfig
Source = Annotated[
Union[ApiSource, ScrapeSource, FileSource],
Field(discriminator="type", description="Union of all source types")
]
class PipelineCreate(BaseModel):
"""
Payload for creating a new pipeline.
"""
name: Optional[str] = Field(
None,
description="Optional human-readable name for the pipeline",
example="My Data Pipeline"
)
sources: List[Source] = Field(
...,
description="List of data sources for this pipeline"
)
class Pipeline(BaseModel):
"""
Representation of a pipeline.
"""
id: UUID = Field(
...,
description="Unique identifier for the pipeline"
)
name: Optional[str] = Field(
None,
description="Optional human-readable name for the pipeline"
)
sources: List[Source] = Field(
...,
description="List of configured data sources"
)
created_at: datetime = Field(
...,
description="UTC timestamp when the pipeline was created"
)
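A sketch of a payload that validates against PipelineCreate (not part of the commit; the values reuse the field examples above). The 'type' discriminator selects the Source variant:

import models

payload = {
    "name": "My Data Pipeline",
    "sources": [
        {"type": "api", "config": {"url": "https://api.example.com/data"}},
        {"type": "scrape", "config": {
            "urls": ["https://example.com/page1", "https://example.com/page2"],
            "prompt": "Extract product titles and prices",
        }},
        {"type": "file", "config": {"path": "/data/myfile.json", "format": "json"}},
    ],
}
pipeline_in = models.PipelineCreate.model_validate(payload)
print([type(s).__name__ for s in pipeline_in.sources])  # ['ApiSource', 'ScrapeSource', 'FileSource']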

6
normalization/__init__.py Normal file

@ -0,0 +1,6 @@
"""
Normalization package for data integration service.
Provides utilities and classes to normalize raw records
into a canonical schema.
"""

23
normalization/base.py Normal file

@ -0,0 +1,23 @@
"""
Base module defining protocols for the normalization layer.
"""
from typing import Protocol, Dict, Any
class TextExtractor(Protocol):
"""
Protocol for text extraction strategies.
"""
def extract(self, record: Dict[str, Any]) -> str:
"""
Extract and return text from a flattened record.
Args:
record: A flattened record dict.
Returns:
A string containing the extracted text.
"""
...
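A sketch of a custom extractor (not part of the commit; the key names are illustrative). It conforms to TextExtractor structurally and can be handed to Normalizer(extractor=...) defined in normalization/normalizer.py below:

from typing import Any, Dict


class SelectedFieldsExtractor:
    """Keeps only string values whose flattened key ends in a wanted field name."""

    WANTED = ("title", "body", "description")

    def extract(self, record: Dict[str, Any]) -> str:
        parts = [
            value for key, value in record.items()
            if isinstance(value, str) and key.split(".")[-1] in self.WANTED
        ]
        return " ".join(parts)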

86
normalization/normalizer.py Normal file

@ -0,0 +1,86 @@
"""
Normalizer module to transform raw records into a canonical schema.
"""
from typing import List, Dict, Any, Optional
from .base import TextExtractor
from .utils import flatten_dict, generate_id, extract_all_text
class _DefaultTextExtractor:
"""
Default text extractor using the extract_all_text utility.
"""
def extract(self, record: Dict[str, Any]) -> str:
"""
Extract text from the record.
Args:
record: A flattened record dict.
Returns:
A string containing concatenated text values.
"""
return extract_all_text(record)
class Normalizer:
"""
Class to normalize raw records into a canonical format.
"""
def __init__(self, extractor: Optional[TextExtractor] = None):
"""
Initialize the Normalizer.
Args:
extractor: Optional custom TextExtractor strategy.
"""
self.extractor: TextExtractor = extractor or _DefaultTextExtractor()
def normalize(
self,
records: List[Dict[str, Any]],
source_type: str,
source: str
) -> List[Dict[str, Any]]:
"""
Normalize a list of raw records.
Args:
records: Raw records to normalize.
source_type: Type of the source ('api', 'scrape', 'file').
source: Original source identifier (URL or path).
Returns:
A list of canonical records matching the schema.
"""
normalized: List[Dict[str, Any]] = []
for raw in records:
flat = flatten_dict(raw)
text = self.extractor.extract(flat)
rec_id = generate_id(source, flat)
metadata = {k: v for k, v in flat.items() if not isinstance(v, str)}
canonical = {
"id": rec_id,
"source_type": source_type,
"source": source,
"raw": raw,
"metadata": metadata,
"text": text,
}
normalized.append(canonical)
return normalized
if __name__ == "__main__":
# Example usage
sample = [{"title": "Hello", "details": {"body": "World", "count": 5}}]
norm = Normalizer()
records = norm.normalize(sample, source_type="api", source="https://example.com")
print(records)
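Continuing the __main__ example above, the single canonical record comes out with the shape below (the id is a deterministic UUIDv5 of the source plus the flattened record); a few spot checks as a sketch:

record = records[0]
assert record["source_type"] == "api"
assert record["raw"] == sample[0]
assert record["text"] == "Hello World"              # all string values, space-joined
assert record["metadata"] == {"details.count": 5}   # non-string values under flattened keys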

64
normalization/utils.py Normal file

@ -0,0 +1,64 @@
"""
Utility functions for the normalization layer.
"""
import json
import uuid
from typing import Dict, Any
def flatten_dict(
d: Dict[str, Any],
parent_key: str = "",
sep: str = "."
) -> Dict[str, Any]:
"""
Recursively flatten a nested dictionary.
Args:
d: The dictionary to flatten.
parent_key: The base key string for recursion.
sep: Separator between keys.
Returns:
A flattened dictionary with compound keys.
"""
items: Dict[str, Any] = {}
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.update(flatten_dict(v, new_key, sep=sep))
else:
items[new_key] = v
return items
def generate_id(source: str, record: Dict[str, Any]) -> str:
"""
Generate a stable UUID based on source and record content.
Args:
source: Identifier for the data source (URL or file path).
record: The flattened record dict.
Returns:
A string representation of a UUID.
"""
record_json = json.dumps(record, sort_keys=True)
namespace = uuid.NAMESPACE_URL
uid = uuid.uuid5(namespace, f"{source}-{record_json}")
return str(uid)
def extract_all_text(record: Dict[str, Any]) -> str:
"""
Extract all string values from the record and concatenate them.
Args:
record: A flattened record dict.
Returns:
A single string containing all text values separated by spaces.
"""
texts = [v for v in record.values() if isinstance(v, str)]
return " ".join(texts)

14
pyproject.toml Normal file

@ -0,0 +1,14 @@
[project]
name = "crawler-ai"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"crawl4ai>=0.5.0.post8",
"fastapi[standard]>=0.115.12",
"inquirer>=3.4.0",
"pandas>=2.2.3",
"python-dotenv>=1.1.0",
"rich>=14.0.0",
]

55
services.py Normal file

@ -0,0 +1,55 @@
"""
Background service to execute pipelines: ingestion followed by normalization.
"""
from typing import List, Dict, Any
from uuid import UUID
from datetime import datetime
import stores
import models
from ingestion.ingestor import Ingestor
from normalization.normalizer import Normalizer
def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None:
"""
Execute a pipeline: ingest data, normalize it, and update run status.
Args:
pipeline: The Pipeline model to run.
run_id: UUID of the RunResult to update.
"""
run = stores.runs.get(run_id)
if not run:
return
# Mark as running
run.status = 'RUNNING'
run.started_at = datetime.utcnow()
    try:
        # Ingest and normalize one source at a time so each record keeps its provenance.
        # Ingestor.run expects plain dicts, so the pydantic Source models are dumped first.
        # (Assumes each config only carries keys its adapter's constructor accepts.)
        normalizer = Normalizer()
        canonical: List[Dict[str, Any]] = []
        for src in pipeline.sources:
            src_dict = src.model_dump(mode="json", exclude_none=True)
            raw_records: List[Dict[str, Any]] = Ingestor.run([src_dict])
            cfg = src_dict.get("config", {})
            # Use the URL, file path, or joined URL list as the source identifier.
            source = cfg.get("url") or cfg.get("path") or ", ".join(cfg.get("urls", []))
            canonical.extend(
                normalizer.normalize(raw_records, source_type=src_dict["type"], source=source)
            )
# Success
run.status = 'COMPLETED'
run.finished_at = datetime.utcnow()
run.results = canonical
except Exception as e:
# Failure
run.status = 'FAILED'
run.finished_at = datetime.utcnow()
run.error = str(e)

76
stores.py Normal file

@ -0,0 +1,76 @@
"""
In-memory stores for pipelines and runs.
"""
from typing import Dict, List, Optional
from uuid import UUID, uuid4
from datetime import datetime
import models
# In-memory storage
pipelines: Dict[UUID, models.Pipeline] = {}
runs: Dict[UUID, models.RunResult] = {}
def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
"""
Create and store a new pipeline.
"""
pipeline_id = uuid4()
now = datetime.utcnow()
pipeline = models.Pipeline(
id=pipeline_id,
name=pipeline_in.name,
sources=pipeline_in.sources,
created_at=now,
)
pipelines[pipeline_id] = pipeline
return pipeline
def get_pipeline(pipeline_id: UUID) -> Optional[models.Pipeline]:
"""
Retrieve a pipeline by its ID.
"""
return pipelines.get(pipeline_id)
def list_pipelines() -> List[models.Pipeline]:
"""
List all registered pipelines.
"""
return list(pipelines.values())
def create_run(pipeline_id: UUID) -> models.RunResult:
"""
Create and store a new run for a given pipeline.
"""
run_id = uuid4()
now = datetime.utcnow()
run = models.RunResult(
id=run_id,
pipeline_id=pipeline_id,
status='PENDING',
started_at=now,
finished_at=None,
results=None,
error=None,
)
runs[run_id] = run
return run
def get_run(run_id: UUID) -> Optional[models.RunResult]:
"""
Retrieve a run by its ID.
"""
return runs.get(run_id)
def list_runs_for_pipeline(pipeline_id: UUID) -> List[models.RunResult]:
"""
List all runs for a specific pipeline.
"""
return [r for r in runs.values() if r.pipeline_id == pipeline_id]
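A short sketch of the store functions in sequence (not part of the commit; nothing is persisted across restarts):

import models
import stores

pipeline = stores.create_pipeline(models.PipelineCreate(sources=[
    models.ApiSource(config=models.ApiConfig(url="https://api.example.com/data")),
]))
run = stores.create_run(pipeline.id)

assert stores.get_pipeline(pipeline.id) is pipeline
assert stores.get_run(run.id).status == "PENDING"
assert stores.list_runs_for_pipeline(pipeline.id) == [run]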

1964
uv.lock Normal file

File diff suppressed because it is too large