"""Extract article metadata, body text, and internal links from LINE Today pages."""

import html
import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup


def _get_meta(soup: BeautifulSoup, prop: str) -> Optional[str]:
    """Return the content of the first matching <meta property=...> or <meta name=...> tag."""
    tag = soup.find('meta', attrs={'property': prop}) or soup.find('meta', attrs={'name': prop})
    if tag and tag.get('content'):
        return tag.get('content').strip()
    return None


def _parse_json_ld(soup: BeautifulSoup) -> Optional[dict]:
    """Return the first JSON-LD NewsArticle/Article object on the page, if any."""
    for tag in soup.find_all('script', type='application/ld+json'):
        try:
            raw = tag.string
            if not raw:
                continue
            data = json.loads(raw.strip())
            # if list, prefer the first NewsArticle/Article
            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict) and item.get('@type') in ('NewsArticle', 'Article'):
                        return item
            if isinstance(data, dict) and data.get('@type') in ('NewsArticle', 'Article'):
                return data
        except Exception:
            continue
    return None


def _parse_next_data(soup: BeautifulSoup) -> Optional[dict]:
    """Return the parsed __NEXT_DATA__ JSON blob embedded by Next.js, if present."""
    tag = soup.find('script', attrs={'id': '__NEXT_DATA__', 'type': 'application/json'})
    if not tag:
        # sometimes the script lacks the type attribute
        tag = soup.find('script', attrs={'id': '__NEXT_DATA__'})
    if not tag:
        return None
    try:
        raw = tag.string or tag.get_text()
        if not raw:
            return None
        raw = raw.strip()
        # unescape HTML entities that sometimes wrap the JSON
        raw = html.unescape(raw)
        try:
            return json.loads(raw)
        except Exception:
            # sometimes the JSON has a prefix/suffix; try the span between
            # the first and last brace
            first = raw.find('{')
            last = raw.rfind('}')
            if first != -1 and last != -1 and last > first:
                snippet = raw[first:last + 1]
                try:
                    return json.loads(snippet)
                except Exception:
                    return None
            return None
    except Exception:
        return None


def _find_field(obj: Any, keys: List[str]) -> Optional[Any]:
    """Recursively search dict/list for the first occurrence of any key in keys."""
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k in keys:
                return v
            found = _find_field(v, keys)
            if found is not None:
                return found
    elif isinstance(obj, list):
        for item in obj:
            found = _find_field(item, keys)
            if found is not None:
                return found
    return None
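
# A minimal sketch of _find_field's depth-first lookup (the nested data below
# is hypothetical, not a real Next.js payload). It returns the *first* match in
# traversal order, which is why callers prefer meta tags before this fallback:
#
#   _find_field({'props': {'pageProps': {'headline': 'Example'}}}, ['headline'])
#   # -> 'Example'
#   _find_field([{'id': 1}, {'author': {'name': 'A. Writer'}}], ['author'])
#   # -> {'name': 'A. Writer'}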


def _extract_article_payload(nextdata: dict) -> Tuple[Optional[dict], Optional[str]]:
    """Locate the structured article payload inside Next.js fallback data."""
    props = nextdata.get('props') if isinstance(nextdata, dict) else None
    page_props = props.get('pageProps') if isinstance(props, dict) else None
    fallback = page_props.get('fallback') if isinstance(page_props, dict) else None
    if not isinstance(fallback, dict):
        return None, None

    for key, value in fallback.items():
        if isinstance(value, dict):
            data = value.get('data')
            if isinstance(data, dict) and data.get('content') and data.get('title'):
                return data, key
            if value.get('content') and value.get('title'):
                return value, key
    return None, None
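
# The fallback dict this walks resembles an SWR-style cache keyed by request,
# with the article either at the top level of each entry or under its 'data'
# key. A hypothetical sketch of a shape this matches:
#
#   {'props': {'pageProps': {'fallback': {
#       '/api/article/abc': {'data': {'title': '...', 'content': '<p>...</p>'}},
#   }}}}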


def _text_from_html_fragment(fragment: str) -> str:
    # Use BeautifulSoup to cleanly extract text from HTML fragments
    try:
        frag_soup = BeautifulSoup(fragment, 'lxml')
        return frag_soup.get_text(separator=' ', strip=True)
    except Exception:
        # fallback: strip tags crudely
        return re.sub('<[^<]+?>', '', fragment).strip()


def _normalize_paragraphs(text: str) -> str:
    """Remove obvious noise (Loading..., duplicate nav labels) while preserving order."""
    if not text:
        return ''
    cleaned: List[str] = []
    seen_counts: Dict[str, int] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        lower = line.lower()
        if 'loading' in lower:
            # catch "Loading...", "Loading... Loading..." etc.
            continue
        # allow each distinct line up to twice, to keep short repeated quotes
        if seen_counts.get(lower, 0) >= 2:
            continue
        seen_counts[lower] = seen_counts.get(lower, 0) + 1
        cleaned.append(line)
    return '\n\n'.join(cleaned)
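
# Example of the filtering, with hypothetical input:
#
#   _normalize_paragraphs('Loading...\nHeadline\nHeadline\nHeadline\nBody')
#   # -> 'Headline\n\nHeadline\n\nBody'
#   # ("Loading..." dropped; the third "Headline" exceeds the repeat cap)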


def _is_today_article_url(url: str) -> bool:
    """True for LINE Today article-detail URLs (absolute or site-relative)."""
    parsed = urlparse(url)
    if parsed.netloc and 'today.line.me' not in parsed.netloc:
        return False
    return '/article/' in parsed.path
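
# Relative URLs have an empty netloc, so they pass the host check:
#
#   _is_today_article_url('https://today.line.me/th/v3/article/abc')  # True
#   _is_today_article_url('/th/v3/article/abc')                       # True
#   _is_today_article_url('https://example.com/article/abc')          # False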


def _collect_structured_links(article_payload: Optional[dict]) -> Set[str]:
    """Build absolute LINE Today URLs from the payload's exploreLinks entries."""
    links: Set[str] = set()
    if not isinstance(article_payload, dict):
        return links

    explore = article_payload.get('exploreLinks')
    if isinstance(explore, list):
        for entry in explore:
            page_link = entry.get('pageLink') if isinstance(entry, dict) else None
            if not isinstance(page_link, dict):
                continue
            page_type = page_link.get('pageType')
            if page_type == 'ARTICLE':
                hash_val = page_link.get('hash')
                if hash_val:
                    links.add(f'https://today.line.me/th/v3/article/{hash_val}')
            elif page_type == 'GENERAL':
                page = page_link.get('page')
                if isinstance(page, dict):
                    url_path = page.get('urlPath')
                    if url_path:
                        links.add(f'https://today.line.me/th/v3/page/{url_path}')
            elif page_type == 'TAG':
                tag_hash = page_link.get('hash')
                if tag_hash:
                    links.add(f'https://today.line.me/th/v3/tag/{tag_hash}')
    return links
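
# The exploreLinks shapes handled above, sketched with placeholder hash and
# urlPath values (not captured from a live response):
#
#   {'exploreLinks': [
#       {'name': 'tag name', 'pageLink': {'pageType': 'TAG', 'hash': 'TAGHASH'}},
#       {'pageLink': {'pageType': 'ARTICLE', 'hash': 'ARTICLEHASH'}},
#       {'pageLink': {'pageType': 'GENERAL', 'page': {'urlPath': 'some-page'}}},
#   ]}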


def extract_article(page_html: str, url: str) -> Tuple[Dict, List[str]]:
    """Parse a LINE Today article page into a metadata dict plus internal links.

    `page_html` is the raw page markup (the parameter is not named `html`, so it
    does not shadow the stdlib `html` module imported above).
    """
    soup = BeautifulSoup(page_html, 'lxml')

    # meta / og
    title = _get_meta(soup, 'og:title') or _get_meta(soup, 'title')
    description = _get_meta(soup, 'og:description') or _get_meta(soup, 'description')
    image = _get_meta(soup, 'og:image')
    published = _get_meta(soup, 'article:published_time')

    # JSON-LD
    author = None
    publisher = None
    jsonld = _parse_json_ld(soup)
    if jsonld:
        title = title or jsonld.get('headline')
        if not published:
            published = jsonld.get('datePublished') or jsonld.get('dateCreated')
        a = jsonld.get('author')
        if isinstance(a, dict):
            author = a.get('name')
        elif isinstance(a, list) and a:
            author = a[0].get('name') if isinstance(a[0], dict) else None
        elif isinstance(a, str):
            author = a
        pub = jsonld.get('publisher')
        if isinstance(pub, dict):
            publisher = pub.get('name')

    body_html = None
    content_type = None
    source_url = None
    category = None
    tags: List[str] = []
    is_article = False
    # Initialised up front so the Next.js branches below may leave it empty
    # without risking a NameError in the heuristics section.
    article_body = ''

    # Try Next.js page data
    nextdata = _parse_next_data(soup)
    payload = None
    if nextdata:
        payload, _ = _extract_article_payload(nextdata)
        if payload:
            content_type = payload.get('contentType')
            url_info = payload.get('url') if isinstance(payload.get('url'), dict) else None
            canonical_url = url_info.get('url') if isinstance(url_info, dict) else None
            if canonical_url:
                url = canonical_url
                is_article = _is_today_article_url(canonical_url)
            title = payload.get('title') or title
            description = payload.get('shortDescription') or description
            author = payload.get('author') or author
            publisher = payload.get('publisher') or publisher
            source_url = payload.get('sourceUrl')
            category = payload.get('categoryName')
            publish_unix = payload.get('publishTimeUnix')
            if publish_unix and not published:
                try:
                    ts = float(publish_unix)
                    # Heuristic: epoch values above ~1e12 are in milliseconds.
                    if ts > 1e12:
                        ts /= 1000.0
                    published = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
                except Exception:
                    published = payload.get('publishTime') or published
            elif payload.get('publishTime') and not published:
                published = payload.get('publishTime')
            body_html = payload.get('content')
            if body_html:
                article_body = _text_from_html_fragment(body_html)
            explore_links = payload.get('exploreLinks')
            if isinstance(explore_links, list):
                for entry in explore_links:
                    tag_name = entry.get('name') if isinstance(entry, dict) else None
                    page_link = entry.get('pageLink') if isinstance(entry, dict) else None
                    if tag_name and page_link and page_link.get('pageType') == 'TAG':
                        tags.append(tag_name)
        else:
            # search common fields used by news sites / Next.js props
            nd_title = _find_field(nextdata, ['title', 'headline', 'name', 'seoTitle'])
            if nd_title and not title:
                title = nd_title
            nd_desc = _find_field(nextdata, ['description', 'summary', 'seoDescription'])
            if nd_desc and not description:
                description = nd_desc
            nd_body = _find_field(nextdata, ['articleBody', 'body', 'content', 'html'])
            if isinstance(nd_body, str):
                article_body = _text_from_html_fragment(nd_body)
            elif isinstance(nd_body, list):
                parts = []
                for item in nd_body:
                    if isinstance(item, str):
                        parts.append(_text_from_html_fragment(item))
                    elif isinstance(item, dict):
                        for k in ('text', 'content', 'body', 'html'):
                            if k in item and isinstance(item[k], str):
                                parts.append(_text_from_html_fragment(item[k]))
                article_body = '\n\n'.join([p for p in parts if p])
            elif isinstance(nd_body, dict):
                if 'html' in nd_body and isinstance(nd_body['html'], str):
                    article_body = _text_from_html_fragment(nd_body['html'])
                else:
                    article_body = _text_from_html_fragment(str(nd_body))

            nd_img = _find_field(nextdata, ['image', 'thumbnail', 'ogImage'])
            if nd_img and not image:
                if isinstance(nd_img, str):
                    image = urljoin(url, nd_img)
                elif isinstance(nd_img, dict):
                    candidate = nd_img.get('url') or nd_img.get('src') or nd_img.get('path')
                    if isinstance(candidate, str):
                        image = urljoin(url, candidate)

            nd_author = _find_field(nextdata, ['author', 'writer', 'creator'])
            if nd_author and not author:
                if isinstance(nd_author, str):
                    author = nd_author
                elif isinstance(nd_author, dict):
                    author = nd_author.get('name')
                elif isinstance(nd_author, list) and nd_author:
                    first = nd_author[0]
                    if isinstance(first, dict):
                        author = first.get('name')
                    elif isinstance(first, str):
                        author = first

            if not published:
                nd_pub = _find_field(nextdata, ['datePublished', 'publishedAt', 'createdAt'])
                if isinstance(nd_pub, str):
                    published = nd_pub

        # If the page lacks a meta image, fall back to the payload thumbnail.
        if not image and payload and isinstance(payload.get('thumbnail'), dict):
            thumb = payload['thumbnail']
            thumb_url = thumb.get('url') or thumb.get('src')
            if isinstance(thumb_url, str):
                image = urljoin(url, thumb_url)

    # fallback title
    if not title:
        h1 = soup.find('h1')
        if h1:
            title = h1.get_text(strip=True)

    # if article_body is still empty, apply HTML heuristics
    if not article_body:
        # try common article containers
        candidates = []
        for sel in ['article', 'div[class*="article"]', 'div[itemprop="articleBody"]', 'div[class*="content"]']:
            candidates.extend(soup.select(sel))
        if not candidates:
            # fall back to <main>
            main = soup.find('main')
            if main:
                candidates = [main]
        if candidates:
            # choose the largest candidate by text length
            best = max(candidates, key=lambda el: len(el.get_text(strip=True)))
            # remove scripts, styles, and ad-like nodes
            for bad in best.select('script, style, .ad, .ads, .related, .promo'):
                bad.decompose()
            # Nested <div>s can repeat text their child <p>s already contribute;
            # _normalize_paragraphs caps repeated lines to limit the damage.
            paragraphs = [p.get_text(separator=' ', strip=True) for p in best.find_all(['p', 'div']) if p.get_text(strip=True)]
            article_body = '\n\n'.join(paragraphs)
        else:
            # as a last resort, combine all <p> tags
            paragraphs = [p.get_text(strip=True) for p in soup.find_all('p')]
            article_body = '\n\n'.join(paragraphs)

    article_body = _normalize_paragraphs(article_body)

    # collect internal links with a bias toward article detail pages
    link_candidates: Set[str] = set()
    link_candidates.update(_collect_structured_links(payload))

    for anchor in soup.find_all('a', href=True):
        absolute = urljoin(url, anchor['href'])
        if _is_today_article_url(absolute) or absolute.startswith('https://today.line.me/th/'):
            link_candidates.add(absolute)

    if is_article:
        link_candidates.add(url)

    links = sorted(link_candidates)

    article = {
        'url': url,
        'title': title,
        'description': description,
        'author': author,
        'publisher': publisher,
        'published_at': published,
        'image': image,
        'body_text': article_body,
        'body_html': body_html,
        'content_type': content_type,
        'category': category,
        'source_url': source_url,
        'tags': tags,
        'is_article': is_article,
    }
    return article, links
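

if __name__ == '__main__':
    # Minimal smoke test. The markup below is a hand-written stand-in for a
    # LINE Today page, not captured output; a real page would also carry a
    # __NEXT_DATA__ script exercising the structured-payload path.
    sample = '''
    <html><head>
      <meta property="og:title" content="Example headline">
      <meta property="og:description" content="Example description">
    </head><body>
      <h1>Example headline</h1>
      <article><p>First paragraph.</p><p>Second paragraph.</p></article>
      <a href="/th/v3/article/abc123">related</a>
    </body></html>
    '''
    article, links = extract_article(sample, 'https://today.line.me/th/v3/article/xyz789')
    print(json.dumps(article, ensure_ascii=False, indent=2))
    print(links)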