import asyncio
import hashlib
import json
import logging
import random
import time
from collections import defaultdict
from pathlib import Path
from typing import Dict, Optional
from urllib.parse import urlparse

try:
    import httpx
except Exception:  # pragma: no cover
    httpx = None  # type: ignore

LOG = logging.getLogger(__name__)

# Conditional-request metadata (ETag / Last-Modified) is persisted on disk so
# it survives restarts.
METADATA_DIR = Path.cwd() / 'data' / 'meta'
METADATA_DIR.mkdir(parents=True, exist_ok=True)


class AsyncTokenBucket:
    """Token bucket that refills one token per `refill_interval` seconds."""

    def __init__(self, capacity: int, refill_interval: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_interval = refill_interval
        self.lock = asyncio.Lock()
        self.last_refill = time.monotonic()

    def _refill(self, now: float) -> None:
        elapsed = now - self.last_refill
        if elapsed <= 0:
            return
        add = int(elapsed / self.refill_interval)
        if add > 0:
            self.tokens = min(self.capacity, self.tokens + add)
            # Advance by whole intervals only, so fractional credit carries over.
            self.last_refill += add * self.refill_interval

    async def consume(self, timeout: Optional[float] = None) -> bool:
        """Take one token, polling until one is available or `timeout` expires."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            async with self.lock:
                self._refill(time.monotonic())
                if self.tokens > 0:
                    self.tokens -= 1
                    return True
            if deadline is not None and time.monotonic() > deadline:
                return False
            await asyncio.sleep(0.05)


class Fetcher:
    """Polite async HTTP fetcher: per-origin rate limiting, retries with
    exponential backoff, a per-origin circuit breaker, and conditional GETs."""

    def __init__(
        self,
        user_agent: str = "LineTodayCrawler/0.1 (+mailto:ops@example.com)",
        max_retries: int = 3,
        circuit_threshold: int = 5,
        circuit_cooldown: float = 60.0,
        playwright_enabled: bool = False,
        timeout: float = 20.0,
        per_origin_capacity: int = 2,
        per_origin_refill: float = 2.0,
    ):
        if httpx is None:  # pragma: no cover
            raise RuntimeError('httpx must be installed to use Fetcher')
        self.user_agent = user_agent
        self._client = httpx.AsyncClient(timeout=timeout, follow_redirects=True)
        self._bucket_capacity = max(1, per_origin_capacity)
        self._bucket_refill = max(0.1, per_origin_refill)
        self._buckets: Dict[str, AsyncTokenBucket] = defaultdict(
            lambda: AsyncTokenBucket(self._bucket_capacity, self._bucket_refill)
        )
        self.max_retries = max_retries
        self._failures: Dict[str, int] = defaultdict(int)
        self._circuit_tripped_until: Dict[str, float] = {}
        self.circuit_threshold = circuit_threshold
        self.circuit_cooldown = circuit_cooldown
        self.playwright_enabled = playwright_enabled
        # Playwright handles are reserved for a future render_js mode; they are
        # only touched in close().
        self._pw = None
        self._browser = None

    async def __aenter__(self) -> "Fetcher":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def close(self) -> None:
        try:
            if self._browser:
                try:
                    await asyncio.to_thread(self._browser.close)
                except Exception:
                    pass
                self._browser = None
            if self._pw:
                try:
                    await asyncio.to_thread(self._pw.stop)
                except Exception:
                    pass
                self._pw = None
        except Exception:
            LOG.exception('error while closing Playwright resources')
        await self._client.aclose()

    def _origin(self, url: str) -> str:
        p = urlparse(url)
        return f"{p.scheme}://{p.netloc}"

    async def _acquire(self, url: str) -> bool:
        """Check the circuit breaker, then take a rate-limit token for the origin."""
        origin = self._origin(url)
        until = self._circuit_tripped_until.get(origin)
        now = time.monotonic()
        if until and now < until:
            LOG.warning('circuit open for %s until %s', origin, until)
            return False
        bucket = self._buckets[origin]
        return await bucket.consume()

    def _meta_path(self, url: str) -> Path:
        # hash() is salted per process, so it cannot name a persistent cache
        # file; a stable digest keeps metadata addressable across runs.
        name = hashlib.sha256(url.encode('utf-8')).hexdigest() + '.json'
        return METADATA_DIR / name

    def _load_meta_sync(self, url: str) -> dict:
        p = self._meta_path(url)
        if p.exists():
            try:
                return json.loads(p.read_text(encoding='utf-8'))
            except Exception:
                return {}
        return {}

    async def _load_meta(self, url: str) -> dict:
        return await asyncio.to_thread(self._load_meta_sync, url)

    def _save_meta_sync(self, url: str, meta: dict) -> None:
        p = self._meta_path(url)
        p.write_text(json.dumps(meta, ensure_ascii=False), encoding='utf-8')

    async def _save_meta(self, url: str, meta: dict) -> None:
        await asyncio.to_thread(self._save_meta_sync, url, meta)

    def _record_failure(self, origin: str) -> None:
        self._failures[origin] += 1
        if self._failures[origin] >= self.circuit_threshold:
            until = time.monotonic() + self.circuit_cooldown
            self._circuit_tripped_until[origin] = until
            LOG.error(
                'tripping circuit for %s until %.2f after %d failures',
                origin, until, self._failures[origin],
            )

    def _record_success(self, origin: str) -> None:
        self._failures[origin] = 0
        self._circuit_tripped_until.pop(origin, None)

    async def fetch(self, url: str, render_js: bool = False) -> dict:
        origin = self._origin(url)
        if not await self._acquire(url):
            return {"url": url, "status": None, "error": "rate_limited_or_circuit_open"}
        headers = {
            "User-Agent": self.user_agent,
            "Accept-Language": "th,en;q=0.8",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        # Send conditional headers when we have validators from a prior fetch;
        # a 304 response will come back with an empty body.
        meta = await self._load_meta(url)
        if meta.get('etag'):
            headers['If-None-Match'] = meta['etag']
        if meta.get('last_modified'):
            headers['If-Modified-Since'] = meta['last_modified']
        if render_js and self.playwright_enabled:
            LOG.warning(
                'render_js mode is not supported in async fetcher yet; '
                'falling back to HTTP client for %s', url,
            )
        attempt = 0
        start = time.monotonic()
        while True:
            attempt += 1
            try:
                LOG.debug('fetch attempt %d %s', attempt, url)
                resp = await self._client.get(url, headers=headers)
                # Note: elapsed_ms spans all attempts, including backoff sleeps.
                duration = int((time.monotonic() - start) * 1000)
                status = resp.status_code
                out = {
                    "url": url,
                    "status": status,
                    "headers": dict(resp.headers),
                    "text": resp.text,
                    "elapsed_ms": duration,
                }
                meta2 = {}
                if resp.headers.get('etag'):
                    meta2['etag'] = resp.headers.get('etag')
                if resp.headers.get('last-modified'):
                    meta2['last_modified'] = resp.headers.get('last-modified')
                if meta2:
                    await self._save_meta(url, meta2)
                # 304 Not Modified counts as success; callers should check
                # `status` before using `text`.
                if 200 <= status < 400:
                    self._record_success(origin)
                if status in (429, 503):
                    # Honor an integer Retry-After if present; HTTP-date values
                    # are ignored and fall through to the generic backoff below.
                    ra = resp.headers.get('Retry-After')
                    pause = None
                    if ra:
                        try:
                            pause = int(ra)
                        except ValueError:
                            pause = None
                    if pause and attempt <= self.max_retries:
                        LOG.warning('honoring Retry-After=%s for %s', pause, url)
                        out['pause_seconds'] = pause
                        self._record_failure(origin)
                        await asyncio.sleep(pause)
                        continue
                if status == 429 or (500 <= status < 600):
                    self._record_failure(origin)
                    if attempt <= self.max_retries:
                        # Exponential backoff with jitter: 1s, 2s, 4s, ... plus
                        # a random fraction to avoid synchronized retries.
                        backoff = (2 ** (attempt - 1)) + random.random()
                        LOG.warning(
                            'transient status %s on %s; retrying in %.2fs (attempt %d)',
                            status, url, backoff, attempt,
                        )
                        await asyncio.sleep(backoff)
                        continue
                    LOG.error('giving up after %d attempts for %s status=%s', attempt, url, status)
                return out
            except httpx.RequestError as e:
                duration = int((time.monotonic() - start) * 1000)
                LOG.warning('request error on %s: %s', url, e)
                self._record_failure(origin)
                if attempt <= self.max_retries:
                    backoff = (2 ** (attempt - 1)) + random.random()
                    LOG.debug('retrying after error in %.2fs', backoff)
                    await asyncio.sleep(backoff)
                    continue
                return {"url": url, "status": None, "error": str(e), "elapsed_ms": duration}
            except Exception as e:  # pragma: no cover - defensive
                duration = int((time.monotonic() - start) * 1000)
                LOG.exception('unexpected error fetching %s', url)
                self._record_failure(origin)
                return {"url": url, "status": None, "error": str(e), "elapsed_ms": duration}
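

# --- Usage sketch -------------------------------------------------------------
# A minimal demonstration of the intended call pattern, assuming httpx is
# installed and the network is reachable; the target URL is a placeholder,
# not part of any real seed list.
if __name__ == '__main__':
    async def _demo() -> None:
        # The async context manager guarantees the client (and any Playwright
        # resources) are closed even if the fetch raises.
        async with Fetcher() as fetcher:
            result = await fetcher.fetch('https://example.com/')
            LOG.info(
                'status=%s elapsed_ms=%s bytes=%d',
                result.get('status'),
                result.get('elapsed_ms'),
                len(result.get('text') or ''),
            )

    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())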