Merge crawler-ai repository into 'crawler_ai_project_files' subdirectory

Sosokker committed on 2025-05-11 17:33:36 +07:00
commit 66d1687e45
36 changed files with 3628 additions and 0 deletions

8 binary files not shown.

crawler_ai_project_files/.gitignore (new file)

@@ -0,0 +1,14 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.env
/ingestion/data

@@ -0,0 +1 @@
3.12

@@ -0,0 +1,85 @@
# config.py
import os
from dotenv import load_dotenv
from pathlib import Path
# Load environment variables from .env file located in the script's directory
# Make sure .env is in the *same directory* as config.py
dotenv_path = Path(__file__).parent / '.env'
if dotenv_path.is_file():
load_dotenv(dotenv_path=dotenv_path)
else:
print(f"Warning: .env file not found at {dotenv_path}")
# --- Default Settings ---
DEFAULT_OUTPUT_FILE = "extracted_data.json"
DEFAULT_OUTPUT_FORMAT = "json" # csv, json, sqlite
DEFAULT_CACHE_MODE = "ENABLED" # ENABLED, BYPASS, DISABLED, READ_ONLY, WRITE_ONLY
DEFAULT_VERBOSE = False
DEFAULT_LLM_PROVIDER = "openai/gpt-4o-mini" # Default LLM
# --- LLM Provider Configuration ---
PROVIDER_ENV_MAP = {
"openai": "OPENAI_API_KEY",
"gemini": "GEMINI_API_KEY",
"groq": "GROQ_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"ollama": None, # Ollama typically doesn't require an API key
# Add other providers and their corresponding env variable names here
}
def get_api_key_env_name(provider: str) -> str | None:
"""Gets the expected environment variable name for the given provider."""
provider_prefix = provider.split('/')[0].lower()
return PROVIDER_ENV_MAP.get(provider_prefix)
def get_api_key(provider: str, direct_key: str | None = None, env_var_name: str | None = None) -> str | None:
"""
Retrieves the API key for a given provider.
Priority: direct_key > env_var_name > default env var from PROVIDER_ENV_MAP.
"""
if direct_key:
print(f"INFO: Using direct API key provided via --api-key for provider '{provider}'.")
return direct_key
if env_var_name:
key = os.getenv(env_var_name)
if key:
print(
f"INFO: Using API key from specified environment variable '{env_var_name}' for provider '{provider}'."
)
return key
else:
print(f"Warning: Specified environment variable '{env_var_name}' not found.")
default_env_name = get_api_key_env_name(provider)
if default_env_name:
key = os.getenv(default_env_name)
if key:
print(
f"INFO: Using API key from default environment variable '{default_env_name}' for provider '{provider}'."
)
return key
else:
if default_env_name is not None: # Don't warn if provider like Ollama has None mapping
print(
f"Warning: Default environment variable '{default_env_name}' for provider '{provider}' not found."
)
return None
# If provider is not in map and no key was provided
# Allow providers like 'ollama' to proceed without a key
if provider.split('/')[0].lower() != "ollama":
print(f"Warning: No API key found or specified for provider '{provider}'. LLM features might fail.")
return None
# --- Exportable Configuration Variables ---
LLM_PROVIDER = os.getenv("DEFAULT_LLM_PROVIDER", DEFAULT_LLM_PROVIDER)
OUTPUT_FILE = os.getenv("DEFAULT_OUTPUT_FILE", DEFAULT_OUTPUT_FILE)
OUTPUT_FORMAT = os.getenv("DEFAULT_OUTPUT_FORMAT", DEFAULT_OUTPUT_FORMAT)
CACHE_MODE = os.getenv("DEFAULT_CACHE_MODE", DEFAULT_CACHE_MODE)
VERBOSE = os.getenv("DEFAULT_VERBOSE", str(DEFAULT_VERBOSE)).lower() in ('true', '1', 't')
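
A minimal usage sketch of the key-resolution order (direct key, then a named environment variable, then the PROVIDER_ENV_MAP default); the variable names below other than OPENAI_API_KEY are illustrative assumptions:

# Hypothetical snippet showing config.get_api_key() resolution order.
import config

# 1. A direct key (e.g. from a --api-key flag) always wins.
key = config.get_api_key("openai/gpt-4o-mini", direct_key="sk-demo-not-real")

# 2. Otherwise an explicitly named environment variable is tried.
key = config.get_api_key("openai/gpt-4o-mini", env_var_name="MY_OPENAI_KEY")

# 3. Otherwise the default variable from PROVIDER_ENV_MAP (OPENAI_API_KEY) is used.
key = config.get_api_key("openai/gpt-4o-mini")

# Providers with no mapping, such as Ollama, return None without a warning.
assert config.get_api_key("ollama/llama3") is None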

@@ -0,0 +1,3 @@
"""
Adapters package for the ingestion layer.
"""

@@ -0,0 +1,94 @@
"""
API adapter to fetch JSON data from HTTP endpoints.
"""
from typing import List, Dict, Any, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base import DataSourceAdapter
from loguru import logger
class ApiAdapter(DataSourceAdapter):
"""
Adapter for fetching data from a REST API endpoint.
"""
def __init__(
self,
url: str,
headers: Optional[Dict[str, str]] = None,
timeout: float = 30,
token: Optional[str] = None,
):
"""
Initialize the API adapter.
Args:
url: Endpoint URL to fetch.
headers: Optional HTTP headers.
timeout: Timeout in seconds for the request.
token: Optional bearer token for Authorization header.
"""
self.url = url
self.headers = headers or {}
if token:
self.headers["Authorization"] = f"Bearer {token}"
self.timeout = timeout
logger.info(f"Initializing ApiAdapter for URL: {url}")
self.session = self._init_session()
def _init_session(self) -> requests.Session:
"""
Initialize a requests.Session with retry logic.
"""
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("https://", adapter)
session.mount("http://", adapter)
logger.debug("HTTP session initialized with retry strategy.")
return session
def fetch(self) -> List[Dict[str, Any]]:
"""
Perform a GET request and return JSON data as a list of records.
Returns:
List of dicts from the JSON response.
Raises:
RuntimeError: On network error, HTTP error, or JSON parse error.
"""
logger.info(f"Fetching data from API: {self.url}")
try:
response = self.session.get(
self.url, headers=self.headers, timeout=self.timeout
)
response.raise_for_status()
logger.debug(f"Received response with status code: {response.status_code}")
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise RuntimeError(f"API request failed: {e}")
try:
data = response.json()
logger.debug(f"Successfully parsed JSON response from {self.url}")
except ValueError as e:
logger.error(f"Failed to parse JSON response: {e}")
raise RuntimeError(f"Failed to parse JSON response: {e}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
logger.error("Unexpected JSON structure: expected list or dict.")
raise RuntimeError("Unexpected JSON structure: expected list or dict.")
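
A short, hedged usage sketch; dummyjson.com is the same public test API exercised in the tests below, and any JSON-returning endpoint would work the same way:

from ingestion.adapters.api_adapter import ApiAdapter

adapter = ApiAdapter(
    url="https://dummyjson.com/products/1",
    headers={"Accept": "application/json"},
    timeout=10,
    token=None,  # pass a token to add an "Authorization: Bearer ..." header
)
records = adapter.fetch()  # always a list; a top-level dict response is wrapped as [dict]
print(records[0].get("title"))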

@@ -0,0 +1,20 @@
"""
Define the DataSourceAdapter protocol for ingestion adapters.
"""
from typing import Protocol, List, Dict, Any
class DataSourceAdapter(Protocol):
"""
Protocol for data source adapters.
"""
def fetch(self) -> List[Dict[str, Any]]:
"""
Fetch data from the source.
Returns:
A list of records, each represented as a dict.
"""
...
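
Since DataSourceAdapter is a typing.Protocol, any class exposing a matching fetch() method satisfies it structurally; the class below is a hypothetical illustration, not part of the package:

from typing import List, Dict, Any

class StaticAdapter:
    """Illustrative adapter that returns a fixed, in-memory list of records."""

    def __init__(self, records: List[Dict[str, Any]]):
        self._records = records

    def fetch(self) -> List[Dict[str, Any]]:
        return list(self._records)

# StaticAdapter is a valid DataSourceAdapter without inheriting from it.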

@@ -0,0 +1,112 @@
"""
File adapter to load data from CSV or JSON files.
"""
from typing import List, Dict, Any
import json
import pandas as pd
from loguru import logger
from .base import DataSourceAdapter
class FileAdapter(DataSourceAdapter):
"""
Adapter for reading data from local files (CSV or JSON), or from uploaded file-like objects.
"""
def __init__(self, path: str = None, format: str = None, upload=None, upload_filename: str = None):
"""
Initialize the file adapter.
Args:
path: Path to the input file (.csv or .json), optional if upload is provided.
format: Optional file format (e.g., 'csv', 'json').
upload: Optional file-like object (e.g., from upload).
upload_filename: Optional original filename for validation/logging.
"""
self.path = path
self.format = format
self.upload = upload
self.upload_filename = upload_filename
logger.info(f"Initialized FileAdapter for path: {path}, upload: {upload_filename}, format: {format}")
def fetch(self) -> List[Dict[str, Any]]:
"""
Read and parse the file, returning a list of records.
Supports both path-based and uploaded file-like inputs.
Returns:
List of dicts from the file contents.
Raises:
RuntimeError: On read or parse errors.
ValueError: If file extension is unsupported.
"""
if self.upload is not None:
# Handle uploaded file-like object
logger.info(f"Fetching data from uploaded file: {self.upload_filename or '[no filename]'}")
if self.format == "csv" or (self.upload_filename and self.upload_filename.lower().endswith(".csv")):
try:
self.upload.seek(0)
df = pd.read_csv(self.upload)
logger.debug(f"Successfully read uploaded CSV file: {self.upload_filename}")
return df.to_dict(orient="records")
except Exception as e:
logger.error(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
raise RuntimeError(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
elif self.format == "json" or (self.upload_filename and self.upload_filename.lower().endswith(".json")):
try:
self.upload.seek(0)
data = json.load(self.upload)
logger.debug(f"Successfully read uploaded JSON file: {self.upload_filename}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
logger.error(f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict.")
raise RuntimeError(
f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict."
)
except Exception as e:
logger.error(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
raise RuntimeError(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
else:
logger.error(f"Unsupported uploaded file extension for '{self.upload_filename}'. Only .csv and .json are supported.")
raise ValueError(
f"Unsupported uploaded file extension for '{self.upload_filename}'. "
"Only .csv and .json are supported."
)
# Fallback to path-based loading
p = (self.path or "").lower()
logger.info(f"Attempting to fetch data from file: {self.path}")
if p.endswith(".csv"):
try:
df = pd.read_csv(self.path)
logger.debug(f"Successfully read CSV file: {self.path}")
return df.to_dict(orient="records")
except Exception as e:
logger.error(f"Failed to read CSV '{self.path}': {e}")
raise RuntimeError(f"Failed to read CSV '{self.path}': {e}")
if p.endswith(".json"):
try:
with open(self.path, "r", encoding="utf-8") as f:
data = json.load(f)
logger.debug(f"Successfully read JSON file: {self.path}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
logger.error(f"JSON file '{self.path}' does not contain a list or dict.")
raise RuntimeError(
f"JSON file '{self.path}' does not contain a list or dict."
)
except Exception as e:
logger.error(f"Failed to read JSON '{self.path}': {e}")
raise RuntimeError(f"Failed to read JSON '{self.path}': {e}")
logger.error(f"Unsupported file extension for '{self.path}'. Only .csv and .json are supported.")
raise ValueError(
f"Unsupported file extension for '{self.path}'. "
"Only .csv and .json are supported."
)
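
A sketch of both input modes; the path and data are made up for illustration:

import io
from ingestion.adapters.file_adapter import FileAdapter

# Path-based: the extension (.csv / .json) selects the parser.
records = FileAdapter(path="data/sample.json").fetch()

# Upload-based: a file-like object plus the original filename for format checks.
csv_bytes = io.BytesIO(b"name,price\nwidget,9.99\n")
records = FileAdapter(upload=csv_bytes, upload_filename="widgets.csv", format="csv").fetch()
print(records)  # [{'name': 'widget', 'price': 9.99}]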

@@ -0,0 +1,175 @@
"""
Web scraper adapter using crawl4ai to extract structured data.
"""
import asyncio
import json
from typing import List, Dict, Any, Optional
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
LLMConfig,
CrawlResult,
)
from crawl4ai.extraction_strategy import (
JsonCssExtractionStrategy,
LLMExtractionStrategy,
ExtractionStrategy,
)
from .base import DataSourceAdapter
from loguru import logger
class WebScraperAdapter(DataSourceAdapter):
"""
Adapter for web scraping using crawl4ai.
"""
def __init__(
self,
urls: List[str],
schema_file: Optional[str] = None,
prompt: Optional[str] = None,
llm_provider: str = "openai/gpt-4",
api_key: Optional[str] = None,
output_format: str = "json",
verbose: bool = False,
cache_mode: str = "ENABLED",
):
"""
Initialize the scraper adapter.
Args:
urls: List of URLs to scrape.
schema_file: Path to a JSON file with CSS extraction schema.
prompt: Prompt for LLM-based extraction.
llm_provider: LLM provider identifier.
api_key: API key for the LLM provider.
output_format: Desired format for the extracted data.
verbose: Enable verbose logging.
cache_mode: Crawl cache mode (e.g., 'ENABLED').
"""
self.urls = urls
self.schema_file = schema_file
self.prompt = prompt
self.llm_provider = llm_provider
self.api_key = api_key
self.output_format = output_format
self.verbose = verbose
self.cache_mode = cache_mode
logger.info(f"Initialized WebScraperAdapter for URLs: {urls}")
def fetch(self) -> List[Dict[str, Any]]:
"""
Synchronously fetch data by running the async crawler.
Returns:
List of extracted records.
Raises:
RuntimeError: On failure during crawling or extraction.
"""
logger.info("Starting synchronous fetch for web scraping.")
try:
return asyncio.run(self._fetch_async())
except Exception as e:
logger.error(f"Web scraping failed: {e}")
raise RuntimeError(f"Web scraping failed: {e}")
async def _fetch_async(self) -> List[Dict[str, Any]]:
"""
Internal async method to perform crawling and extraction.
"""
logger.info("Starting async web scraping fetch.")
# Initialize crawler
browser_cfg = BrowserConfig(headless=True, verbose=self.verbose)
crawler = AsyncWebCrawler(config=browser_cfg)
await crawler.start()
# Prepare extraction strategy
llm_cfg = LLMConfig(provider=self.llm_provider, api_token=self.api_key)
extraction_strategy: Optional[ExtractionStrategy] = None
if self.schema_file:
try:
with open(self.schema_file, "r", encoding="utf-8") as f:
schema = json.load(f)
extraction_strategy = JsonCssExtractionStrategy(
schema=schema, verbose=self.verbose
)
logger.debug(f"Loaded schema file: {self.schema_file}")
except Exception as e:
logger.error(f"Failed to load schema file '{self.schema_file}': {e}")
await crawler.close()
raise RuntimeError(
f"Failed to load schema file '{self.schema_file}': {e}"
)
elif self.prompt:
extraction_strategy = LLMExtractionStrategy(
llm_config=llm_cfg,
instruction=self.prompt,
extraction_type="schema",
apply_chunking=True,
verbose=self.verbose,
)
logger.debug("Using LLM extraction strategy.")
else:
logger.error("Either 'schema_file' or 'prompt' must be provided.")
await crawler.close()
raise ValueError("Either 'schema_file' or 'prompt' must be provided.")
# Configure cache mode
try:
cache_enum = getattr(CacheMode, self.cache_mode.upper())
except AttributeError:
logger.warning(f"Invalid cache mode '{self.cache_mode}', defaulting to ENABLED.")
cache_enum = CacheMode.ENABLED
run_cfg = CrawlerRunConfig(
cache_mode=cache_enum,
extraction_strategy=extraction_strategy,
verbose=self.verbose,
)
# Execute crawl
try:
logger.info(f"Crawling URLs: {self.urls}")
results: List[CrawlResult] = await crawler.arun_many(
urls=self.urls, config=run_cfg
)
logger.debug(f"Crawling completed. Results: {results}")
finally:
await crawler.close()
# Process crawl results
records: List[Dict[str, Any]] = []
for res in results:
if not res.success or not res.extracted_content:
logger.warning(f"Skipping failed or empty result for URL: {getattr(res, 'url', None)}")
continue
try:
content = json.loads(res.extracted_content)
logger.debug(f"Parsed extracted content for URL: {res.url}")
except Exception:
logger.error(f"Failed to parse extracted content for URL: {res.url}")
continue
if content is None:
logger.warning(f"Extracted content is None for URL: {res.url}")
continue
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
item["source_url"] = res.url
records.extend(content)
elif isinstance(content, dict):
content["source_url"] = res.url
records.append(content)
else:
logger.warning(f"Extracted content for URL {res.url} is not a list or dict: {type(content)}")
logger.info(f"Web scraping completed. Extracted {len(records)} records.")
return records
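
A hedged usage sketch; the URL and prompt are placeholders, and a valid key for the chosen LLM provider is assumed to be supplied by the caller:

from ingestion.adapters.web_scraper_adapter import WebScraperAdapter

adapter = WebScraperAdapter(
    urls=["https://example.com/listings"],        # placeholder URL
    prompt="Extract listing titles and prices",   # LLM-based extraction (no schema_file)
    llm_provider="openai/gpt-4o-mini",
    api_key="<OPENAI_API_KEY>",                   # assumption: provided via config/env
    verbose=False,
    cache_mode="ENABLED",
)
records = adapter.fetch()  # each extracted dict gains a "source_url" field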

@@ -0,0 +1,112 @@
"""
Ingestor module to orchestrate data ingestion from multiple adapters.
"""
from typing import List, Dict, Any
from ingestion.adapters.api_adapter import ApiAdapter
from ingestion.adapters.file_adapter import FileAdapter
from ingestion.adapters.web_scraper_adapter import WebScraperAdapter
from pydantic import BaseModel
from loguru import logger
class Ingestor:
"""
Ingestor for aggregating data from various sources.
"""
@staticmethod
def run(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Run ingestion for a list of sources.
Args:
sources: List of dicts, each with:
- type: 'api', 'scrape', or 'file'
- config: kwargs for the adapter constructor
Returns:
Flat list of all records fetched from sources.
Raises:
ValueError: For unknown source types.
RuntimeError: If an adapter fails during fetch.
"""
from log.logging_utils import pipeline_log
aggregated: List[Dict[str, Any]] = []
logger.info("Starting ingestion run for sources.")
for src in sources:
# accept Pydantic models or raw dicts
if isinstance(src, BaseModel):
src_item = src.dict()
else:
src_item = src
src_type = src_item.get("type")
config = src_item.get("config", {})
# convert BaseModel config to dict if needed
if not isinstance(config, dict) and hasattr(config, "dict"):
config = config.dict(exclude_unset=True)
pipeline_id = config.get("pipeline_id") or src_item.get("pipeline_id")
run_id = config.get("run_id") or src_item.get("run_id")
logger.info(f"Processing source type: {src_type} with config: {config}")
if src_type == "api":
adapter = ApiAdapter(**config)
elif src_type == "scrape":
adapter = WebScraperAdapter(**config)
elif src_type == "file":
adapter = FileAdapter(**config)
else:
logger.error(f"Unknown source type: {src_type}")
pipeline_log("ERROR", f"Unknown source type: {src_type}", pipeline_id, run_id, status="FAILED")
raise ValueError(f"Unknown source type: {src_type}")
try:
logger.info(f"Fetching records using {src_type} adapter.")
records = adapter.fetch()
logger.info(f"Fetched {len(records)} records from {src_type} source.")
aggregated.extend(records)
pipeline_log("SUCCESS", f"Fetched {len(records)} records.", pipeline_id, run_id, status="COMPLETED")
except Exception as e:
logger.error(f"Fetch failed for source {src_type}: {e}")
pipeline_log("ERROR", f"Fetch failed: {e}", pipeline_id, run_id, status="FAILED")
raise RuntimeError(f"Fetch failed for source {src_type}: {e}")
logger.info(f"Ingestion run completed. Total records aggregated: {len(aggregated)}")
return aggregated
if __name__ == "__main__":
# Example usage of the Ingestor.
example_sources = [
{
"type": "api",
"config": {
"url": "https://dummyjson.com/products",
"headers": {"Accept": "application/json"},
},
},
{
"type": "file",
"config": {"path": "data/sample.json"},
},
{
"type": "scrape",
"config": {
"urls": ["https://www.hipflat.co.th/en"],
"schema_file": None,
"prompt": "Extract all listings",
"llm_provider": "gemini/gemini-2.0-flash",
"api_key": "<GEMINI_API_KEY>",  # redacted: load the real key from the environment, never hard-code credentials
"output_format": "json",
"verbose": False,
"cache_mode": "ENABLED",
},
},
]
records = Ingestor.run(example_sources)
print(f"Total records ingested: {len(records)}")
for record in records:
print(record)

@@ -0,0 +1,26 @@
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import StreamingResponse
from log.logging_utils import RUN_LOG_QUEUES
from queue import Empty
import asyncio
router = APIRouter()
@router.get("/pipelines/{pipeline_id}/runs/{run_id}/logs/stream")
async def stream_logs(request: Request, pipeline_id: str, run_id: str):
log_queue = RUN_LOG_QUEUES.get(run_id)
if not log_queue:
raise HTTPException(status_code=404, detail="No logs for this run.")
async def event_generator():
while True:
if await request.is_disconnected():
break
try:
log_line = log_queue.get(timeout=1)
yield f"data: {log_line}\n\n"
except Empty:
await asyncio.sleep(0.2)
continue
return StreamingResponse(event_generator(), media_type="text/event-stream")
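
A client-side sketch for consuming the stream, assuming the API is served at http://localhost:8000 and using httpx (already used by the tests); the IDs are placeholders:

import httpx

pipeline_id = "<pipeline-uuid>"
run_id = "<run-uuid>"
url = f"http://localhost:8000/pipelines/{pipeline_id}/runs/{run_id}/logs/stream"

with httpx.stream("GET", url, timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):])  # each event payload is a JSON log record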

@@ -0,0 +1,69 @@
from loguru import logger
from queue import Queue
from typing import Dict, Optional
import json
# Per-run log queues (thread-safe)
RUN_LOG_QUEUES: Dict[str, Queue] = {}
RUN_LOG_HANDLERS: Dict[str, int] = {}
# Structured log format
def make_log_record(level: str, message: str, pipeline_id: Optional[str], run_id: Optional[str], status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None) -> dict:
record = {
"level": level,
"message": message,
"pipeline_id": pipeline_id,
"run_id": run_id,
"status": status,
"error": error,
"extra": extra or {},
}
return record
# Custom loguru sink for per-run logging
def log_sink(message):
record = message.record
run_id = record["extra"].get("run_id")
pipeline_id = record["extra"].get("pipeline_id")
if run_id and run_id in RUN_LOG_QUEUES:
# Structure the log as JSON for frontend parsing
log_entry = make_log_record(
level=record["level"].name,
message=record["message"],
pipeline_id=pipeline_id,
run_id=run_id,
status=record["extra"].get("status"),
error=record["extra"].get("error"),
extra=record["extra"]
)
RUN_LOG_QUEUES[run_id].put(json.dumps(log_entry))
# Setup per-run logging sink
def setup_run_logging(pipeline_id: str, run_id: str):
log_queue = Queue()
RUN_LOG_QUEUES[run_id] = log_queue
handler_id = logger.add(
log_sink,
filter=lambda record: record["extra"].get("run_id") == run_id,
enqueue=True
)
RUN_LOG_HANDLERS[run_id] = handler_id
return log_queue
# Remove per-run logging sink and clean up
def cleanup_run_logging(run_id: str):
if run_id in RUN_LOG_HANDLERS:
logger.remove(RUN_LOG_HANDLERS[run_id])
del RUN_LOG_HANDLERS[run_id]
if run_id in RUN_LOG_QUEUES:
del RUN_LOG_QUEUES[run_id]
# Helper for logging with context
def pipeline_log(level: str, message: str, pipeline_id: str, run_id: str, status: Optional[str] = None, error: Optional[str] = None, extra: Optional[dict] = None):
logger.log(level, message, extra={"pipeline_id": pipeline_id, "run_id": run_id, "status": status, "error": error, **(extra or {})})
# Example usage:
# setup_run_logging(pipeline_id, run_id)
# pipeline_log("INFO", "Pipeline started", pipeline_id, run_id, status="RUNNING")
# pipeline_log("ERROR", "Pipeline failed", pipeline_id, run_id, status="FAILED", error="Some error")
# cleanup_run_logging(run_id)

@@ -0,0 +1,167 @@
"""
FastAPI service for managing and running data integration pipelines.
"""
from typing import List, Dict, Any
from uuid import UUID
from fastapi import FastAPI, HTTPException, BackgroundTasks
import platform
import asyncio
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
import models
import stores
import services
from log.log_stream import router as log_stream_router
app = FastAPI(title="Data Integration Pipeline API")
app.include_router(log_stream_router)
@app.post(
"/pipelines",
response_model=models.Pipeline,
status_code=201,
summary="Create a new pipeline"
)
def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
"""
Register a new pipeline with sources configuration.
"""
return stores.create_pipeline(pipeline_in)
@app.get(
"/pipelines",
response_model=List[models.Pipeline],
summary="List all pipelines"
)
def list_pipelines() -> List[models.Pipeline]:
"""
Retrieve all registered pipelines.
"""
return stores.list_pipelines()
@app.get(
"/pipelines/{pipeline_id}",
response_model=models.Pipeline,
summary="Get a pipeline by ID"
)
def get_pipeline(pipeline_id: UUID) -> models.Pipeline:
"""
Fetch details of a specific pipeline.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
return pipeline
@app.post(
"/pipelines/{pipeline_id}/run",
response_model=models.Run,
status_code=201,
summary="Trigger a pipeline run"
)
def run_pipeline(
pipeline_id: UUID,
background_tasks: BackgroundTasks
) -> models.Run:
"""
Start a new run for the given pipeline. Runs asynchronously.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.create_run(pipeline_id)
background_tasks.add_task(services.execute_pipeline, pipeline, run.id)
return run
@app.get(
"/pipelines/{pipeline_id}/runs",
response_model=List[models.Run],
summary="List runs for a pipeline"
)
def list_runs(pipeline_id: UUID) -> List[models.Run]:
"""
List all runs associated with a pipeline.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
runs = stores.list_runs_for_pipeline(pipeline_id)
# Return only the Run fields (omit results/error)
return [models.Run(**r.dict()) for r in runs]
@app.get(
"/pipelines/{pipeline_id}/runs/{run_id}",
response_model=models.Run,
summary="Get run status"
)
def get_run(pipeline_id: UUID, run_id: UUID) -> models.Run:
"""
Retrieve the status of a specific run.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.get_run(run_id)
if not run or run.pipeline_id != pipeline_id:
raise HTTPException(status_code=404, detail="Run not found")
return models.Run(**run.dict())
@app.get(
"/pipelines/{pipeline_id}/runs/{run_id}/results",
response_model=List[Dict[str, Any]],
summary="Get run results"
)
def get_run_results(pipeline_id: UUID, run_id: UUID) -> List[Dict[str, Any]]:
"""
Retrieve normalized results of a completed run.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.get_run(run_id)
if not run or run.pipeline_id != pipeline_id:
raise HTTPException(status_code=404, detail="Run not found")
if run.status != 'COMPLETED':
raise HTTPException(
status_code=409,
detail="Run not completed or has failed"
)
return run.results or []
# Dedicated endpoint to retrieve the error message for a failed run
@app.get(
"/pipelines/{pipeline_id}/runs/{run_id}/error",
response_model=str,
summary="Get run error message"
)
def get_run_error(pipeline_id: UUID, run_id: UUID) -> str:
"""
Retrieve the error message for a run that failed.
"""
pipeline = stores.get_pipeline(pipeline_id)
if not pipeline:
raise HTTPException(status_code=404, detail="Pipeline not found")
run = stores.get_run(run_id)
if not run or run.pipeline_id != pipeline_id:
raise HTTPException(status_code=404, detail="Run not found")
return run.error or ""
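
A client-side walkthrough against a locally running instance (e.g. uvicorn main:app); the host, port, and payload are assumptions for illustration:

import httpx

base = "http://localhost:8000"
payload = {
    "name": "Demo",
    "sources": [{"type": "api", "config": {"url": "https://dummyjson.com/posts"}}],
}
pipeline = httpx.post(f"{base}/pipelines", json=payload, timeout=10).json()
run = httpx.post(f"{base}/pipelines/{pipeline['id']}/run", timeout=10).json()
status = httpx.get(f"{base}/pipelines/{pipeline['id']}/runs/{run['id']}", timeout=10).json()
print(status["status"])  # PENDING / RUNNING / COMPLETED / FAILED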

@@ -0,0 +1,195 @@
"""
Pydantic models for pipelines and runs.
"""
from typing import List, Union, Annotated, Optional, Literal, Dict, Any
from uuid import UUID
from datetime import datetime
from pydantic import BaseModel, Field, HttpUrl, field_validator, ValidationInfo
class RunCreate(BaseModel):
"""
Model for creating a new run. (Empty)
"""
pass
class Run(BaseModel):
"""
Status of a pipeline run.
"""
id: UUID
pipeline_id: UUID
status: Literal['PENDING', 'RUNNING', 'COMPLETED', 'FAILED']
started_at: datetime
finished_at: Optional[datetime] = None
class RunResult(Run):
"""
Extended run model including results or error.
"""
results: Optional[List[Dict[str, Any]]] = None
error: Optional[str] = None
class ApiConfig(BaseModel):
"""
Configuration for an API source.
"""
url: HttpUrl = Field(
...,
description="API endpoint URL",
example="https://api.example.com/data"
)
token: Optional[str] = Field(
None,
description="Optional bearer token for API authentication",
example="abcdef123456"
)
class ScrapeConfig(BaseModel):
"""
Configuration for a web-scraping source.
"""
urls: List[HttpUrl] = Field(
...,
description="List of URLs to scrape",
example=["https://example.com/page1", "https://example.com/page2"]
)
schema_file: Optional[str] = Field(
None,
description="Path to a JSON file containing CSS extraction schema",
example="schemas/page_schema.json"
)
prompt: Optional[str] = Field(
None,
description="Prompt string for LLM-based extraction",
example="Extract product titles and prices"
)
class FileConfig(BaseModel):
"""
Configuration for a file-based source. Supports either a file path or an uploaded file.
"""
path: Optional[str] = Field(
None,
description="Path to the input file (optional if upload is provided)",
example="/data/myfile.json"
)
upload: Optional[Any] = Field(
None,
description="Uploaded file object or metadata (optional if path is provided)",
example=None
)
upload_filename: Optional[str] = Field(
None,
description="Original filename of the uploaded file (for validation)",
example="myfile.json"
)
format: Literal["csv", "json", "sqlite"] = Field(
"json",
description="Format of the file",
example="csv"
)
@field_validator("path", mode="before")
def require_path_or_upload(cls, v, info: ValidationInfo):
data = info.data
if not v and not data.get("upload"):
raise ValueError("Either 'path' or 'upload' must be provided.")
return v
@field_validator("upload_filename", mode="before")
def filename_extension_matches_format(cls, v, info: ValidationInfo):
fmt = info.data.get("format")
if v and fmt and not v.lower().endswith(f".{fmt}"):
raise ValueError(f"Uploaded file extension must match format '{fmt}'")
return v
@field_validator("path", mode="after")
def path_or_upload_extension_matches_format(cls, v, info: ValidationInfo):
fmt = info.data.get("format")
upload_filename = info.data.get("upload_filename")
if v and fmt and not v.lower().endswith(f".{fmt}"):
raise ValueError(f"File extension must match format '{fmt}'")
if upload_filename and fmt and not upload_filename.lower().endswith(f".{fmt}"):
raise ValueError(f"Uploaded file extension must match format '{fmt}'")
return v
class ApiSource(BaseModel):
"""
An API-based data source.
"""
type: Literal["api"] = Field(
"api", description="Discriminator for API source" # Removed const=True
)
config: ApiConfig
class ScrapeSource(BaseModel):
"""
A web-scraping data source.
"""
type: Literal["scrape"] = Field(
"scrape", description="Discriminator for scrape source" # Removed const=True
)
config: ScrapeConfig
class FileSource(BaseModel):
"""
A file-based data source.
"""
type: Literal["file"] = Field(
"file", description="Discriminator for file source" # Removed const=True
)
config: FileConfig
Source = Annotated[
Union[ApiSource, ScrapeSource, FileSource],
Field(discriminator="type", description="Union of all source types")
]
class PipelineCreate(BaseModel):
"""
Payload for creating a new pipeline.
"""
name: Optional[str] = Field(
None,
description="Optional human-readable name for the pipeline",
example="My Data Pipeline"
)
sources: List[Source] = Field(
...,
description="List of data sources for this pipeline"
)
class Pipeline(BaseModel):
"""
Representation of a pipeline.
"""
id: UUID = Field(
...,
description="Unique identifier for the pipeline"
)
name: Optional[str] = Field(
None,
description="Optional human-readable name for the pipeline"
)
sources: List[Source] = Field(
...,
description="List of configured data sources"
)
created_at: datetime = Field(
...,
description="UTC timestamp when the pipeline was created"
)
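
A small validation sketch showing the discriminated union picking the right source model; the values are illustrative:

from models import PipelineCreate

payload = {
    "name": "Example pipeline",
    "sources": [
        {"type": "api", "config": {"url": "https://api.example.com/data"}},
        {"type": "file", "config": {"path": "/data/records.json", "format": "json"}},
    ],
}
pipeline_in = PipelineCreate(**payload)
print(type(pipeline_in.sources[0]).__name__)  # ApiSource
print(type(pipeline_in.sources[1]).__name__)  # FileSource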

@@ -0,0 +1,6 @@
"""
Normalization package for data integration service.
Provides utilities and classes to normalize raw records
into a canonical schema.
"""

@@ -0,0 +1,23 @@
"""
Base module defining protocols for the normalization layer.
"""
from typing import Protocol, Dict, Any
class TextExtractor(Protocol):
"""
Protocol for text extraction strategies.
"""
def extract(self, record: Dict[str, Any]) -> str:
"""
Extract and return text from a flattened record.
Args:
record: A flattened record dict.
Returns:
A string containing the extracted text.
"""
...

@@ -0,0 +1,86 @@
"""
Normalizer module to transform raw records into a canonical schema.
"""
from typing import List, Dict, Any, Optional
from .base import TextExtractor
from .utils import flatten_dict, generate_id, extract_all_text
class _DefaultTextExtractor:
"""
Default text extractor using the extract_all_text utility.
"""
def extract(self, record: Dict[str, Any]) -> str:
"""
Extract text from the record.
Args:
record: A flattened record dict.
Returns:
A string containing concatenated text values.
"""
return extract_all_text(record)
class Normalizer:
"""
Class to normalize raw records into a canonical format.
"""
def __init__(self, extractor: Optional[TextExtractor] = None):
"""
Initialize the Normalizer.
Args:
extractor: Optional custom TextExtractor strategy.
"""
self.extractor: TextExtractor = extractor or _DefaultTextExtractor()
def normalize(
self,
records: List[Dict[str, Any]],
source_type: str,
source: str
) -> List[Dict[str, Any]]:
"""
Normalize a list of raw records.
Args:
records: Raw records to normalize.
source_type: Type of the source ('api', 'scrape', 'file').
source: Original source identifier (URL or path).
Returns:
A list of canonical records matching the schema.
"""
normalized: List[Dict[str, Any]] = []
for raw in records:
flat = flatten_dict(raw)
text = self.extractor.extract(flat)
rec_id = generate_id(source, flat)
metadata = {k: v for k, v in flat.items() if not isinstance(v, str)}
canonical = {
"id": rec_id,
"source_type": source_type,
"source": source,
"raw": raw,
"metadata": metadata,
"text": text,
}
normalized.append(canonical)
return normalized
if __name__ == "__main__":
# Example usage
sample = [{"title": "Hello", "details": {"body": "World", "count": 5}}]
norm = Normalizer()
records = norm.normalize(sample, source_type="api", source="https://example.com")
print(records)

@@ -0,0 +1,64 @@
"""
Utility functions for the normalization layer.
"""
import json
import uuid
from typing import Dict, Any
def flatten_dict(
d: Dict[str, Any],
parent_key: str = "",
sep: str = "."
) -> Dict[str, Any]:
"""
Recursively flatten a nested dictionary.
Args:
d: The dictionary to flatten.
parent_key: The base key string for recursion.
sep: Separator between keys.
Returns:
A flattened dictionary with compound keys.
"""
items: Dict[str, Any] = {}
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.update(flatten_dict(v, new_key, sep=sep))
else:
items[new_key] = v
return items
def generate_id(source: str, record: Dict[str, Any]) -> str:
"""
Generate a stable UUID based on source and record content.
Args:
source: Identifier for the data source (URL or file path).
record: The flattened record dict.
Returns:
A string representation of a UUID.
"""
record_json = json.dumps(record, sort_keys=True)
namespace = uuid.NAMESPACE_URL
uid = uuid.uuid5(namespace, f"{source}-{record_json}")
return str(uid)
def extract_all_text(record: Dict[str, Any]) -> str:
"""
Extract all string values from the record and concatenate them.
Args:
record: A flattened record dict.
Returns:
A single string containing all text values separated by spaces.
"""
texts = [v for v in record.values() if isinstance(v, str)]
return " ".join(texts)
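
A quick illustration of the three helpers on a nested record:

from normalization.utils import flatten_dict, generate_id, extract_all_text

raw = {"title": "Hello", "details": {"body": "World", "count": 5}}
flat = flatten_dict(raw)
# {'title': 'Hello', 'details.body': 'World', 'details.count': 5}

print(extract_all_text(flat))                    # "Hello World"
print(generate_id("https://example.com", flat))  # deterministic UUIDv5 for source + content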

@@ -0,0 +1,17 @@
[project]
name = "crawler-ai"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"crawl4ai>=0.5.0.post8",
"fastapi[standard]>=0.115.12",
"inquirer>=3.4.0",
"loguru>=0.7.3",
"pandas>=2.2.3",
"pytest>=8.3.5",
"python-dotenv>=1.1.0",
"responses>=0.25.7",
"rich>=14.0.0",
]

@@ -0,0 +1,38 @@
{
"quiz": {
"sport": {
"q1": {
"question": "Which one is correct team name in NBA?",
"options": [
"New York Bulls",
"Los Angeles Kings",
"Golden State Warriros",
"Huston Rocket"
],
"answer": "Huston Rocket"
}
},
"maths": {
"q1": {
"question": "5 + 7 = ?",
"options": [
"10",
"11",
"12",
"13"
],
"answer": "12"
},
"q2": {
"question": "12 - 8 = ?",
"options": [
"1",
"2",
"3",
"4"
],
"answer": "4"
}
}
}
}

File diff suppressed because one or more lines are too long

@@ -0,0 +1,68 @@
"""
Background service to execute pipelines: ingestion followed by normalization.
"""
from typing import List, Dict, Any
from uuid import UUID
from datetime import datetime
import stores
import models
from ingestion.ingestor import Ingestor
from normalization.normalizer import Normalizer
from log.logging_utils import setup_run_logging, cleanup_run_logging, pipeline_log
def execute_pipeline(pipeline: models.Pipeline, run_id: UUID) -> None:
"""
Execute a pipeline: ingest data, normalize it, and update run status.
Args:
pipeline: The Pipeline model to run.
run_id: UUID of the RunResult to update.
"""
run = stores.runs.get(run_id)
if not run:
return
# Setup structured per-run logging
setup_run_logging(str(pipeline.id), str(run_id))
pipeline_log("INFO", "Pipeline run starting", str(pipeline.id), str(run_id), status="RUNNING")
# Mark as running
run.status = 'RUNNING'
run.started_at = datetime.utcnow()
try:
# Ingest raw records
pipeline_log("INFO", "Ingesting raw records", str(pipeline.id), str(run_id))
raw_records: List[Dict[str, Any]] = Ingestor.run(pipeline.sources)
pipeline_log("INFO", f"Ingested {len(raw_records)} records", str(pipeline.id), str(run_id))
# Normalize records
normalizer = Normalizer()
canonical: List[Dict[str, Any]] = []
for raw in raw_records:
source_type = raw.get('source_type')
source = raw.get('source')
if not source_type or not source:
pipeline_log("ERROR", "Record missing 'source_type' or 'source'", str(pipeline.id), str(run_id), status="FAILED")
raise ValueError("Record missing 'source_type' or 'source'.")
norm = normalizer.normalize([raw], source_type, source)
canonical.extend(norm)
# Success
run.status = 'COMPLETED'
run.finished_at = datetime.utcnow()
run.results = canonical
pipeline_log("SUCCESS", f"Pipeline run completed with {len(canonical)} records", str(pipeline.id), str(run_id), status="COMPLETED")
except Exception as e:
# Log failure with stack trace
pipeline_log("ERROR", f"Pipeline run failed: {e}", str(pipeline.id), str(run_id), status="FAILED", error=str(e))
run.status = 'FAILED'
run.finished_at = datetime.utcnow()
run.error = str(e)
finally:
pipeline_log("INFO", "Pipeline run finished", str(pipeline.id), str(run_id), status=run.status)
cleanup_run_logging(str(run_id))

@@ -0,0 +1,76 @@
"""
In-memory stores for pipelines and runs.
"""
from typing import Dict, List, Optional
from uuid import UUID, uuid4
from datetime import datetime
import models
# In-memory storage
pipelines: Dict[UUID, models.Pipeline] = {}
runs: Dict[UUID, models.RunResult] = {}
def create_pipeline(pipeline_in: models.PipelineCreate) -> models.Pipeline:
"""
Create and store a new pipeline.
"""
pipeline_id = uuid4()
now = datetime.utcnow()
pipeline = models.Pipeline(
id=pipeline_id,
name=pipeline_in.name,
sources=pipeline_in.sources,
created_at=now,
)
pipelines[pipeline_id] = pipeline
return pipeline
def get_pipeline(pipeline_id: UUID) -> Optional[models.Pipeline]:
"""
Retrieve a pipeline by its ID.
"""
return pipelines.get(pipeline_id)
def list_pipelines() -> List[models.Pipeline]:
"""
List all registered pipelines.
"""
return list(pipelines.values())
def create_run(pipeline_id: UUID) -> models.RunResult:
"""
Create and store a new run for a given pipeline.
"""
run_id = uuid4()
now = datetime.utcnow()
run = models.RunResult(
id=run_id,
pipeline_id=pipeline_id,
status='PENDING',
started_at=now,
finished_at=None,
results=None,
error=None,
)
runs[run_id] = run
return run
def get_run(run_id: UUID) -> Optional[models.RunResult]:
"""
Retrieve a run by its ID.
"""
return runs.get(run_id)
def list_runs_for_pipeline(pipeline_id: UUID) -> List[models.RunResult]:
"""
List all runs for a specific pipeline.
"""
return [r for r in runs.values() if r.pipeline_id == pipeline_id]

@@ -0,0 +1,28 @@
{
"name": "Demo Data Pipeline",
"sources": [
{
"type": "api",
"config": {
"url": "https://dummyjson.com/posts"
}
},
{
"type": "scrape",
"config": {
"urls": [
"https://www.thairath.co.th/home",
"https://www.thaipbs.or.th/news"
],
"prompt": "Extract all posts on the pages along with their brief details and URLs"
}
},
{
"type": "file",
"config": {
"path": "/data/sample.json",
"format": "json"
}
}
]
}
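
One way to register this demo pipeline against a running API instance; the filename pipeline_demo.json and the localhost address are assumptions:

import json
import httpx

with open("pipeline_demo.json", "r", encoding="utf-8") as f:
    payload = json.load(f)

resp = httpx.post("http://localhost:8000/pipelines", json=payload, timeout=10)
resp.raise_for_status()
print(resp.json()["id"])  # the scrape source will additionally need an LLM API key at run time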

@@ -0,0 +1,109 @@
import responses
import pytest
from ingestion.adapters.api_adapter import ApiAdapter
import httpx
@pytest.fixture
def single_product():
return "https://dummyjson.com/products/1"
@pytest.fixture
def multiple_product():
return "https://dummyjson.com/products"
@pytest.fixture
def auth_endpoint():
return "https://dummyjson.com/auth/login"
@pytest.fixture
def auth_require_endpoint():
return "https://dummyjson.com/auth/me"
def test_fetch_dict_response(single_product):
"""Test fetching a single record from a JSON API endpoint."""
response = httpx.get(single_product, timeout=10)
response.raise_for_status()
expected_data = response.json()
assert isinstance(expected_data, dict)
adapter = ApiAdapter(url=single_product)
adapter_result = adapter.fetch()
assert isinstance(adapter_result, list)
assert adapter_result[0] == expected_data
def test_fetch_list_response(multiple_product):
"""Test fetching a list of records from a JSON API endpoint."""
response = httpx.get(multiple_product, timeout=10)
response.raise_for_status()
expected_data = response.json()
adapter = ApiAdapter(url=multiple_product)
adapter_result = adapter.fetch()
assert adapter_result[0] == expected_data
@responses.activate
def test_fetch_http_error(single_product):
"""Test handling HTTP errors and validate graceful failure."""
for _ in range(4):
responses.add(responses.GET, single_product, status=500)
adapter = ApiAdapter(url=single_product)
with pytest.raises(RuntimeError) as exc_info:
adapter.fetch()
assert "API request failed" in str(exc_info.value)
@responses.activate
def test_fetch_json_decode_error(single_product):
"""Test handling JSON decode errors."""
responses.add(responses.GET, single_product, body="not-a-json", status=200)
adapter = ApiAdapter(url=single_product)
with pytest.raises(RuntimeError) as exc_info:
adapter.fetch()
assert "Failed to parse JSON response" in str(exc_info.value)
def test_token_header_injection(auth_endpoint, auth_require_endpoint):
"""Test that the token is injected into the Authorization header."""
payload = {
"username": "emilys",
"password": "emilyspass",
"expiresInMins": 30
}
response = httpx.post(
auth_endpoint,
timeout=10,
headers={"Content-Type": "application/json"},
json=payload
)
response.raise_for_status()
assert response.status_code == 200
token = response.json().get("accessToken")
adapter = ApiAdapter(url=auth_require_endpoint, token=token)
adapter_result = adapter.fetch()
assert isinstance(adapter_result, list)
assert adapter_result[0].get("username") == "emilys"
def test_custom_headers_are_used(single_product):
"""Test that custom headers are used."""
headers = {"X-Custom-Header": "test-value"}
adapter = ApiAdapter(url=single_product, headers=headers)
assert adapter.headers.get("X-Custom-Header") == "test-value"

File diff suppressed because it is too large.