line-today-scrape/linetoday/fetcher.py

import asyncio
import hashlib
import json
import logging
import random
import time
from collections import defaultdict
from pathlib import Path
from typing import Dict, Optional
from urllib.parse import urlparse

try:
    import httpx
except Exception:  # pragma: no cover
    httpx = None  # type: ignore

LOG = logging.getLogger(__name__)

METADATA_DIR = Path.cwd() / 'data' / 'meta'
METADATA_DIR.mkdir(parents=True, exist_ok=True)

class AsyncTokenBucket:
    """Token bucket holding up to ``capacity`` tokens, refilling one token every ``refill_interval`` seconds."""

    def __init__(self, capacity: int, refill_interval: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_interval = refill_interval
        self.lock = asyncio.Lock()
        self.last_refill = time.monotonic()

    def _refill(self, now: float) -> None:
        elapsed = now - self.last_refill
        if elapsed <= 0:
            return
        add = int(elapsed / self.refill_interval)
        if add > 0:
            self.tokens = min(self.capacity, self.tokens + add)
            self.last_refill += add * self.refill_interval

    async def consume(self, timeout: Optional[float] = None) -> bool:
        """Take one token, polling until one is available; returns False if ``timeout`` expires first."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            async with self.lock:
                now = time.monotonic()
                self._refill(now)
                if self.tokens > 0:
                    self.tokens -= 1
                    return True
            # Poll outside the lock so waiting consumers do not block each other.
            if deadline is not None and time.monotonic() > deadline:
                return False
            await asyncio.sleep(0.05)
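
# Usage sketch (illustrative; the capacity, refill, and timeout values below are
# assumptions, not defaults used elsewhere in this module):
#
#     bucket = AsyncTokenBucket(capacity=2, refill_interval=2.0)
#     if await bucket.consume(timeout=10.0):
#         ...  # token acquired; proceed with the rate-limited call
#     else:
#         ...  # still throttled after 10s; skip or reschedule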

class Fetcher:
    """Async HTTP fetcher with per-origin rate limiting, retries with backoff, a circuit breaker, and conditional GETs."""

    def __init__(
        self,
        user_agent: str = "LineTodayCrawler/0.1 (+mailto:ops@example.com)",
        max_retries: int = 3,
        circuit_threshold: int = 5,
        circuit_cooldown: float = 60.0,
        playwright_enabled: bool = False,
        timeout: float = 20.0,
        per_origin_capacity: int = 2,
        per_origin_refill: float = 2.0,
    ):
        if httpx is None:  # pragma: no cover
            raise RuntimeError('httpx must be installed to use Fetcher')
        self.user_agent = user_agent
        self._client = httpx.AsyncClient(timeout=timeout, follow_redirects=True)
        self._bucket_capacity = max(1, per_origin_capacity)
        self._bucket_refill = max(0.1, per_origin_refill)
        self._buckets: Dict[str, AsyncTokenBucket] = defaultdict(
            lambda: AsyncTokenBucket(self._bucket_capacity, self._bucket_refill)
        )
        self.max_retries = max_retries
        self._failures: Dict[str, int] = defaultdict(int)
        self._circuit_tripped_until: Dict[str, float] = {}
        self.circuit_threshold = circuit_threshold
        self.circuit_cooldown = circuit_cooldown
        self.playwright_enabled = playwright_enabled
        self._pw = None
        self._browser = None

    async def __aenter__(self) -> "Fetcher":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def close(self) -> None:
        try:
            if self._browser:
                try:
                    await asyncio.to_thread(self._browser.close)
                except Exception:
                    pass
                self._browser = None
            if self._pw:
                try:
                    await asyncio.to_thread(self._pw.stop)
                except Exception:
                    pass
                self._pw = None
        except Exception:
            LOG.exception('error while closing Playwright resources')
        await self._client.aclose()

    def _origin(self, url: str) -> str:
        p = urlparse(url)
        return f"{p.scheme}://{p.netloc}"

    async def _acquire(self, url: str) -> bool:
        origin = self._origin(url)
        until = self._circuit_tripped_until.get(origin)
        now = time.monotonic()
        if until and now < until:
            LOG.warning('circuit open for %s until %s', origin, until)
            return False
        bucket = self._buckets[origin]
        return await bucket.consume()

    def _meta_path(self, url: str) -> Path:
        # Use a stable digest for the cache filename: the built-in hash() is
        # salted per process, so paths derived from it would never match on a
        # later run and the conditional-GET cache would never hit.
        name = hashlib.sha256(url.encode('utf-8')).hexdigest() + '.json'
        return METADATA_DIR / name

    def _load_meta_sync(self, url: str) -> dict:
        p = self._meta_path(url)
        if p.exists():
            try:
                return json.loads(p.read_text(encoding='utf-8'))
            except Exception:
                return {}
        return {}

    async def _load_meta(self, url: str) -> dict:
        return await asyncio.to_thread(self._load_meta_sync, url)

    def _save_meta_sync(self, url: str, meta: dict) -> None:
        p = self._meta_path(url)
        p.write_text(json.dumps(meta, ensure_ascii=False), encoding='utf-8')

    async def _save_meta(self, url: str, meta: dict) -> None:
        await asyncio.to_thread(self._save_meta_sync, url, meta)
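
    # Conditional-GET flow (behavioral note): the first fetch of a URL stores any
    # ETag / Last-Modified validators under data/meta/<sha256(url)>.json; the next
    # fetch replays them as If-None-Match / If-Modified-Since, so an unchanged
    # page can come back as a cheap 304 with an empty body.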

    def _record_failure(self, origin: str) -> None:
        self._failures[origin] += 1
        if self._failures[origin] >= self.circuit_threshold:
            until = time.monotonic() + self.circuit_cooldown
            self._circuit_tripped_until[origin] = until
            LOG.error('tripping circuit for %s until %.2f after %d failures', origin, until, self._failures[origin])

    def _record_success(self, origin: str) -> None:
        self._failures[origin] = 0
        self._circuit_tripped_until.pop(origin, None)
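
    # Circuit-breaker sketch (illustrative numbers): with the default
    # circuit_threshold=5 and circuit_cooldown=60.0, five consecutive failures
    # against one origin make _acquire() refuse that origin's URLs for the next
    # 60 seconds; the first success afterwards resets the failure counter.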

    async def fetch(self, url: str, render_js: bool = False) -> dict:
        origin = self._origin(url)
        ok = await self._acquire(url)
        if not ok:
            return {"url": url, "status": None, "error": "rate_limited_or_circuit_open"}
        headers = {
            "User-Agent": self.user_agent,
            "Accept-Language": "th,en;q=0.8",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        meta = await self._load_meta(url)
        if meta.get('etag'):
            headers['If-None-Match'] = meta['etag']
        if meta.get('last_modified'):
            headers['If-Modified-Since'] = meta['last_modified']
        if render_js and self.playwright_enabled:
            LOG.warning('render_js mode is not supported in async fetcher yet; falling back to HTTP client for %s', url)
        attempt = 0
        start = time.monotonic()
        while True:
            attempt += 1
            try:
                LOG.debug('fetch attempt %d %s', attempt, url)
                resp = await self._client.get(url, headers=headers)
                duration = int((time.monotonic() - start) * 1000)
                status = resp.status_code
                out = {
                    "url": url,
                    "status": status,
                    "headers": dict(resp.headers),
                    "text": resp.text,
                    "elapsed_ms": duration,
                }
                # Persist fresh validators for the next conditional request.
                meta2 = {}
                if resp.headers.get('etag'):
                    meta2['etag'] = resp.headers.get('etag')
                if resp.headers.get('last-modified'):
                    meta2['last_modified'] = resp.headers.get('last-modified')
                if meta2:
                    await self._save_meta(url, meta2)
                if 200 <= status < 400:
                    self._record_success(origin)
                if status in (429, 503):
                    ra = resp.headers.get('Retry-After')
                    pause = None
                    if ra:
                        try:
                            pause = int(ra)
                        except ValueError:
                            # Retry-After may also be an HTTP date; that form is ignored here.
                            pause = None
                    if pause is not None and attempt <= self.max_retries:
                        LOG.warning('honoring Retry-After=%s for %s', pause, url)
                        out['pause_seconds'] = pause
                        self._record_failure(origin)
                        await asyncio.sleep(pause)
                        continue
                if status == 429 or 500 <= status < 600:
                    self._record_failure(origin)
                    if attempt <= self.max_retries:
                        backoff = (2 ** (attempt - 1)) + random.random()
                        LOG.warning('transient status %s on %s; retrying in %.2fs (attempt %d)', status, url, backoff, attempt)
                        await asyncio.sleep(backoff)
                        continue
                    LOG.error('giving up after %d attempts for %s status=%s', attempt, url, status)
                return out
            except httpx.RequestError as e:
                duration = int((time.monotonic() - start) * 1000)
                LOG.warning('request error on %s: %s', url, e)
                self._record_failure(origin)
                if attempt <= self.max_retries:
                    backoff = (2 ** (attempt - 1)) + random.random()
                    LOG.debug('retrying after error in %.2fs', backoff)
                    await asyncio.sleep(backoff)
                    continue
                return {"url": url, "status": None, "error": str(e), "elapsed_ms": duration}
            except Exception as e:  # pragma: no cover - defensive
                duration = int((time.monotonic() - start) * 1000)
                LOG.exception('unexpected error fetching %s', url)
                self._record_failure(origin)
                return {"url": url, "status": None, "error": str(e), "elapsed_ms": duration}