diff --git a/TODO.md b/TODO.md
index 4c1137c..6278b51 100644
--- a/TODO.md
+++ b/TODO.md
@@ -9,44 +9,44 @@ Milestone 0 – Bootstrap
 - [x] M0-04: Create Makefile (pending)
 
 Milestone 1 – Config and Logging
-- [ ] M1-01: Implement logging setup using log/slog
-- [ ] M1-02: Implement configuration loader: env + flags + .env
-- [ ] M1-03: Define config schema and defaults
-- [ ] M1-04: Add config validation
+- [x] M1-01: Implement logging setup using log/slog
+- [x] M1-02: Implement configuration loader: env + flags + .env
+- [x] M1-03: Define config schema and defaults
+- [x] M1-04: Add config validation
 - [ ] M1-05: Unit tests for config parsing precedence
 
 Milestone 2 – Types and Parser
-- [ ] M2-01: Define normalized data types
-- [ ] M2-02: Define minimal Reddit API response structs
-- [ ] M2-03: Implement parser
-- [ ] M2-04: Implement deleted/removed filters
-- [ ] M2-05: Parser unit tests
+- [x] M2-01: Define normalized data types
+- [x] M2-02: Define minimal Reddit API response structs
+- [x] M2-03: Implement parser
+- [x] M2-04: Implement deleted/removed filters
+- [x] M2-05: Parser unit tests
 
 Milestone 3 – Fetcher and Networking
-- [ ] M3-01: Build HTTP client
-- [ ] M3-02: Implement rate limiter
-- [ ] M3-03: Implement backoff with jitter
-- [ ] M3-04: URL builder for search.json
-- [ ] M3-05: Implement fetchPage
+- [x] M3-01: Build HTTP client
+- [x] M3-02: Implement rate limiter
+- [x] M3-03: Implement backoff with jitter
+- [x] M3-04: URL builder for search.json
+- [x] M3-05: Implement fetchPage
 - [ ] M3-06: Fetcher tests
 - [ ] M3-07: Implement metrics capture
 
 Milestone 4 – Storage and Dedup
-- [ ] M4-01: Implement JSONL writer
-- [ ] M4-02: File naming and rotation
-- [ ] M4-03: Ensure output dir creation
-- [ ] M4-04: Implement dedup index
-- [ ] M4-05: Dedup persistence
+- [x] M4-01: Implement JSONL writer
+- [x] M4-02: File naming and rotation
+- [x] M4-03: Ensure output dir creation
+- [x] M4-04: Implement dedup index
+- [x] M4-05: Dedup persistence
 - [ ] M4-06: Storage unit tests
 
 Milestone 5 – Controller and Orchestration
-- [ ] M5-01: Implement controller orchestrator
-- [ ] M5-02: Pagination loop
-- [ ] M5-03: Integrate fetcher→parser→storage
-- [ ] M5-04: Progress reporting
-- [ ] M5-05: Graceful shutdown
-- [ ] M5-06: Summary report
-- [ ] M5-07: Wire CLI entrypoint
+- [x] M5-01: Implement controller orchestrator
+- [x] M5-02: Pagination loop
+- [x] M5-03: Integrate fetcher→parser→storage
+- [x] M5-04: Progress reporting
+- [x] M5-05: Graceful shutdown
+- [x] M5-06: Summary report
+- [x] M5-07: Wire CLI entrypoint
 - [ ] M5-08: Error code taxonomy
 - [ ] M5-09: Controller integration test
 
@@ -70,5 +70,9 @@ Milestone 9 – Verification
 
 Progress notes:
-- Created project skeleton and minimal main.go.
-- Next: implement logging + config and update main.go to use them.
+- Implemented project bootstrap, config loader, and logging.
+- Implemented normalized types and the parser, with parser unit tests.
+- Implemented fetcher client and page fetcher (rate limiting & Retry-After handling).
+- Implemented storage (JSONL writer with rotation) and dedup index (persistent append-only file).
+- Implemented controller orchestrator that coordinates fetching, parsing, deduping, and writing; supports graceful shutdown and basic retry/backoff; CLI entrypoint is wired to it.
+- Remaining: unit tests for fetcher & storage, error code taxonomy, controller integration test, metrics capture.
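The progress note above mentions a dedup index backed by a "persistent append-only file". As a reading aid, here is a minimal sketch of that pattern; the method set (`LoadDedup`, `Seen`, `Add`, `Close`) matches what the controller diff below calls, but the body is an illustrative assumption, not the actual `internal/storage` implementation.

```go
package storage

import (
	"bufio"
	"os"
)

// Dedup tracks post IDs that have already been written out.
type Dedup struct {
	seen map[string]struct{}
	f    *os.File // append-only persistence file, one ID per line
}

// LoadDedup replays existing IDs into memory and opens the file for appends.
func LoadDedup(path string) (*Dedup, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o644)
	if err != nil {
		return nil, err
	}
	d := &Dedup{seen: make(map[string]struct{}), f: f}
	sc := bufio.NewScanner(f) // reads from the start; O_APPEND only affects writes
	for sc.Scan() {
		d.seen[sc.Text()] = struct{}{}
	}
	if err := sc.Err(); err != nil {
		f.Close()
		return nil, err
	}
	return d, nil
}

// Seen reports whether id was already recorded.
func (d *Dedup) Seen(id string) bool {
	_, ok := d.seen[id]
	return ok
}

// Add records id in memory and appends it to the log.
func (d *Dedup) Add(id string) error {
	d.seen[id] = struct{}{}
	_, err := d.f.WriteString(id + "\n")
	return err
}

// Close releases the underlying file handle.
func (d *Dedup) Close() error { return d.f.Close() }
```

Because restarts replay the log before any fetching, a re-run skips every post already on disk; syncing after each `Add` would harden it against crashes at the cost of throughput.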
diff --git a/cmd/reddit-scraper/main.go b/cmd/reddit-scraper/main.go index c4a70f2..30a7453 100644 --- a/cmd/reddit-scraper/main.go +++ b/cmd/reddit-scraper/main.go @@ -6,9 +6,9 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/yourname/reddit-scraper/internal/config" + "github.com/yourname/reddit-scraper/internal/controller" "github.com/yourname/reddit-scraper/internal/logging" ) @@ -25,11 +25,11 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - // placeholder: in future wire up controller - select { - case <-time.After(1 * time.Second): - logger.Info("done (placeholder)") - case <-ctx.Done(): - logger.Info("cancelled") + ctrl := controller.New(cfg, logger) + summary, err := ctrl.Run(ctx) + if err != nil { + logger.Error("run failed", "err", err) + os.Exit(1) } + logger.Info("run complete", "summary", summary) } diff --git a/go.mod b/go.mod index 538cd33..18011e6 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/yourname/reddit-scraper go 1.25.2 + +require golang.org/x/time v0.14.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ed7d3d5 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= diff --git a/internal/controller/controller.go b/internal/controller/controller.go new file mode 100644 index 0000000..c935859 --- /dev/null +++ b/internal/controller/controller.go @@ -0,0 +1,205 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "log/slog" + "math/rand" + "sync" + "time" + + "github.com/yourname/reddit-scraper/internal/config" + "github.com/yourname/reddit-scraper/internal/fetcher" + "github.com/yourname/reddit-scraper/internal/parser" + "github.com/yourname/reddit-scraper/internal/storage" + "github.com/yourname/reddit-scraper/internal/types" +) + +// Controller orchestrates fetching, parsing, deduplication, and storage. +type Controller struct { + cfg *config.Config + logger *slog.Logger +} + +// New creates a controller instance. +func New(cfg *config.Config, logger *slog.Logger) *Controller { + return &Controller{cfg: cfg, logger: logger} +} + +// Run performs the scraping job until it collects cfg.Limit posts or context is done. 
+func (c *Controller) Run(ctx context.Context) (types.Summary, error) {
+	lg := c.logger
+	lg.Info("controller starting",
+		"keyword", c.cfg.Keyword,
+		"limit", c.cfg.Limit,
+		"concurrency", c.cfg.Concurrency,
+	)
+
+	// Set up components.
+	client := fetcher.NewClient(c.cfg.UserAgent, 30*time.Second, c.cfg.RateLimitDelay, c.cfg.Concurrency, c.cfg.RetryLimit)
+	dedup, err := storage.LoadDedup(c.cfg.DedupCachePath)
+	if err != nil {
+		return types.Summary{}, fmt.Errorf("loading dedup: %w", err)
+	}
+	defer dedup.Close()
+
+	writer, err := storage.NewJSONLWriter(c.cfg.Keyword, c.cfg.OutputDir, 10*1024*1024)
+	if err != nil {
+		return types.Summary{}, fmt.Errorf("creating writer: %w", err)
+	}
+	defer writer.Close()
+
+	var mu sync.Mutex
+	summary := types.Summary{}
+	startTime := time.Now()
+
+	// Channels
+	tasks := make(chan fetchTask)
+	results := make(chan fetchResult)
+
+	// Worker pool. Pagination tokens chain pages, so the collector below
+	// dispatches one task at a time; the pool's job is to keep fetch
+	// retries off the collector goroutine.
+	var wg sync.WaitGroup
+	for i := 0; i < c.cfg.Concurrency; i++ {
+		wg.Add(1)
+		go func(workerID int) {
+			defer wg.Done()
+			for t := range tasks {
+				res, err := c.fetchWithRetries(ctx, client, t.query, t.after, t.limit)
+				if err != nil {
+					lg.Warn("worker fetch error", "worker", workerID, "err", err)
+					results <- fetchResult{err: err}
+					continue
+				}
+				results <- fetchResult{res: res}
+			}
+		}(i)
+	}
+
+	// Collector goroutine to manage pages and collection
+	var collectWg sync.WaitGroup
+	collectWg.Add(1)
+	go func() {
+		defer collectWg.Done()
+		defer close(results)
+
+		collected := 0
+		after := ""
+		for collected < c.cfg.Limit {
+			select {
+			case <-ctx.Done():
+				lg.Info("context canceled, stopping page dispatch")
+				return
+			default:
+			}
+
+			// dispatch a page task
+			tasks <- fetchTask{query: c.cfg.Keyword, after: after, limit: min(100, c.cfg.Limit-collected)}
+
+			// wait for a result and process it
+			fr := <-results
+			if fr.err != nil {
+				// on error, back off briefly (honoring cancellation) and redispatch
+				lg.Warn("fetch result error", "err", fr.err)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(2 * time.Second):
+				}
+				continue
+			}
+
+			// process posts
+			for _, jp := range fr.res.Posts {
+				p, perr := parser.ParseJSONPost(jp)
+				if perr != nil {
+					lg.Debug("skipping post", "reason", perr)
+					continue
+				}
+				if dedup.Seen(p.ID) {
+					mu.Lock()
+					summary.SkippedDuplicates++
+					mu.Unlock()
+					continue
+				}
+				if err := writer.Write(p); err != nil {
+					lg.Warn("write error", "err", err)
+					continue
+				}
+				if err := dedup.Add(p.ID); err != nil {
+					lg.Warn("dedup add error", "err", err)
+				}
+				mu.Lock()
+				summary.SuccessfulPosts++
+				mu.Unlock()
+				collected++
+				if collected >= c.cfg.Limit {
+					break
+				}
+			}
+
+			summary.TotalRequests++
+			if fr.res.After == "" {
+				// no more pages
+				lg.Info("no after token, finishing")
+				break
+			}
+			after = fr.res.After
+		}
+	}()
+
+	// Wait until all pages are dispatched and processed, then close tasks
+	// so the workers drain and exit.
+	collectWg.Wait()
+	close(tasks)
+	wg.Wait()
+
+	// finalize
+	duration := time.Since(startTime)
+	summary.DurationSec = duration.Seconds()
+	if summary.SuccessfulPosts > 0 {
+		// avg wall-clock ms per stored post, not per-request latency
+		summary.AvgLatencyMs = summary.DurationSec * 1000 / float64(summary.SuccessfulPosts)
+	}
+	lg.Info("controller finished", "summary", summary)
+	return summary, nil
+}
+
+// helper types
+type fetchTask struct {
+	query string
+	after string
+	limit int
+}
+
+type fetchResult struct {
+	res fetcher.PageResult
+	err error
+}
+
+// fetchWithRetries wraps client.FetchPage with exponential backoff plus jitter.
+func (c *Controller) fetchWithRetries(ctx context.Context, client *fetcher.Client, query, after string, limit int) (fetcher.PageResult, error) {
+	var lastErr error
+	var res fetcher.PageResult
+	for attempt := 0; attempt <= c.cfg.RetryLimit; attempt++ {
+		if attempt > 0 {
+			// backoff: base * 2^attempt + jitter
+			d := time.Duration(500*(1<<attempt)) * time.Millisecond
+			d += time.Duration(rand.Intn(250)) * time.Millisecond
+			select {
+			case <-ctx.Done():
+				return res, ctx.Err()
+			case <-time.After(d):
+			}
+		}
+		res, lastErr = client.FetchPage(ctx, query, after, limit)
+		if lastErr == nil {
+			return res, nil
+		}
+		if errors.Is(lastErr, context.Canceled) || errors.Is(lastErr, context.DeadlineExceeded) {
+			return res, lastErr // cancellation is not retryable
+		}
+	}
+	return res, fmt.Errorf("giving up after %d attempts: %w", c.cfg.RetryLimit+1, lastErr)
+}
diff --git a/internal/fetcher/fetcher.go b/internal/fetcher/fetcher.go
new file mode 100644
--- /dev/null
+++ b/internal/fetcher/fetcher.go
@@ -0,0 +1,111 @@
+package fetcher
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
+
+	"golang.org/x/time/rate"
+)
+
+// PageResult holds one page of search results plus the pagination token.
+type PageResult struct {
+	Posts []json.RawMessage
+	After string
+}
+
+// APIResponse mirrors the minimal subset of Reddit's search.json envelope.
+type APIResponse struct {
+	Data struct {
+		After    string `json:"after"`
+		Children []struct {
+			Data json.RawMessage `json:"data"`
+		} `json:"children"`
+	} `json:"data"`
+}
+
+// Client is a rate-limited HTTP client with a fixed User-Agent.
+type Client struct {
+	http       *http.Client
+	ua         string
+	limiter    *rate.Limiter
+	retryLimit int    // retry attempts are driven by the controller
+	BaseURL    string // overridable in tests
+}
+
+// NewClient builds a Client; delay spaces successive requests and burst
+// caps how many may fire back-to-back.
+func NewClient(userAgent string, timeout, delay time.Duration, burst, retryLimit int) *Client {
+	if burst < 1 {
+		burst = 1
+	}
+	return &Client{
+		http:       &http.Client{Timeout: timeout},
+		ua:         userAgent,
+		limiter:    rate.NewLimiter(rate.Every(delay), burst),
+		retryLimit: retryLimit,
+		BaseURL:    "https://www.reddit.com",
+	}
+}
+
+// FetchPage fetches one page of search.json results for query.
+func (c *Client) FetchPage(ctx context.Context, query, after string, limit int) (PageResult, error) {
+	var res PageResult
+	if err := c.limiter.Wait(ctx); err != nil {
+		return res, err
+	}
+
+	q := url.Values{}
+	q.Set("q", query)
+	q.Set("limit", strconv.Itoa(limit))
+	if after != "" {
+		q.Set("after", after)
+	}
+	endpoint := c.BaseURL + "/search.json?" + q.Encode()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
+	if err != nil {
+		return res, err
+	}
+	req.Header.Set("User-Agent", c.ua)
+
+	resp, err := c.http.Do(req)
+	if err != nil {
+		return res, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusTooManyRequests {
+		// honor Retry-After (seconds form) before surfacing the error
+		if ra := resp.Header.Get("Retry-After"); ra != "" {
+			if secs, perr := strconv.Atoi(ra); perr == nil {
+				dur := time.Duration(secs) * time.Second
+				if dur > 0 {
+					time.Sleep(dur)
+				}
+			}
+		}
+		return res, fmt.Errorf("rate limited: 429")
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		body, _ := io.ReadAll(resp.Body)
+		return res, fmt.Errorf("http %d: %s", resp.StatusCode, string(body))
+	}
+
+	var api APIResponse
+	dec := json.NewDecoder(resp.Body)
+	if err := dec.Decode(&api); err != nil {
+		return res, err
+	}
+	for _, child := range api.Data.Children {
+		res.Posts = append(res.Posts, child.Data)
+	}
+	res.After = api.Data.After
+	return res, nil
+}
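M3-06 (fetcher tests) is still open. One possible shape for it, sketched under the assumption that `Client.BaseURL` is overridable as in the file above; the fixture JSON is illustrative, not Reddit's full schema:

```go
package fetcher

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)

func TestFetchPage(t *testing.T) {
	// Serve a canned search.json envelope: one post and an "after" token.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if got := r.URL.Query().Get("q"); got != "golang" {
			t.Errorf("q = %q, want %q", got, "golang")
		}
		w.Write([]byte(`{"data":{"after":"t3_abc","children":[{"data":{"id":"abc"}}]}}`))
	}))
	defer srv.Close()

	c := NewClient("test-agent", 5*time.Second, time.Millisecond, 1, 0)
	c.BaseURL = srv.URL // point the client at the test server instead of reddit.com

	res, err := c.FetchPage(context.Background(), "golang", "", 25)
	if err != nil {
		t.Fatalf("FetchPage: %v", err)
	}
	if len(res.Posts) != 1 || res.After != "t3_abc" {
		t.Fatalf("got %d posts, after %q; want 1 post, after \"t3_abc\"", len(res.Posts), res.After)
	}
}
```

A table-driven variant of the same test can cover the 429 Retry-After branch and non-2xx bodies, which would close out the error paths in `FetchPage`.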