// Package controller orchestrates the scraping pipeline: fetching pages,
// parsing posts, deduplicating, and writing results to storage.
package controller

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math/rand"
	"sync"
	"time"

	"github.com/yourname/reddit-scraper/internal/config"
	"github.com/yourname/reddit-scraper/internal/fetcher"
	"github.com/yourname/reddit-scraper/internal/parser"
	"github.com/yourname/reddit-scraper/internal/storage"
	"github.com/yourname/reddit-scraper/internal/types"
)

// Controller orchestrates fetching, parsing, deduplication, and storage.
type Controller struct {
	cfg    *config.Config // run configuration (keyword, limits, paths, tuning knobs)
	logger *slog.Logger   // structured logger; passed in explicitly, never the default
}

// New creates a controller instance.
func New(cfg *config.Config, logger *slog.Logger) *Controller {
	return &Controller{cfg: cfg, logger: logger}
}

// Run performs the scraping job until it collects cfg.Limit posts or context is done.
//
// It wires up a fetcher client, a dedup cache, and a JSONL writer, then runs a
// worker pool fed by an unbuffered tasks channel. Returns a Summary of the run;
// fatal errors only arise from setting up the dedup cache or the writer.
func (c *Controller) Run(ctx context.Context) (types.Summary, error) {
	lg := c.logger
	lg.Info("controller starting",
		"keyword", c.cfg.Keyword,
		"limit", c.cfg.Limit,
		"concurrency", c.cfg.Concurrency,
	)

	// Setup components.
	// NOTE(review): 30s request timeout and the 10 MiB writer rotation size are
	// hard-coded here rather than taken from cfg — presumably intentional; confirm.
	client := fetcher.NewClient(c.cfg.UserAgent, 30*time.Second, c.cfg.RateLimitDelay, c.cfg.Concurrency, c.cfg.RetryLimit)

	dedup, err := storage.LoadDedup(c.cfg.DedupCachePath)
	if err != nil {
		return types.Summary{}, fmt.Errorf("loading dedup: %w", err)
	}
	defer dedup.Close()

	writer, err := storage.NewJSONLWriter(c.cfg.Keyword, c.cfg.OutputDir, 10*1024*1024)
	if err != nil {
		return types.Summary{}, fmt.Errorf("creating writer: %w", err)
	}
	defer writer.Close()

	// mu guards summary. NOTE(review): only the collector goroutine below ever
	// mutates summary, and Run reads it only after that goroutine exits, so the
	// mutex appears redundant — harmless, but worth confirming before removal.
	var mu sync.Mutex
	summary := types.Summary{}
	startTime := time.Now()

	// Channels are unbuffered: each dispatched task is matched by exactly one
	// result before the next dispatch (see collector goroutine).
	tasks := make(chan fetchTask)
	results := make(chan fetchResult)

	// Worker pool: cfg.Concurrency goroutines draining tasks until the channel
	// is closed. NOTE(review): the collector dispatches one task and then blocks
	// waiting for its result, so at most one request is in flight at a time and
	// the extra workers sit idle — the pool does not actually add parallelism.
	var wg sync.WaitGroup
	for i := 0; i < c.cfg.Concurrency; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for t := range tasks {
				res, err := c.fetchWithRetries(ctx, client, t.query, t.after, t.limit)
				if err != nil {
					// Report the failure to the collector; it decides whether to retry the page.
					lg.Warn("worker fetch error", "worker", workerID, "err", err)
					results <- fetchResult{err: err}
					continue
				}
				results <- fetchResult{res: res}
			}
		}(i)
	}
	// Controller goroutine to manage pages and collection
	// Collector: dispatches one page task at a time, waits for its result, and
	// processes the returned posts (parse → dedup → write). Owns the results
	// channel and closes it on exit.
	var collectWg sync.WaitGroup
	collectWg.Add(1)
	go func() {
		defer collectWg.Done()
		defer close(results)
		collected := 0
		after := "" // Reddit-style pagination cursor; empty means first page
		for collected < c.cfg.Limit {
			// Bail out promptly if the caller cancels; checked only between pages.
			select {
			case <-ctx.Done():
				lg.Info("context canceled, stopping page dispatch")
				return
			default:
			}
			// dispatch a page task; page size capped at 100 or the remaining quota
			tasks <- fetchTask{query: c.cfg.Keyword, after: after, limit: min(100, c.cfg.Limit-collected)}
			// wait for a result and process it (lock-step with the dispatch above)
			fr := <-results
			if fr.err != nil {
				// on error, back off a bit and retry dispatch of the same page
				// (`after` is unchanged, so the next iteration re-requests it)
				lg.Warn("fetch result error", "err", fr.err)
				time.Sleep(2 * time.Second)
				continue
			}
			// process posts: parse, skip duplicates, persist, mark seen
			for _, jp := range fr.res.Posts {
				p, perr := parser.ParseJSONPost(jp)
				if perr != nil {
					lg.Debug("skipping post", "reason", perr)
					continue
				}
				if dedup.Seen(p.ID) {
					mu.Lock()
					summary.SkippedDuplicates++
					mu.Unlock()
					continue
				}
				if err := writer.Write(p); err != nil {
					// best-effort: a failed write drops the post but keeps the run going
					lg.Warn("write error", "err", err)
					continue
				}
				if err := dedup.Add(p.ID); err != nil {
					lg.Warn("dedup add error", "err", err)
				}
				mu.Lock()
				summary.SuccessfulPosts++
				mu.Unlock()
				collected++
				if collected >= c.cfg.Limit {
					break // quota reached; remaining posts on this page are discarded
				}
			}
			// NOTE(review): incremented without mu (unlike the fields above) and only
			// on success — failed requests are never counted. Confirm intent.
			summary.TotalRequests++
			if fr.res.After == "" {
				// no more pages
				lg.Info("no after token, finishing")
				break
			}
			after = fr.res.After
		}
	}()

	// Shutdown order matters: wait for the collector to stop dispatching, then
	// close tasks so workers drain and exit, then wait for the workers.
	collectWg.Wait()
	close(tasks)
	wg.Wait()

	// finalize
	duration := time.Since(startTime)
	summary.DurationSec = duration.Seconds()
	if summary.SuccessfulPosts > 0 {
		// NOTE(review): this is total wall-clock time divided by post count, not a
		// measured per-request latency — the field name AvgLatencyMs overstates it.
		summary.AvgLatencyMs = float64(summary.DurationSec*1000) / float64(summary.SuccessfulPosts)
	}
	lg.Info("controller finished", "summary", summary)
	return summary, nil
}

// helper types

// fetchTask describes one page request handed to a worker.
type fetchTask struct {
	query string // search keyword
	after string // pagination cursor; empty for the first page
	limit int    // max posts requested for this page
}

// fetchResult carries either a fetched page or the error that prevented it.
type fetchResult struct {
	res fetcher.PageResult
	err error
}

// fetchWithRetries handles retry attempts with exponential backoff + jitter.
// (Body continues in the next chunk of this file.)
func (c *Controller) fetchWithRetries(ctx context.Context, client *fetcher.Client, query, after string, limit int)
(fetcher.PageResult, error) { var lastErr error var res fetcher.PageResult for attempt := 0; attempt <= c.cfg.RetryLimit; attempt++ { if attempt > 0 { // backoff: base * 2^attempt + jitter d := time.Duration(500*(1<