ForFarm/backend/internal/event/projection.go

160 lines
5.4 KiB
Go

package event
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
"github.com/forfarm/backend/internal/domain"
)
type FarmAnalyticsProjection struct {
eventSubscriber domain.EventSubscriber
repository domain.AnalyticsRepository
logger *slog.Logger
}
func NewFarmAnalyticsProjection(
subscriber domain.EventSubscriber,
repository domain.AnalyticsRepository,
logger *slog.Logger,
) *FarmAnalyticsProjection {
if logger == nil {
logger = slog.Default()
}
return &FarmAnalyticsProjection{
eventSubscriber: subscriber,
repository: repository,
logger: logger,
}
}
func (p *FarmAnalyticsProjection) Start(ctx context.Context) error {
eventTypes := []string{
"farm.created", "farm.updated", "farm.deleted",
"weather.updated",
"cropland.created", "cropland.updated", "cropland.deleted",
"inventory.item.created", "inventory.item.updated", "inventory.item.deleted",
}
p.logger.Info("FarmAnalyticsProjection starting, subscribing to events", "types", eventTypes)
var errs []error
for _, eventType := range eventTypes {
if err := p.eventSubscriber.Subscribe(ctx, eventType, p.handleEvent); err != nil {
p.logger.Error("Failed to subscribe to event type", "type", eventType, "error", err)
errs = append(errs, fmt.Errorf("failed to subscribe to %s: %w", eventType, err))
} else {
p.logger.Info("Successfully subscribed to event type", "type", eventType)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
p.logger.Info("FarmAnalyticsProjection started successfully")
return nil
}
func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
p.logger.Debug("Handling event in FarmAnalyticsProjection", "type", event.Type, "aggregate_id", event.AggregateID, "event_id", event.ID)
farmID := event.AggregateID
// Try to get farmID from payload if AggregateID is empty or potentially not the farmID (e.g., user events)
if farmID == "" || event.Type == "inventory.item.created" || event.Type == "inventory.item.updated" || event.Type == "inventory.item.deleted" || event.Type == "cropland.created" || event.Type == "cropland.updated" || event.Type == "cropland.deleted" {
payloadMap, ok := event.Payload.(map[string]interface{})
if ok {
if idVal, ok := payloadMap["farm_id"].(string); ok && idVal != "" {
farmID = idVal
} else if event.Type != "farm.deleted" && event.Type != "farm.created" {
p.logger.Warn("Could not determine farm_id from event payload or AggregateID", "event_type", event.Type, "event_id", event.ID, "aggregate_id", event.AggregateID)
return nil
}
} else if event.Type != "farm.deleted" && event.Type != "farm.created" {
p.logger.Error("Event payload is not a map, cannot extract farm_id", "event_type", event.Type, "event_id", event.ID)
return nil
}
}
if farmID == "" && event.Type != "farm.deleted" {
p.logger.Error("Cannot process event, missing farm_id", "event_type", event.Type, "event_id", event.ID, "aggregate_id", event.AggregateID)
return nil
}
var err error
switch event.Type {
case "farm.created", "farm.updated":
var farmData domain.Farm
jsonData, _ := json.Marshal(event.Payload)
if err = json.Unmarshal(jsonData, &farmData); err != nil {
p.logger.Error("Failed to unmarshal farm data from event payload", "event_id", event.ID, "error", err)
return nil
}
if farmData.UUID == "" {
farmData.UUID = event.AggregateID
}
p.logger.Info("Processing farm event", "event_type", event.Type, "farm_id", farmData.UUID, "owner_id", farmData.OwnerID)
err = p.repository.CreateOrUpdateFarmBaseData(ctx, &farmData)
case "farm.deleted":
farmID = event.AggregateID
if farmID == "" {
p.logger.Error("Cannot process farm.deleted event, missing farm_id in AggregateID", "event_id", event.ID)
return nil
}
err = p.repository.DeleteFarmAnalytics(ctx, farmID)
case "weather.updated":
var weatherData domain.WeatherData
jsonData, _ := json.Marshal(event.Payload)
if err = json.Unmarshal(jsonData, &weatherData); err != nil {
p.logger.Error("Failed to unmarshal weather data from event payload", "event_id", event.ID, "error", err)
return nil
}
err = p.repository.UpdateFarmAnalyticsWeather(ctx, farmID, &weatherData)
case "cropland.created", "cropland.updated", "cropland.deleted":
payloadMap, ok := event.Payload.(map[string]interface{})
if !ok {
p.logger.Error("Failed to cast cropland event payload to map", "event_id", event.ID)
return nil
}
idVal, ok := payloadMap["farm_id"].(string)
if !ok || idVal == "" {
p.logger.Error("Missing farm_id in cropland event payload", "event_id", event.ID, "event_type", event.Type)
return nil
}
farmID = idVal
err = p.repository.UpdateFarmAnalyticsCropStats(ctx, farmID)
case "inventory.item.created", "inventory.item.updated", "inventory.item.deleted":
if farmID != "" {
err = p.repository.UpdateFarmAnalyticsInventoryStats(ctx, farmID)
} else {
p.logger.Warn("Skipping inventory stats update due to missing farm_id", "event_id", event.ID)
return nil
}
default:
p.logger.Warn("Received unhandled event type", "type", event.Type, "event_id", event.ID)
return nil
}
if err != nil {
p.logger.Error("Failed to update farm analytics", "event_type", event.Type, "farm_id", farmID, "error", err)
return nil
}
p.logger.Debug("Successfully processed event and updated farm analytics", "event_type", event.Type, "farm_id", farmID)
return nil
}