# line-today-scrape/linetoday/cli.py
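"""Command-line entry point for the linetoday crawler.

Builds the argument parser, seeds the crawl frontier from CLI options, and runs
a pool of concurrent fetch workers with periodic metrics reporting and graceful
shutdown on SIGINT/SIGTERM.
"""
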
import argparse
import asyncio
import logging
import signal
import time
from contextlib import suppress

from .extractor import extract_article
from .fetcher import Fetcher
from .frontier import Frontier
from .metrics import MetricsRecorder
from .registry import ArticleRegistry
from .robots import RobotsManager
from .storage import store_parsed, store_snapshot

# Bare site origin; relative links and robots.txt paths are resolved against it.
ORIGIN = 'https://today.line.me'
START_URL = 'https://today.line.me/th/v3/tab/wealth'


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog='linetoday')
    parser.add_argument('--dry-run', action='store_true', help='Do not persist parsed data')
    parser.add_argument('--limit', type=int, default=10, help='Max pages to fetch')
    parser.add_argument('--ignore-robots', action='store_true', help='Ignore robots.txt checks (use only for local testing)')
    parser.add_argument('--verbose', action='store_true', help='Enable debug logging')
    parser.add_argument('--concurrency', type=int, default=4, help='Number of concurrent fetch workers')
    parser.add_argument('--metrics-interval', type=float, default=30.0, help='Seconds between metrics log snapshots')
    parser.add_argument('--per-origin-capacity', type=int, default=2, help='Burst size per origin token bucket')
    parser.add_argument('--per-origin-refill', type=float, default=2.0, help='Seconds per token refill for origin bucket')
    parser.add_argument('--timeout', type=float, default=20.0, help='HTTP request timeout in seconds')
    parser.add_argument(
        '--sections',
        action='append',
        default=[],
        help='Seed from specific LINE TODAY sections (comma-separated or repeatable)',
    )
    parser.add_argument(
        '--seed-article',
        action='append',
        default=[],
        help='Seed crawler with specific article URLs or IDs (repeatable)',
    )
    return parser


async def metrics_reporter(metrics: MetricsRecorder, interval: float, stop_event: asyncio.Event) -> None:
    try:
        while not stop_event.is_set():
            await asyncio.sleep(interval)
            logging.info('metrics %s', metrics.format_snapshot())
    except asyncio.CancelledError:
        logging.info('metrics %s', metrics.format_snapshot())
        raise


async def crawl(args) -> None:
    ua = 'LineTodayCrawler/0.1 (+mailto:ops@example.com)'
    robots = RobotsManager(user_agent=ua)
    frontier = Frontier()
    metrics = MetricsRecorder()
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    registry = ArticleRegistry()
    logging.info('loaded %s processed article(s) from registry', registry.size())
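
    # Build the seed set: requested sections and/or explicit article URLs/IDs,
    # falling back to the default wealth tab when no sections are given.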
    seeds: set[str] = set()
    sections: list[str] = []
    for entry in args.sections:
        sections.extend([part.strip() for part in entry.split(',') if part.strip()])
    if sections:
        for section in sections:
            seeds.add(f'https://today.line.me/th/v3/page/{section.lstrip("/")}')
    else:
        seeds.add(START_URL)
    for entry in args.seed_article:
        val = entry.strip()
        if not val:
            continue
        if val.startswith('http'):
            seeds.add(val)
        else:
            seeds.add(f'https://today.line.me/th/v3/article/{val}')
    for seed in seeds:
        if frontier.add(seed):
            queue.put_nowait(seed)
            metrics.inc('frontier_seeded')
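
    # Shared crawl state: a fetched-page counter guarded by a lock and a
    # cooperative stop flag checked by every worker.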
    fetched = 0
    fetched_lock = asyncio.Lock()
    stop_event = asyncio.Event()
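
    # One shared Fetcher for all workers, configured with retries, a circuit
    # breaker, and a per-origin token bucket (see the CLI flags above).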
    async with Fetcher(
        user_agent=ua,
        max_retries=3,
        circuit_threshold=5,
        circuit_cooldown=60.0,
        timeout=args.timeout,
        per_origin_capacity=args.per_origin_capacity,
        per_origin_refill=args.per_origin_refill,
    ) as fetcher:
        reporter_task = asyncio.create_task(metrics_reporter(metrics, args.metrics_interval, stop_event))
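
        # Worker loop: dequeue a URL, apply scope/dedup/robots checks, fetch it,
        # extract and persist articles, and enqueue any newly discovered links.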
        async def worker(worker_id: int) -> None:
            nonlocal fetched
            logger = logging.getLogger(f'worker-{worker_id}')
            while True:
                url = await queue.get()
                if url is None:
                    queue.task_done()
                    logger.debug('received sentinel, exiting')
                    break
                metrics.inc('queue_dequeued')
                if stop_event.is_set():
                    queue.task_done()
                    continue
                if not url.startswith('https://today.line.me/th'):
                    queue.task_done()
                    continue
                is_article_url = '/article/' in url
                if is_article_url and registry.contains(url):
                    metrics.inc('article_skipped_processed')
                    queue.task_done()
                    continue
                path = url[len(ORIGIN):]
                if not args.ignore_robots:
                    allowed = await asyncio.to_thread(robots.allowed, ORIGIN, path)
                    if not allowed:
                        logger.info('robots disallow %s', url)
                        metrics.inc('robots_blocked')
                        queue.task_done()
                        continue
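                # Fetch with latency measurement; a missing status signals a
                # transport-level failure, optionally with a back-off pause.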
                logger.info('fetching %s', url)
                metrics.inc('fetch_started')
                fetch_start = time.perf_counter()
                res = await fetcher.fetch(url)
                latency_ms = (time.perf_counter() - fetch_start) * 1000
                metrics.observe('fetch_latency_ms', latency_ms)
                status = res.get('status')
                if status is None:
                    metrics.inc('status_none')
                    logger.error('error fetching %s: %s', url, res.get('error'))
                    if res.get('pause_seconds'):
                        pause = res['pause_seconds']
                        metrics.inc('retry_pauses')
                        logger.info('pausing for %s seconds', pause)
                        await asyncio.sleep(pause)
                    queue.task_done()
                    continue
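                # 304 responses count toward the fetch limit without re-parsing;
                # only 200 bodies go through extraction and link discovery.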
                metrics.inc(f'status_{status}')
                counted = False
                if status == 304:
                    logger.info('not modified %s', url)
                    counted = True
                elif status != 200:
                    logger.warning('skipping %s status %s', url, status)
                else:
                    html = res.get('text', '')
                    article, links = extract_article(html, url)
                    is_article = bool(article.get('is_article'))
                    if is_article:
                        await asyncio.to_thread(store_snapshot, url, html)
                        metrics.inc('snapshots_written')
                        if not args.dry_run:
                            await asyncio.to_thread(store_parsed, article)
                            metrics.inc('parsed_written')
                        canonical = article.get('url') or url
                        await registry.mark(canonical)
                        metrics.inc('article_recorded')
                        metrics.inc('articles_fetched')
                        counted = True
                    else:
                        metrics.inc('non_article_fetched')
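                    # Discover outgoing links unless a shutdown has been
                    # requested; skip articles already in the registry.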
                    if not stop_event.is_set():
                        new_links = 0
                        for link in links:
                            if link.startswith('/'):
                                link = ORIGIN + link
                            link_is_article = '/article/' in link
                            if link_is_article and registry.contains(link):
                                metrics.inc('article_link_skipped_processed')
                                continue
                            if frontier.add(link):
                                queue.put_nowait(link)
                                new_links += 1
                        metrics.inc('links_enqueued', new_links)
                        if is_article:
                            metrics.inc('article_links_enqueued', new_links)
                if counted:
                    async with fetched_lock:
                        fetched += 1
                        metrics.inc('fetched_total')
                        if fetched >= args.limit and not stop_event.is_set():
                            logger.info('fetch limit reached (%s)', args.limit)
                            stop_event.set()
                            for _ in range(args.concurrency):
                                queue.put_nowait(None)
                queue.task_done()
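
        # Spawn the worker pool and translate SIGINT/SIGTERM into a graceful
        # drain (signal handlers are unavailable on some platforms, hence the
        # NotImplementedError suppression).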
        workers = [asyncio.create_task(worker(i)) for i in range(args.concurrency)]

        def _request_stop(*_):
            if not stop_event.is_set():
                logging.warning('received shutdown signal; draining queue')
                stop_event.set()
                for _ in range(args.concurrency):
                    queue.put_nowait(None)

        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            with suppress(NotImplementedError):
                loop.add_signal_handler(sig, _request_stop)
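
        # Wait for the queue to drain, then stop workers with one sentinel per
        # worker and let the metrics reporter log a final snapshot on cancel.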
        await queue.join()
        if not stop_event.is_set():
            stop_event.set()
            for _ in range(args.concurrency):
                queue.put_nowait(None)
        await asyncio.gather(*workers, return_exceptions=True)
        reporter_task.cancel()
        with suppress(asyncio.CancelledError):
            await reporter_task

    await asyncio.to_thread(robots._http.close)
    logging.info('done')


def main():
    parser = build_parser()
    args = parser.parse_args()
    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(name)s %(message)s')
    asyncio.run(crawl(args))


if __name__ == '__main__':
    main()
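
# One possible invocation, assuming the package is run as a module from the
# repository root (the section name below is illustrative, not a confirmed slug):
#   python -m linetoday.cli --limit 50 --concurrency 8 --sections wealth --dry-run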