diff --git a/pipeline/normalization/__init__.py b/pipeline/normalization/__init__.py
deleted file mode 100644
index ed14d08..0000000
--- a/pipeline/normalization/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-"""
-Normalization package for data integration service.
-
-Provides utilities and classes to normalize raw records
-into a canonical schema.
-"""
\ No newline at end of file
diff --git a/pipeline/normalization/base.py b/pipeline/normalization/base.py
deleted file mode 100644
index f61d40b..0000000
--- a/pipeline/normalization/base.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""
-Base module defining protocols for the normalization layer.
-"""
-
-from typing import Protocol, Dict, Any
-
-
-class TextExtractor(Protocol):
-    """
-    Protocol for text extraction strategies.
-    """
-
-    def extract(self, record: Dict[str, Any]) -> str:
-        """
-        Extract and return text from a flattened record.
-
-        Args:
-            record: A flattened record dict.
-
-        Returns:
-            A string containing the extracted text.
-        """
-        ...
\ No newline at end of file
diff --git a/pipeline/normalization/normalizer.py b/pipeline/normalization/normalizer.py
deleted file mode 100644
index 4b1a023..0000000
--- a/pipeline/normalization/normalizer.py
+++ /dev/null
@@ -1,86 +0,0 @@
-"""
-Normalizer module to transform raw records into a canonical schema.
-"""
-
-from typing import List, Dict, Any, Optional
-
-from .base import TextExtractor
-from .utils import flatten_dict, generate_id, extract_all_text
-
-
-class _DefaultTextExtractor:
-    """
-    Default text extractor using the extract_all_text utility.
-    """
-
-    def extract(self, record: Dict[str, Any]) -> str:
-        """
-        Extract text from the record.
-
-        Args:
-            record: A flattened record dict.
-
-        Returns:
-            A string containing concatenated text values.
-        """
-        return extract_all_text(record)
-
-
-class Normalizer:
-    """
-    Class to normalize raw records into a canonical format.
-    """
-
-    def __init__(self, extractor: Optional[TextExtractor] = None):
-        """
-        Initialize the Normalizer.
-
-        Args:
-            extractor: Optional custom TextExtractor strategy.
-        """
-        self.extractor: TextExtractor = extractor or _DefaultTextExtractor()
-
-    def normalize(
-        self,
-        records: List[Dict[str, Any]],
-        source_type: str,
-        source: str
-    ) -> List[Dict[str, Any]]:
-        """
-        Normalize a list of raw records.
-
-        Args:
-            records: Raw records to normalize.
-            source_type: Type of the source ('api', 'scrape', 'file').
-            source: Original source identifier (URL or path).
-
-        Returns:
-            A list of canonical records matching the schema.
-        """
-        normalized: List[Dict[str, Any]] = []
-
-        for raw in records:
-            flat = flatten_dict(raw)
-            text = self.extractor.extract(flat)
-            rec_id = generate_id(source, flat)
-            metadata = {k: v for k, v in flat.items() if not isinstance(v, str)}
-
-            canonical = {
-                "id": rec_id,
-                "source_type": source_type,
-                "source": source,
-                "raw": raw,
-                "metadata": metadata,
-                "text": text,
-            }
-            normalized.append(canonical)
-
-        return normalized
-
-
-if __name__ == "__main__":
-    # Example usage
-    sample = [{"title": "Hello", "details": {"body": "World", "count": 5}}]
-    norm = Normalizer()
-    records = norm.normalize(sample, source_type="api", source="https://example.com")
-    print(records)
\ No newline at end of file
diff --git a/pipeline/normalization/utils.py b/pipeline/normalization/utils.py
deleted file mode 100644
index 72663ea..0000000
--- a/pipeline/normalization/utils.py
+++ /dev/null
@@ -1,64 +0,0 @@
-"""
-Utility functions for the normalization layer.
-"""
-
-import json
-import uuid
-from typing import Dict, Any
-
-
-def flatten_dict(
-    d: Dict[str, Any],
-    parent_key: str = "",
-    sep: str = "."
-) -> Dict[str, Any]:
-    """
-    Recursively flatten a nested dictionary.
-
-    Args:
-        d: The dictionary to flatten.
-        parent_key: The base key string for recursion.
-        sep: Separator between keys.
-
-    Returns:
-        A flattened dictionary with compound keys.
-    """
-    items: Dict[str, Any] = {}
-    for k, v in d.items():
-        new_key = f"{parent_key}{sep}{k}" if parent_key else k
-        if isinstance(v, dict):
-            items.update(flatten_dict(v, new_key, sep=sep))
-        else:
-            items[new_key] = v
-    return items
-
-
-def generate_id(source: str, record: Dict[str, Any]) -> str:
-    """
-    Generate a stable UUID based on source and record content.
-
-    Args:
-        source: Identifier for the data source (URL or file path).
-        record: The flattened record dict.
-
-    Returns:
-        A string representation of a UUID.
-    """
-    record_json = json.dumps(record, sort_keys=True)
-    namespace = uuid.NAMESPACE_URL
-    uid = uuid.uuid5(namespace, f"{source}-{record_json}")
-    return str(uid)
-
-
-def extract_all_text(record: Dict[str, Any]) -> str:
-    """
-    Extract all string values from the record and concatenate them.
-
-    Args:
-        record: A flattened record dict.
-
-    Returns:
-        A single string containing all text values separated by spaces.
-    """
-    texts = [v for v in record.values() if isinstance(v, str)]
-    return " ".join(texts)
\ No newline at end of file