feat: storage JSONL writer and dedup index; add parser tests
This commit is contained in:
parent
2dfe929b94
commit
7256e17552
59
internal/parser/parser_test.go
Normal file
59
internal/parser/parser_test.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
package parser
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/yourname/reddit-scraper/internal/fetcher"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseJSONPost_Valid(t *testing.T) {
|
||||||
|
j := fetcher.JSONPost{
|
||||||
|
ID: "abc123",
|
||||||
|
Subreddit: "golang",
|
||||||
|
Title: "An interesting post",
|
||||||
|
Author: "alice",
|
||||||
|
CreatedUTC: 1620000000,
|
||||||
|
Score: 10,
|
||||||
|
NumComments: 2,
|
||||||
|
Selftext: "Hello world",
|
||||||
|
URL: "https://reddit.com/r/golang/comments/abc123",
|
||||||
|
Permalink: "/r/golang/comments/abc123",
|
||||||
|
}
|
||||||
|
p, err := ParseJSONPost(j)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if p.ID != j.ID {
|
||||||
|
t.Fatalf("id mismatch: %s != %s", p.ID, j.ID)
|
||||||
|
}
|
||||||
|
if p.Author != "alice" {
|
||||||
|
t.Fatalf("author mismatch: %s", p.Author)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseJSONPost_DeletedAuthor(t *testing.T) {
|
||||||
|
j := fetcher.JSONPost{
|
||||||
|
ID: "d1",
|
||||||
|
Subreddit: "test",
|
||||||
|
Title: "post",
|
||||||
|
Author: "",
|
||||||
|
CreatedUTC: 1600000000,
|
||||||
|
}
|
||||||
|
p, err := ParseJSONPost(j)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if p.Author != "[deleted]" {
|
||||||
|
t.Fatalf("expected [deleted], got %s", p.Author)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseJSONPost_MissingFields(t *testing.T) {
|
||||||
|
j := fetcher.JSONPost{
|
||||||
|
ID: "",
|
||||||
|
}
|
||||||
|
_, err := ParseJSONPost(j)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error for missing id")
|
||||||
|
}
|
||||||
|
}
|
||||||
76
internal/storage/dedup.go
Normal file
76
internal/storage/dedup.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DedupIndex keeps an in-memory set of seen IDs and persists them to an append-only file.
|
||||||
|
type DedupIndex struct {
|
||||||
|
path string
|
||||||
|
mu sync.Mutex
|
||||||
|
set map[string]struct{}
|
||||||
|
f *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadDedup loads existing IDs from path and prepares to append new ones.
|
||||||
|
func LoadDedup(path string) (*DedupIndex, error) {
|
||||||
|
idx := &DedupIndex{path: path, set: make(map[string]struct{})}
|
||||||
|
// Ensure file exists
|
||||||
|
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o644)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
idx.f = f
|
||||||
|
// Read existing lines
|
||||||
|
sc := bufio.NewScanner(f)
|
||||||
|
for sc.Scan() {
|
||||||
|
idx.set[sc.Text()] = struct{}{}
|
||||||
|
}
|
||||||
|
// ignore scanner error for now
|
||||||
|
return idx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seen reports whether id was seen before.
|
||||||
|
func (d *DedupIndex) Seen(id string) bool {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
_, ok := d.set[id]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add marks id as seen and appends to underlying file.
|
||||||
|
func (d *DedupIndex) Add(id string) error {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
if _, ok := d.set[id]; ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if d.f == nil {
|
||||||
|
// should not happen if loaded properly
|
||||||
|
f, err := os.OpenFile(d.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o644)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.f = f
|
||||||
|
}
|
||||||
|
if _, err := d.f.WriteString(id + "\n"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := d.f.Sync(); err != nil {
|
||||||
|
// best-effort; ignore
|
||||||
|
}
|
||||||
|
d.set[id] = struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying file handle.
|
||||||
|
func (d *DedupIndex) Close() error {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
if d.f != nil {
|
||||||
|
return d.f.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
81
internal/storage/jsonl_writer.go
Normal file
81
internal/storage/jsonl_writer.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JSONLWriter writes records as line-delimited JSON and rotates files by size.
|
||||||
|
type JSONLWriter struct {
|
||||||
|
keyword string
|
||||||
|
outDir string
|
||||||
|
file *os.File
|
||||||
|
writer *bufio.Writer
|
||||||
|
maxBytes int64
|
||||||
|
written int64
|
||||||
|
part int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJSONLWriter creates a writer for a keyword in outDir. maxBytes triggers rotation.
|
||||||
|
func NewJSONLWriter(keyword, outDir string, maxBytes int64) (*JSONLWriter, error) {
|
||||||
|
w := &JSONLWriter{keyword: keyword, outDir: outDir, maxBytes: maxBytes}
|
||||||
|
if err := w.rotate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *JSONLWriter) rotate() error {
|
||||||
|
if w.writer != nil {
|
||||||
|
w.writer.Flush()
|
||||||
|
}
|
||||||
|
if w.file != nil {
|
||||||
|
w.file.Close()
|
||||||
|
}
|
||||||
|
w.part++
|
||||||
|
ts := time.Now().UTC().Format("20060102T150405Z")
|
||||||
|
name := fmt.Sprintf("%s_%s_part%d.jsonl", w.keyword, ts, w.part)
|
||||||
|
path := filepath.Join(w.outDir, name)
|
||||||
|
f, err := os.Create(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.file = f
|
||||||
|
w.writer = bufio.NewWriterSize(f, 16*1024)
|
||||||
|
w.written = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write marshals v to JSON and writes it as a newline-delimited record.
|
||||||
|
func (w *JSONLWriter) Write(v interface{}) error {
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err := w.writer.Write(append(b, '\n'))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.written += int64(n)
|
||||||
|
if w.maxBytes > 0 && w.written >= w.maxBytes {
|
||||||
|
return w.rotate()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close flushes and closes the underlying file.
|
||||||
|
func (w *JSONLWriter) Close() error {
|
||||||
|
if w.writer != nil {
|
||||||
|
if err := w.writer.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if w.file != nil {
|
||||||
|
return w.file.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user