refactor: file adaptor now recive only UploadFile from user

This commit is contained in:
Sosokker 2025-05-12 15:11:15 +07:00
parent 6b445be016
commit 6a5fdf2c09
2 changed files with 84 additions and 84 deletions

View File

@ -2,13 +2,12 @@
File adapter to load data from CSV or JSON files.
"""
from typing import List, Dict, Any
import json
import pandas as pd
from loguru import logger
from fastapi import UploadFile
from .base import DataSourceAdapter
from models.adapters import AdapterRecord
class FileAdapter(DataSourceAdapter):
@ -16,97 +15,46 @@ class FileAdapter(DataSourceAdapter):
Adapter for reading data from local files (CSV or JSON), or from uploaded file-like objects.
"""
def __init__(self, path: str = None, format: str = None, upload=None, upload_filename: str = None):
def __init__(
self,
upload: UploadFile,
):
"""
Initialize the file adapter.
Args:
path: Path to the input file (.csv or .json), optional if upload is provided.
format: Optional file format (e.g., 'csv', 'json').
upload: Optional file-like object (e.g., from upload).
upload_filename: Optional original filename for validation/logging.
upload: File uploaded from user.
"""
self.path = path
self.format = format
self.upload = upload
self.upload_filename = upload_filename
logger.info(f"Initialized FileAdapter for path: {path}, upload: {upload_filename}, format: {format}")
logger.info(
f"Initialized FileAdapter for upload: {upload.filename}, format: {upload.content_type}"
)
def fetch(self) -> List[Dict[str, Any]]:
def fetch(self) -> list[AdapterRecord]:
"""
Read and parse the file, returning a list of records.
Supports both path-based and uploaded file-like inputs.
Returns:
List of dicts from the file contents.
Raises:
RuntimeError: On read or parse errors.
ValueError: If file extension is unsupported.
List of AdapterRecord objects.
"""
if self.upload is not None:
# Handle uploaded file-like object
logger.info(f"Fetching data from uploaded file: {self.upload_filename or '[no filename]'}")
if self.format == "csv" or (self.upload_filename and self.upload_filename.lower().endswith(".csv")):
try:
self.upload.seek(0)
df = pd.read_csv(self.upload)
logger.debug(f"Successfully read uploaded CSV file: {self.upload_filename}")
return df.to_dict(orient="records")
except Exception as e:
logger.error(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
raise RuntimeError(f"Failed to read uploaded CSV '{self.upload_filename}': {e}")
elif self.format == "json" or (self.upload_filename and self.upload_filename.lower().endswith(".json")):
try:
self.upload.seek(0)
data = json.load(self.upload)
logger.debug(f"Successfully read uploaded JSON file: {self.upload_filename}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
logger.error(f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict.")
raise RuntimeError(
f"Uploaded JSON file '{self.upload_filename}' does not contain a list or dict."
)
except Exception as e:
logger.error(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
raise RuntimeError(f"Failed to read uploaded JSON '{self.upload_filename}': {e}")
else:
logger.error(f"Unsupported uploaded file extension for '{self.upload_filename}'. Only .csv and .json are supported.")
raise ValueError(
f"Unsupported uploaded file extension for '{self.upload_filename}'. "
"Only .csv and .json are supported."
)
# Fallback to path-based loading
p = (self.path or "").lower()
logger.info(f"Attempting to fetch data from file: {self.path}")
if p.endswith(".csv"):
try:
df = pd.read_csv(self.path)
logger.debug(f"Successfully read CSV file: {self.path}")
return df.to_dict(orient="records")
except Exception as e:
logger.error(f"Failed to read CSV '{self.path}': {e}")
raise RuntimeError(f"Failed to read CSV '{self.path}': {e}")
if p.endswith(".json"):
try:
with open(self.path, "r", encoding="utf-8") as f:
data = json.load(f)
logger.debug(f"Successfully read JSON file: {self.path}")
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
logger.error(f"JSON file '{self.path}' does not contain a list or dict.")
raise RuntimeError(
f"JSON file '{self.path}' does not contain a list or dict."
)
except Exception as e:
logger.error(f"Failed to read JSON '{self.path}': {e}")
raise RuntimeError(f"Failed to read JSON '{self.path}': {e}")
logger.error(f"Unsupported file extension for '{self.path}'. Only .csv and .json are supported.")
raise ValueError(
f"Unsupported file extension for '{self.path}'. "
"Only .csv and .json are supported."
)
records: list[AdapterRecord] = []
if not self.upload.filename:
raise ValueError("File name is required")
filetype = self.upload.filename.split(".")[-1].lower()
if filetype == "csv":
df = pd.read_csv(self.upload.file)
elif filetype == "json":
df = pd.read_json(self.upload.file)
else:
raise ValueError(f"Unsupported file type: {filetype}")
for _, row in df.iterrows():
record_data = row.to_dict()
record = AdapterRecord(source=self.upload.filename, data=record_data)
records.append(record)
return records

View File

@ -0,0 +1,52 @@
import io
import pytest
from fastapi import UploadFile
from ingestion.adapters.file_adapter import FileAdapter
def make_upload_file(content: str, filename: str) -> UploadFile:
return UploadFile(
filename=filename,
file=io.BytesIO(content.encode("utf-8")),
)
def test_file_adapter_csv():
csv_content = "id,name,price\n001,Apple,12\n002,Orange,10\n003,Banana,8"
upload = make_upload_file(csv_content, "test.csv")
adapter = FileAdapter(upload)
records = adapter.fetch()
assert len(records) == 3
assert records[0].data["name"] == "Apple"
assert records[1].data["price"] == 10
assert records[2].data["id"] == 3
def test_file_adapter_json():
json_content = """
[{"id": "001", "name": "Apple", "price": 12},
{"id": "002", "name": "Orange", "price": 10},
{"id": "003", "name": "Banana", "price": 8}]
"""
upload = make_upload_file(json_content, "test.json")
adapter = FileAdapter(upload)
records = adapter.fetch()
assert len(records) == 3
assert records[0].data["name"] == "Apple"
assert records[1].data["price"] == 10
assert records[2].data["id"] == 3
def test_file_adapter_missing_filename():
upload = UploadFile(
filename="",
file=io.BytesIO("id,name,price\n001,Apple,12".encode("utf-8")),
)
adapter = FileAdapter(upload)
with pytest.raises(ValueError) as excinfo:
adapter.fetch()
assert "File name is required" in str(excinfo.value)