From 186c85bfde80dc9ccc874825237a65afc7b2eea8 Mon Sep 17 00:00:00 2001 From: Sosokker Date: Mon, 12 May 2025 15:25:46 +0700 Subject: [PATCH] refactor: fix typehint --- pipeline/ingestion/adapters/api_adapter.py | 10 +++++----- pipeline/ingestion/adapters/base.py | 7 ++++--- pipeline/tests/test_api_adapter.py | 18 +++++++++--------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/pipeline/ingestion/adapters/api_adapter.py b/pipeline/ingestion/adapters/api_adapter.py index cced2d8..9514d90 100644 --- a/pipeline/ingestion/adapters/api_adapter.py +++ b/pipeline/ingestion/adapters/api_adapter.py @@ -2,12 +2,12 @@ API adapter to fetch JSON data from HTTP endpoints. """ -from typing import Any - import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +from models.adapters import AdapterRecord + from .base import DataSourceAdapter from loguru import logger @@ -58,7 +58,7 @@ class ApiAdapter(DataSourceAdapter): logger.debug("HTTP session initialized with retry strategy.") return session - def fetch(self) -> list[dict[str, Any]]: + def fetch(self) -> list[AdapterRecord]: """ Perform a GET request and return JSON data as a list of records. @@ -87,8 +87,8 @@ class ApiAdapter(DataSourceAdapter): raise RuntimeError(f"Failed to parse JSON response: {e}") if isinstance(data, list): - return data + return [AdapterRecord(source=self.url, data=item) for item in data] if isinstance(data, dict): - return [data] + return [AdapterRecord(source=self.url, data=data)] logger.error("Unexpected JSON structure: expected list or dict.") raise RuntimeError("Unexpected JSON structure: expected list or dict.") diff --git a/pipeline/ingestion/adapters/base.py b/pipeline/ingestion/adapters/base.py index 57332f7..fbcdc6e 100644 --- a/pipeline/ingestion/adapters/base.py +++ b/pipeline/ingestion/adapters/base.py @@ -2,7 +2,8 @@ Define the DataSourceAdapter protocol for ingestion adapters. """ -from typing import Protocol, List, Dict, Any +from typing import Protocol +from models.adapters import AdapterRecord class DataSourceAdapter(Protocol): @@ -10,11 +11,11 @@ class DataSourceAdapter(Protocol): Protocol for data source adapters. """ - def fetch(self) -> List[Dict[str, Any]]: + def fetch(self) -> list[AdapterRecord]: """ Fetch data from the source. Returns: A list of records, each represented as a dict. """ - ... \ No newline at end of file + ... diff --git a/pipeline/tests/test_api_adapter.py b/pipeline/tests/test_api_adapter.py index a4fc65e..3fae2d3 100644 --- a/pipeline/tests/test_api_adapter.py +++ b/pipeline/tests/test_api_adapter.py @@ -8,14 +8,17 @@ import httpx def single_product(): return "https://dummyjson.com/products/1" + @pytest.fixture def multiple_product(): return "https://dummyjson.com/products" + @pytest.fixture def auth_endpoint(): return "https://dummyjson.com/auth/login" + @pytest.fixture def auth_require_endpoint(): return "https://dummyjson.com/auth/me" @@ -32,7 +35,7 @@ def test_fetch_dict_response(single_product): adapter_result = adapter.fetch() assert isinstance(adapter_result, list) - assert adapter_result[0] == expected_data + assert adapter_result[0].data == expected_data def test_fetch_list_response(multiple_product): @@ -44,7 +47,7 @@ def test_fetch_list_response(multiple_product): adapter = ApiAdapter(url=multiple_product) adapter_result = adapter.fetch() - assert adapter_result[0] == expected_data + assert adapter_result[0].data == expected_data @responses.activate @@ -60,6 +63,7 @@ def test_fetch_http_error(single_product): assert "API request failed" in str(exc_info.value) + @responses.activate def test_fetch_json_decode_error(single_product): """Test handling JSON decode errors.""" @@ -75,17 +79,13 @@ def test_fetch_json_decode_error(single_product): def test_token_header_injection(auth_endpoint, auth_require_endpoint): """Test that the token is injected into the Authorization header.""" - payload = { - "username": "emilys", - "password": "emilyspass", - "expiresInMins": 30 - } + payload = {"username": "emilys", "password": "emilyspass", "expiresInMins": 30} response = httpx.post( auth_endpoint, timeout=10, headers={"Content-Type": "application/json"}, - json=payload + json=payload, ) response.raise_for_status() @@ -98,7 +98,7 @@ def test_token_header_injection(auth_endpoint, auth_require_endpoint): adapter_result = adapter.fetch() assert isinstance(adapter_result, list) - assert adapter_result[0].get("username") == "emilys" + assert adapter_result[0].data.get("username") == "emilys" def test_custom_headers_are_used(single_product):