package controller

import (
    "context"
    "errors"
    "fmt"
    "log/slog"
    "math/rand"
    "sync"
    "time"

    "github.com/yourname/reddit-scraper/internal/config"
    "github.com/yourname/reddit-scraper/internal/fetcher"
    "github.com/yourname/reddit-scraper/internal/parser"
    "github.com/yourname/reddit-scraper/internal/storage"
    "github.com/yourname/reddit-scraper/internal/types"
)

// Controller orchestrates fetching, parsing, deduplication, and storage.
type Controller struct {
    cfg    *config.Config
    logger *slog.Logger
}

// New creates a controller instance.
func New(cfg *config.Config, logger *slog.Logger) *Controller {
    return &Controller{cfg: cfg, logger: logger}
}
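
// A minimal usage sketch (hypothetical caller; config.Load and the exact
// logger setup are assumptions, not part of this package, while
// SuccessfulPosts and DurationSec are fields Run actually populates):
//
//	cfg, err := config.Load("config.yaml")
//	if err != nil {
//		log.Fatal(err)
//	}
//	logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))
//	ctrl := controller.New(cfg, logger)
//	summary, err := ctrl.Run(context.Background())
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Printf("stored %d posts in %.1fs\n", summary.SuccessfulPosts, summary.DurationSec)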

// Run performs the scraping job until it collects cfg.Limit posts or the
// context is done.
func (c *Controller) Run(ctx context.Context) (types.Summary, error) {
    lg := c.logger
    lg.Info("controller starting",
        "keyword", c.cfg.Keyword,
        "limit", c.cfg.Limit,
        "concurrency", c.cfg.Concurrency,
    )

    // Setup components
    client := fetcher.NewClient(c.cfg.UserAgent, 30*time.Second, c.cfg.RateLimitDelay, c.cfg.Concurrency, c.cfg.RetryLimit)
    dedup, err := storage.LoadDedup(c.cfg.DedupCachePath)
    if err != nil {
        return types.Summary{}, fmt.Errorf("loading dedup: %w", err)
    }
    defer dedup.Close()

    writer, err := storage.NewJSONLWriter(c.cfg.Keyword, c.cfg.OutputDir, 10*1024*1024)
    if err != nil {
        return types.Summary{}, fmt.Errorf("creating writer: %w", err)
    }
    defer writer.Close()

    var mu sync.Mutex
    summary := types.Summary{}
    startTime := time.Now()

    // Channels
    tasks := make(chan fetchTask)
    results := make(chan fetchResult)

    // Worker pool
    var wg sync.WaitGroup
    for i := 0; i < c.cfg.Concurrency; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for t := range tasks {
                res, err := c.fetchWithRetries(ctx, client, t.query, t.after, t.limit)
                if err != nil {
                    lg.Warn("worker fetch error", "worker", workerID, "err", err)
                    results <- fetchResult{err: err}
                    continue
                }
                results <- fetchResult{res: res}
            }
        }(i)
    }
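
    // NOTE: pagination is serial (each request needs the After token from the
    // previous page), so the collector below keeps at most one task in flight
    // and extra workers sit idle. The pool still bounds concurrency if
    // dispatch is ever parallelized across queries.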

    // Collector goroutine: dispatches page tasks and folds results into the
    // summary.
    var collectWg sync.WaitGroup
    collectWg.Add(1)
    go func() {
        defer collectWg.Done()
        defer close(results)

        collected := 0
        after := ""
        for collected < c.cfg.Limit {
            select {
            case <-ctx.Done():
                lg.Info("context canceled, stopping page dispatch")
                return
            default:
            }

            // Dispatch a page task; Reddit listings return at most 100 posts
            // per request.
            tasks <- fetchTask{query: c.cfg.Keyword, after: after, limit: min(100, c.cfg.Limit-collected)}

            // Wait for the matching result and process it.
            fr := <-results
            if fr.err != nil {
                // On error, back off briefly (without outliving the context)
                // and re-dispatch the same page.
                lg.Warn("fetch result error", "err", fr.err)
                select {
                case <-ctx.Done():
                    return
                case <-time.After(2 * time.Second):
                }
                continue
            }

            // Parse, dedupe, and persist each post on the page.
            for _, jp := range fr.res.Posts {
                p, perr := parser.ParseJSONPost(jp)
                if perr != nil {
                    lg.Debug("skipping post", "reason", perr)
                    continue
                }
                if dedup.Seen(p.ID) {
                    mu.Lock()
                    summary.SkippedDuplicates++
                    mu.Unlock()
                    continue
                }
                if err := writer.Write(p); err != nil {
                    lg.Warn("write error", "err", err)
                    continue
                }
                if err := dedup.Add(p.ID); err != nil {
                    lg.Warn("dedup add error", "err", err)
                }
                mu.Lock()
                summary.SuccessfulPosts++
                mu.Unlock()
                collected++
                if collected >= c.cfg.Limit {
                    break
                }
            }

            // Keep every summary mutation under the mutex for consistency.
            mu.Lock()
            summary.TotalRequests++
            mu.Unlock()
            if fr.res.After == "" {
                // No more pages.
                lg.Info("no after token, finishing")
                break
            }
            after = fr.res.After
        }
    }()

    // All pages are dispatched and processed; close tasks so the workers
    // drain and exit, then wait for them.
    collectWg.Wait()
    close(tasks)
    wg.Wait()

    // Finalize the summary.
    duration := time.Since(startTime)
    summary.DurationSec = duration.Seconds()
    if summary.SuccessfulPosts > 0 {
        // Average wall-clock time per stored post, in milliseconds.
        summary.AvgLatencyMs = summary.DurationSec * 1000 / float64(summary.SuccessfulPosts)
    }
    lg.Info("controller finished", "summary", summary)
    return summary, nil
}

// fetchTask describes one page request: the search query, the pagination
// token from the previous page, and how many posts to ask for.
type fetchTask struct {
    query string
    after string
    limit int
}

// fetchResult carries either a fetched page or the error that prevented it.
type fetchResult struct {
    res fetcher.PageResult
    err error
}

// fetchWithRetries wraps client.FetchPage with exponential backoff plus
// jitter: 500ms * 2^attempt + [0, 300ms), i.e. roughly 1s, 2s, 4s, ...
// between attempts. It aborts early once the context is canceled.
func (c *Controller) fetchWithRetries(ctx context.Context, client *fetcher.Client, query, after string, limit int) (fetcher.PageResult, error) {
    var lastErr error
    var res fetcher.PageResult
    for attempt := 0; attempt <= c.cfg.RetryLimit; attempt++ {
        if attempt > 0 {
            // Backoff: base * 2^attempt + jitter, without sleeping past
            // context cancellation.
            d := time.Duration(500*(1<<attempt)) * time.Millisecond
            j := time.Duration(rand.Intn(300)) * time.Millisecond
            select {
            case <-ctx.Done():
                return res, ctx.Err()
            case <-time.After(d + j):
            }
        }
        var err error
        res, err = client.FetchPage(ctx, query, after, limit)
        if err == nil {
            return res, nil
        }
        lastErr = err
        // A canceled or timed-out context will not succeed on retry; abort.
        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            break
        }
    }
    return res, fmt.Errorf("fetch failed after retries: %w", lastErr)
}

// min returns the smaller of two ints. Go 1.21 ships a builtin min; this
// helper keeps the package building on older toolchains.
func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}