diff --git a/pipeline/ingestion/adapters/file_adapter.py b/pipeline/ingestion/adapters/file_adapter.py index 04ee228..8178ae9 100644 --- a/pipeline/ingestion/adapters/file_adapter.py +++ b/pipeline/ingestion/adapters/file_adapter.py @@ -2,13 +2,12 @@ File adapter to load data from CSV or JSON files. """ -from typing import List, Dict, Any -import json - import pandas as pd from loguru import logger +from fastapi import UploadFile from .base import DataSourceAdapter +from models.adapters import AdapterRecord class FileAdapter(DataSourceAdapter): @@ -16,97 +15,46 @@ class FileAdapter(DataSourceAdapter): - Adapter for reading data from local files (CSV or JSON), - or from uploaded file-like objects. + Adapter for reading data from an uploaded file (CSV or JSON), + provided as a FastAPI UploadFile. """ - def __init__(self, path: str = None, format: str = None, upload=None, upload_filename: str = None): + def __init__( + self, + upload: UploadFile, + ): """ Initialize the file adapter. Args: - path: Path to the input file (.csv or .json), optional if upload is provided. - format: Optional file format (e.g., 'csv', 'json'). - upload: Optional file-like object (e.g., from upload). - upload_filename: Optional original filename for validation/logging. + upload: File uploaded from user. """ - self.path = path - self.format = format self.upload = upload - self.upload_filename = upload_filename - logger.info(f"Initialized FileAdapter for path: {path}, upload: {upload_filename}, format: {format}") + logger.info( + f"Initialized FileAdapter for upload: {upload.filename}, format: {upload.content_type}" + ) - def fetch(self) -> List[Dict[str, Any]]: + def fetch(self) -> list[AdapterRecord]: """ Read and parse the file, returning a list of records. - Supports both path-based and uploaded file-like inputs. + Supports CSV and JSON uploads only. Returns: - List of dicts from the file contents. - - Raises: - RuntimeError: On read or parse errors. - ValueError: If file extension is unsupported. + List of AdapterRecord objects. 
""" - if self.upload is not None: - # Handle uploaded file-like object - logger.info(f"Fetching data from uploaded file: {self.upload_filename or '[no filename]'}") - if self.format == "csv" or (self.upload_filename and self.upload_filename.lower().endswith(".csv")): - try: - self.upload.seek(0) - df = pd.read_csv(self.upload) - logger.debug(f"Successfully read uploaded CSV file: {self.upload_filename}") - return df.to_dict(orient="records") - except Exception as e: - logger.error(f"Failed to read uploaded CSV '{self.upload_filename}': {e}") - raise RuntimeError(f"Failed to read uploaded CSV '{self.upload_filename}': {e}") - elif self.format == "json" or (self.upload_filename and self.upload_filename.lower().endswith(".json")): - try: - self.upload.seek(0) - data = json.load(self.upload) - logger.debug(f"Successfully read uploaded JSON file: {self.upload_filename}") - if isinstance(data, list): - return data - if isinstance(data, dict): - return [data] - logger.error(f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict.") - raise RuntimeError( - f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict." - ) - except Exception as e: - logger.error(f"Failed to read uploaded JSON '{self.upload_filename}': {e}") - raise RuntimeError(f"Failed to read uploaded JSON '{self.upload_filename}': {e}") - else: - logger.error(f"Unsupported uploaded file extension for '{self.upload_filename}'. Only .csv and .json are supported.") - raise ValueError( - f"Unsupported uploaded file extension for '{self.upload_filename}'. " - "Only .csv and .json are supported." 
- ) - # Fallback to path-based loading - p = (self.path or "").lower() - logger.info(f"Attempting to fetch data from file: {self.path}") - if p.endswith(".csv"): - try: - df = pd.read_csv(self.path) - logger.debug(f"Successfully read CSV file: {self.path}") - return df.to_dict(orient="records") - except Exception as e: - logger.error(f"Failed to read CSV '{self.path}': {e}") - raise RuntimeError(f"Failed to read CSV '{self.path}': {e}") - if p.endswith(".json"): - try: - with open(self.path, "r", encoding="utf-8") as f: - data = json.load(f) - logger.debug(f"Successfully read JSON file: {self.path}") - if isinstance(data, list): - return data - if isinstance(data, dict): - return [data] - logger.error(f"JSON file '{self.path}' does not contain a list or dict.") - raise RuntimeError( - f"JSON file '{self.path}' does not contain a list or dict." - ) - except Exception as e: - logger.error(f"Failed to read JSON '{self.path}': {e}") - raise RuntimeError(f"Failed to read JSON '{self.path}': {e}") - logger.error(f"Unsupported file extension for '{self.path}'. Only .csv and .json are supported.") - raise ValueError( - f"Unsupported file extension for '{self.path}'. " - "Only .csv and .json are supported." 
- ) \ No newline at end of file + records: list[AdapterRecord] = [] + + if not self.upload.filename: + raise ValueError("File name is required") + + filetype = self.upload.filename.split(".")[-1].lower() + + if filetype == "csv": + df = pd.read_csv(self.upload.file) + elif filetype == "json": + df = pd.read_json(self.upload.file) + else: + raise ValueError(f"Unsupported file type: {filetype}") + + for _, row in df.iterrows(): + record_data = row.to_dict() + record = AdapterRecord(source=self.upload.filename, data=record_data) + records.append(record) + + return records diff --git a/pipeline/tests/test_file_adapter.py b/pipeline/tests/test_file_adapter.py new file mode 100644 index 0000000..c38e5f9 --- /dev/null +++ b/pipeline/tests/test_file_adapter.py @@ -0,0 +1,52 @@ +import io +import pytest +from fastapi import UploadFile +from ingestion.adapters.file_adapter import FileAdapter + + +def make_upload_file(content: str, filename: str) -> UploadFile: + return UploadFile( + filename=filename, + file=io.BytesIO(content.encode("utf-8")), + ) + + +def test_file_adapter_csv(): + csv_content = "id,name,price\n001,Apple,12\n002,Orange,10\n003,Banana,8" + upload = make_upload_file(csv_content, "test.csv") + adapter = FileAdapter(upload) + records = adapter.fetch() + + assert len(records) == 3 + assert records[0].data["name"] == "Apple" + assert records[1].data["price"] == 10 + assert records[2].data["id"] == 3 + + +def test_file_adapter_json(): + json_content = """ + [{"id": "001", "name": "Apple", "price": 12}, + {"id": "002", "name": "Orange", "price": 10}, + {"id": "003", "name": "Banana", "price": 8}] + """ + upload = make_upload_file(json_content, "test.json") + adapter = FileAdapter(upload) + records = adapter.fetch() + + assert len(records) == 3 + assert records[0].data["name"] == "Apple" + assert records[1].data["price"] == 10 + assert records[2].data["id"] == 3 + + +def test_file_adapter_missing_filename(): + upload = UploadFile( + filename="", + 
file=io.BytesIO("id,name,price\n001,Apple,12".encode("utf-8")), + ) + adapter = FileAdapter(upload) + + with pytest.raises(ValueError) as excinfo: + adapter.fetch() + + assert "File name is required" in str(excinfo.value)