From 04b39ed883b9b89ec8ec7cc8e357049674d0e483 Mon Sep 17 00:00:00 2001
From: Sosokker
Date: Wed, 29 Oct 2025 16:42:07 +0700
Subject: [PATCH] scrape articles concurrently and add articles registry

---
 linetoday/cli.py        | 78 +++++++++++++++++++++++++++++++++++------
 linetoday/registry.py   | 49 ++++++++++++++++++++++++++
 tests/test_extractor.py |  4 +++
 tests/test_registry.py  | 20 +++++++++++
 4 files changed, 140 insertions(+), 11 deletions(-)
 create mode 100644 linetoday/registry.py
 create mode 100644 tests/test_registry.py

diff --git a/linetoday/cli.py b/linetoday/cli.py
index b46ebaf..9742531 100644
--- a/linetoday/cli.py
+++ b/linetoday/cli.py
@@ -9,6 +9,7 @@ from .extractor import extract_article
 from .fetcher import Fetcher
 from .frontier import Frontier
 from .metrics import MetricsRecorder
+from .registry import ArticleRegistry
 from .robots import RobotsManager
 from .storage import store_parsed, store_snapshot
 
@@ -27,6 +28,18 @@ def build_parser() -> argparse.ArgumentParser:
     parser.add_argument('--per-origin-capacity', type=int, default=2, help='Burst size per origin token bucket')
     parser.add_argument('--per-origin-refill', type=float, default=2.0, help='Seconds per token refill for origin bucket')
     parser.add_argument('--timeout', type=float, default=20.0, help='HTTP request timeout')
+    parser.add_argument(
+        '--sections',
+        action='append',
+        default=[],
+        help='Seed from specific LINE TODAY sections (comma-separated or repeatable)',
+    )
+    parser.add_argument(
+        '--seed-article',
+        action='append',
+        default=[],
+        help='Seed crawler with specific article URLs or IDs (repeatable)',
+    )
     return parser
 
 
@@ -47,9 +60,33 @@ async def crawl(args) -> None:
     metrics = MetricsRecorder()
 
     queue: asyncio.Queue[str | None] = asyncio.Queue()
-    if frontier.add(START_URL):
-        queue.put_nowait(START_URL)
-        metrics.inc('frontier_seeded')
+    registry = ArticleRegistry()
+    logging.info('loaded %s processed article(s) from registry', registry.size())
+
+    seeds: set[str] = set()
+    sections: list[str] = []
+    for entry in args.sections:
+        sections.extend([part.strip() for part in entry.split(',') if part.strip()])
+
+    if sections:
+        for section in sections:
+            seeds.add(f'https://today.line.me/th/v3/page/{section.lstrip("/")}')
+    else:
+        seeds.add(START_URL)
+
+    for entry in args.seed_article:
+        val = entry.strip()
+        if not val:
+            continue
+        if val.startswith('http'):
+            seeds.add(val)
+        else:
+            seeds.add(f'https://today.line.me/th/v3/article/{val}')
+
+    for seed in seeds:
+        if frontier.add(seed):
+            queue.put_nowait(seed)
+            metrics.inc('frontier_seeded')
 
     fetched = 0
     fetched_lock = asyncio.Lock()
@@ -85,6 +122,12 @@ async def crawl(args) -> None:
                 queue.task_done()
                 continue
 
+            is_article_url = '/article/' in url
+            if is_article_url and registry.contains(url):
+                metrics.inc('article_skipped_processed')
+                queue.task_done()
+                continue
+
             path = url[len(ORIGIN):]
             if not args.ignore_robots:
                 allowed = await asyncio.to_thread(robots.allowed, ORIGIN, path)
@@ -123,24 +166,38 @@ async def crawl(args) -> None:
                     logger.warning('skipping %s status %s', url, status)
                 else:
                     html = res.get('text', '')
-                    await asyncio.to_thread(store_snapshot, url, html)
-                    metrics.inc('snapshots_written')
                     article, links = extract_article(html, url)
-                    if not args.dry_run:
-                        await asyncio.to_thread(store_parsed, article)
-                        metrics.inc('parsed_written')
-
+
+                    is_article = bool(article.get('is_article'))
+                    if is_article:
+                        await asyncio.to_thread(store_snapshot, url, html)
+                        metrics.inc('snapshots_written')
+                        if not args.dry_run:
+                            await asyncio.to_thread(store_parsed, article)
+                            metrics.inc('parsed_written')
+                        canonical = article.get('url') or url
+                        await registry.mark(canonical)
+                        metrics.inc('article_recorded')
+                        metrics.inc('articles_fetched')
+                        counted = True
+                    else:
+                        metrics.inc('non_article_fetched')
                     if not stop_event.is_set():
                         new_links = 0
                         for link in links:
                             if link.startswith('/'):
                                 link = ORIGIN + link
 
+                            link_is_article = '/article/' in link
+                            if link_is_article and registry.contains(link):
+                                metrics.inc('article_link_skipped_processed')
+                                continue
                             if frontier.add(link):
                                 queue.put_nowait(link)
                                 new_links += 1
                         metrics.inc('links_enqueued', new_links)
-                        counted = True
+                        if is_article:
+                            metrics.inc('article_links_enqueued', new_links)
 
             if counted:
                 async with fetched_lock:
@@ -195,4 +252,3 @@ def main():
 
 if __name__ == '__main__':
     main()
-
diff --git a/linetoday/registry.py b/linetoday/registry.py
new file mode 100644
index 0000000..4c30bc6
--- /dev/null
+++ b/linetoday/registry.py
@@ -0,0 +1,49 @@
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+from typing import Iterable
+
+from .frontier import normalize_url
+from .storage import STORAGE_DIR
+
+REGISTRY_PATH = STORAGE_DIR / 'meta' / 'processed_articles.txt'
+
+
+class ArticleRegistry:
+    """Keeps track of processed article URLs across crawler runs."""
+
+    def __init__(self, path: Path | None = None, preload: Iterable[str] | None = None):
+        self._path = path or REGISTRY_PATH
+        self._path.parent.mkdir(parents=True, exist_ok=True)
+        self._lock = asyncio.Lock()
+        self._processed: set[str] = set()
+
+        if self._path.exists():
+            for line in self._path.read_text(encoding='utf-8').splitlines():
+                line = line.strip()
+                if line:
+                    self._processed.add(normalize_url(line))
+
+        if preload:
+            for item in preload:
+                self._processed.add(normalize_url(item))
+
+    def contains(self, url: str) -> bool:
+        return normalize_url(url) in self._processed
+
+    async def mark(self, url: str) -> None:
+        norm = normalize_url(url)
+        async with self._lock:
+            if norm in self._processed:
+                return
+            self._processed.add(norm)
+            await asyncio.to_thread(self._append_line, url)
+
+    def _append_line(self, url: str) -> None:
+        with self._path.open('a', encoding='utf-8') as fh:
+            fh.write(normalize_url(url))
+            fh.write('\n')
+
+    def size(self) -> int:
+        return len(self._processed)
diff --git a/tests/test_extractor.py b/tests/test_extractor.py
index 271edc8..bb87632 100644
--- a/tests/test_extractor.py
+++ b/tests/test_extractor.py
@@ -1,5 +1,7 @@
 from pathlib import Path
 
+import pytest
+
 from linetoday.extractor import extract_article
 
 
@@ -23,6 +25,8 @@ def test_article_fixture_uses_structured_payload():
 
 def test_snapshots_are_classified_and_sanitised():
     # iterate all html snapshots in data/snapshots
+    if not SNAPSHOT_DIR.exists():
+        pytest.skip('No snapshot fixtures available')
     files = sorted([p for p in SNAPSHOT_DIR.iterdir() if p.suffix == '.html'])
     assert files, f'No snapshot files found in {SNAPSHOT_DIR}'
 
diff --git a/tests/test_registry.py b/tests/test_registry.py
new file mode 100644
index 0000000..a6e6448
--- /dev/null
+++ b/tests/test_registry.py
@@ -0,0 +1,20 @@
+import asyncio
+from pathlib import Path
+
+from linetoday.registry import ArticleRegistry
+
+
+def test_registry_persist_roundtrip(tmp_path: Path):
+    registry_path = tmp_path / 'processed.txt'
+    url = 'https://today.line.me/th/v3/article/test123'
+
+    registry = ArticleRegistry(path=registry_path)
+    assert not registry.contains(url)
+
+    asyncio.run(registry.mark(url))
+    assert registry.contains(url)
+
+    # Reload and ensure persistence
+    registry2 = ArticleRegistry(path=registry_path)
+    assert registry2.contains(url)
+    assert registry2.size() == 1
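
Usage note (not part of the patch): a minimal sketch of how the ArticleRegistry introduced above might be driven directly, for example to pre-mark URLs before a crawl. It relies only on the interface shown in linetoday/registry.py (the optional path and preload constructor arguments plus contains, mark and size); the registry path and the example URLs below are illustrative, not values taken from the project.

import asyncio
from pathlib import Path

from linetoday.registry import ArticleRegistry


async def main() -> None:
    # Illustrative path; by default the registry appends to
    # STORAGE_DIR / 'meta' / 'processed_articles.txt'.
    registry = ArticleRegistry(
        path=Path('data/meta/processed_articles.txt'),
        # preload only seeds the in-memory set; these URLs are not written to the file.
        preload=['https://today.line.me/th/v3/article/already-done'],
    )

    url = 'https://today.line.me/th/v3/article/abc123'
    if not registry.contains(url):
        # mark() serialises the in-memory update behind an asyncio.Lock and runs
        # the file append in a worker thread, so calling it from several crawler
        # tasks at once is safe.
        await registry.mark(url)

    print(f'{registry.size()} article(s) recorded')


if __name__ == '__main__':
    asyncio.run(main())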