"""Command-line crawler for the LINE Today wealth tab.

Seeds a URL frontier, fetches pages concurrently with per-origin rate
limiting, stores raw snapshots and parsed articles, and periodically
logs metrics.
"""

import argparse
import asyncio
import logging
import signal
import time
from contextlib import suppress

from .extractor import extract_article
from .fetcher import Fetcher
from .frontier import Frontier
from .metrics import MetricsRecorder
from .robots import RobotsManager
from .storage import store_parsed, store_snapshot

# Scheme + host, used for robots.txt checks and to absolutize relative links.
ORIGIN = 'https://today.line.me'
START_URL = 'https://today.line.me/th/v3/tab/wealth'


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog='linetoday')
    parser.add_argument('--dry-run', action='store_true', help='Do not persist parsed data')
    parser.add_argument('--limit', type=int, default=10, help='Max pages to fetch')
    parser.add_argument('--ignore-robots', action='store_true',
                        help='Ignore robots.txt checks (use only for local testing)')
    parser.add_argument('--verbose', action='store_true', help='Enable debug logging')
    parser.add_argument('--concurrency', type=int, default=4,
                        help='Number of concurrent fetch workers')
    parser.add_argument('--metrics-interval', type=float, default=30.0,
                        help='Seconds between metrics log snapshots')
    parser.add_argument('--per-origin-capacity', type=int, default=2,
                        help='Burst size per origin token bucket')
    parser.add_argument('--per-origin-refill', type=float, default=2.0,
                        help='Seconds per token refill for origin bucket')
    parser.add_argument('--timeout', type=float, default=20.0, help='HTTP request timeout')
    return parser


async def metrics_reporter(metrics: MetricsRecorder, interval: float,
                           stop_event: asyncio.Event) -> None:
    """Log a metrics snapshot every `interval` seconds until cancelled."""
    try:
        while not stop_event.is_set():
            await asyncio.sleep(interval)
            logging.info('metrics %s', metrics.format_snapshot())
    except asyncio.CancelledError:
        # Emit one final snapshot on shutdown before propagating cancellation.
        logging.info('metrics %s', metrics.format_snapshot())
        raise


async def crawl(args) -> None:
    ua = 'LineTodayCrawler/0.1 (+mailto:ops@example.com)'
    robots = RobotsManager(user_agent=ua)
    frontier = Frontier()
    metrics = MetricsRecorder()
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    if frontier.add(START_URL):
        queue.put_nowait(START_URL)
        metrics.inc('frontier_seeded')

    fetched = 0
    fetched_lock = asyncio.Lock()
    stop_event = asyncio.Event()

    async with Fetcher(
        user_agent=ua,
        max_retries=3,
        circuit_threshold=5,
        circuit_cooldown=60.0,
        timeout=args.timeout,
        per_origin_capacity=args.per_origin_capacity,
        per_origin_refill=args.per_origin_refill,
    ) as fetcher:
        reporter_task = asyncio.create_task(
            metrics_reporter(metrics, args.metrics_interval, stop_event))

        async def worker(worker_id: int) -> None:
            nonlocal fetched
            logger = logging.getLogger(f'worker-{worker_id}')
            while True:
                url = await queue.get()
                if url is None:
                    # A None sentinel tells this worker to exit.
                    queue.task_done()
                    logger.debug('received sentinel, exiting')
                    break
                metrics.inc('queue_dequeued')
                if stop_event.is_set():
                    queue.task_done()
                    continue
                # Only crawl Thai-language LINE Today pages.
                if not url.startswith('https://today.line.me/th'):
                    queue.task_done()
                    continue
                path = url[len(ORIGIN):]
                if not args.ignore_robots:
                    allowed = await asyncio.to_thread(robots.allowed, ORIGIN, path)
                    if not allowed:
                        logger.info('robots disallow %s', url)
                        metrics.inc('robots_blocked')
                        queue.task_done()
                        continue

                logger.info('fetching %s', url)
                metrics.inc('fetch_started')
                fetch_start = time.perf_counter()
                res = await fetcher.fetch(url)
                latency_ms = (time.perf_counter() - fetch_start) * 1000
                metrics.observe('fetch_latency_ms', latency_ms)

                status = res.get('status')
                if status is None:
                    # Transport-level failure; optionally back off before continuing.
                    metrics.inc('status_none')
                    logger.error('error fetching %s: %s', url, res.get('error'))
                    if res.get('pause_seconds'):
                        pause = res['pause_seconds']
                        metrics.inc('retry_pauses')
                        logger.info('pausing for %s seconds', pause)
                        await asyncio.sleep(pause)
                    queue.task_done()
                    continue

                metrics.inc(f'status_{status}')
                counted = False
                if status == 304:
                    logger.info('not modified %s', url)
                    counted = True
                elif status != 200:
                    logger.warning('skipping %s status %s', url, status)
                else:
                    html = res.get('text', '')
                    await asyncio.to_thread(store_snapshot, url, html)
                    metrics.inc('snapshots_written')
                    article, links = extract_article(html, url)
                    if not args.dry_run:
                        await asyncio.to_thread(store_parsed, article)
                        metrics.inc('parsed_written')
                    if not stop_event.is_set():
                        new_links = 0
                        for link in links:
                            if link.startswith('/'):
                                link = ORIGIN + link
                            if frontier.add(link):
                                queue.put_nowait(link)
                                new_links += 1
                        metrics.inc('links_enqueued', new_links)
                    counted = True

                if counted:
                    async with fetched_lock:
                        fetched += 1
                        metrics.inc('fetched_total')
                        if fetched >= args.limit and not stop_event.is_set():
                            logger.info('fetch limit reached (%s)', args.limit)
                            stop_event.set()
                            # Wake every worker so they can exit.
                            for _ in range(args.concurrency):
                                queue.put_nowait(None)
                queue.task_done()

        workers = [asyncio.create_task(worker(i)) for i in range(args.concurrency)]

        def _request_stop(*_):
            if not stop_event.is_set():
                logging.warning('received shutdown signal; draining queue')
                stop_event.set()
                for _ in range(args.concurrency):
                    queue.put_nowait(None)

        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            # add_signal_handler is unavailable on some platforms (e.g. Windows).
            with suppress(NotImplementedError):
                loop.add_signal_handler(sig, _request_stop)

        await queue.join()
        if not stop_event.is_set():
            stop_event.set()
            for _ in range(args.concurrency):
                queue.put_nowait(None)
        await asyncio.gather(*workers, return_exceptions=True)

        reporter_task.cancel()
        with suppress(asyncio.CancelledError):
            await reporter_task

    await asyncio.to_thread(robots._http.close)
    logging.info('done')


def main():
    parser = build_parser()
    args = parser.parse_args()
    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(name)s %(message)s')
    asyncio.run(crawl(args))


if __name__ == '__main__':
    main()