From 21559f3b55fe19d72dcd94bf917b0e6733de69a9 Mon Sep 17 00:00:00 2001
From: Sosokker
Date: Thu, 27 Mar 2025 23:45:39 +0700
Subject: [PATCH] feat: add analytic event aggregator

---
 backend/.air.toml                             |   4 +-
 backend/internal/api/api.go                   |  16 +-
 backend/internal/cmd/api.go                   |  29 +++-
 backend/internal/config/config.go             |   3 +
 backend/internal/domain/analytics.go          |  89 +++++++++++
 backend/internal/domain/event.go              |  28 ++++
 backend/internal/domain/farm.go               |   1 +
 backend/internal/event/aggregator.go          |  64 ++++++++
 backend/internal/event/eventbus.go            | 150 ++++++++++++++++++
 backend/internal/event/projection.go          |  48 ++++++
 .../internal/repository/postgres_analytic.go  | 142 +++++++++++++++++
 backend/internal/repository/postgres_farm.go  |  52 +++++-
 .../00005_create_analytic_table.sql           |  77 +++++++++
 .../00006_add_farm_analytic_view.sql          | 106 +++++++++++++
 14 files changed, 797 insertions(+), 12 deletions(-)
 create mode 100644 backend/internal/domain/analytics.go
 create mode 100644 backend/internal/domain/event.go
 create mode 100644 backend/internal/event/aggregator.go
 create mode 100644 backend/internal/event/eventbus.go
 create mode 100644 backend/internal/event/projection.go
 create mode 100644 backend/internal/repository/postgres_analytic.go
 create mode 100644 backend/migrations/00005_create_analytic_table.sql
 create mode 100644 backend/migrations/00006_add_farm_analytic_view.sql

diff --git a/backend/.air.toml b/backend/.air.toml
index 026f712..bd22597 100644
--- a/backend/.air.toml
+++ b/backend/.air.toml
@@ -2,8 +2,8 @@ root = "."
 tmp_dir = "tmp"
 
 [build]
-cmd = "go build -o ./tmp/api ./cmd/forfarm"
-bin = "./tmp/api"
+cmd = "go build -o ./tmp/api.exe ./cmd/forfarm"
+bin = "./tmp/api.exe"
 args_bin = ["api"]
 include_ext = ["go", "tpl", "tmpl", "html"]
 exclude_dir = ["assets", "tmp", "vendor"]

diff --git a/backend/internal/api/api.go b/backend/internal/api/api.go
index aa5754a..98eca6c 100644
--- a/backend/internal/api/api.go
+++ b/backend/internal/api/api.go
@@ -22,8 +22,9 @@ import (
 )
 
 type api struct {
-	logger     *slog.Logger
-	httpClient *http.Client
+	logger         *slog.Logger
+	httpClient     *http.Client
+	eventPublisher domain.EventPublisher
 
 	userRepo  domain.UserRepository
 	cropRepo  domain.CroplandRepository
@@ -31,7 +32,7 @@
 	plantRepo domain.PlantRepository
 }
 
-func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool) *api {
+func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventPublisher domain.EventPublisher) *api {
 
 	client := &http.Client{}
 
@@ -40,9 +41,12 @@ func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool) *api {
 	farmRepository := repository.NewPostgresFarm(pool)
 	plantRepository := repository.NewPostgresPlant(pool)
 
+	farmRepository.SetEventPublisher(eventPublisher)
+
 	return &api{
-		logger:     logger,
-		httpClient: client,
+		logger:         logger,
+		httpClient:     client,
+		eventPublisher: eventPublisher,
 
 		userRepo:  userRepository,
 		cropRepo:  croplandRepository,
@@ -72,7 +76,7 @@ func (a *api) Routes() *chi.Mux {
 	router.Use(cors.Handler(cors.Options{
 		// AllowedOrigins: []string{"https://foo.com"}, // Use this to allow specific origin hosts
-		AllowedOrigins: []string{"https://*", "http://*"},
+		AllowedOrigins: []string{"https://*", "http://*", "http://localhost:3000"},
 		// AllowOriginFunc: func(r *http.Request, origin string) bool { return true },
 		AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
 		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},

diff --git a/backend/internal/cmd/api.go b/backend/internal/cmd/api.go
index d257756..c20414d 100644
--- a/backend/internal/cmd/api.go
+++ b/backend/internal/cmd/api.go
@@ -12,6 +12,8 @@ import (
 	"github.com/forfarm/backend/internal/api"
 	"github.com/forfarm/backend/internal/cmdutil"
 	"github.com/forfarm/backend/internal/config"
+	"github.com/forfarm/backend/internal/event"
+	"github.com/forfarm/backend/internal/repository"
 )
 
 func APICmd(ctx context.Context) *cobra.Command {
@@ -32,7 +34,32 @@
 
 			logger.Info("connected to database")
 
-			api := api.NewAPI(ctx, logger, pool)
+			// Event infrastructure: bus, aggregator, and analytics projection
+
+			eventBus, err := event.NewRabbitMQEventBus(config.RABBITMQ_URL, logger)
+			if err != nil {
+				logger.Error("failed to connect to event bus", "err", err)
+				os.Exit(1)
+			}
+			logger.Info("connected to event bus", "url", config.RABBITMQ_URL)
+
+			aggregator := event.NewEventAggregator(eventBus, eventBus, logger)
+			if err := aggregator.Start(ctx); err != nil {
+				logger.Error("failed to start event aggregator", "err", err)
+				os.Exit(1)
+			}
+			logger.Info("started event aggregator")
+
+			analyticRepo := repository.NewPostgresAnalyticsRepository(pool)
+			projection := event.NewAnalyticsProjection(eventBus, analyticRepo, logger)
+			if err := projection.Start(ctx); err != nil {
+				logger.Error("failed to start analytics projection", "err", err)
+				os.Exit(1)
+			}
+			logger.Info("started analytics projection")
+			// End event infrastructure
+
+			api := api.NewAPI(ctx, logger, pool, eventBus)
 			server := api.Server(port)
 
 			go func() {

diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go
index 34ce9f8..856c82b 100644
--- a/backend/internal/config/config.go
+++ b/backend/internal/config/config.go
@@ -16,6 +16,7 @@ var (
 	GOOGLE_CLIENT_SECRET string
 	GOOGLE_REDIRECT_URL  string
 	JWT_SECRET_KEY       string
+	RABBITMQ_URL         string
 )
 
 func Load() {
@@ -28,6 +29,7 @@
 	viper.SetDefault("GOOGLE_CLIENT_SECRET", "google_client_secret")
 	viper.SetDefault("JWT_SECRET_KEY", "jwt_secret_key")
 	viper.SetDefault("GOOGLE_REDIRECT_URL", "http://localhost:8000/auth/login/google")
+	viper.SetDefault("RABBITMQ_URL", "amqp://user:password@localhost:5672/")
 
 	viper.SetConfigFile(".env")
 	viper.AddConfigPath("../../.")
@@ -47,4 +49,5 @@
 	GOOGLE_CLIENT_SECRET = viper.GetString("GOOGLE_CLIENT_SECRET")
 	GOOGLE_REDIRECT_URL = viper.GetString("GOOGLE_REDIRECT_URL")
 	JWT_SECRET_KEY = viper.GetString("JWT_SECRET_KEY")
+	RABBITMQ_URL = viper.GetString("RABBITMQ_URL")
 }

diff --git a/backend/internal/domain/analytics.go b/backend/internal/domain/analytics.go
new file mode 100644
index 0000000..505291a
--- /dev/null
+++ b/backend/internal/domain/analytics.go
@@ -0,0 +1,89 @@
+package domain
+
+import (
+	"context"
+	"time"
+)
+
+type FarmAnalytics struct {
+	FarmID          string
+	Name            string
+	OwnerID         string
+	LastUpdated     time.Time
+	WeatherData     *WeatherAnalytics     `json:"weather_data,omitempty"`
+	InventoryData   *InventoryAnalytics   `json:"inventory_data,omitempty"`
+	PlantHealthData *PlantHealthAnalytics `json:"plant_health_data,omitempty"`
+	FinancialData   *FinancialAnalytics   `json:"financial_data,omitempty"`
+	ProductionData  *ProductionAnalytics  `json:"production_data,omitempty"`
+}
+
+type WeatherAnalytics struct {
+	LastUpdated     time.Time
+	Temperature     float64
+	Humidity        float64
+	Rainfall        float64
+	WindSpeed       float64
+	WeatherStatus   string
+	AlertLevel      string
+	ForecastSummary string
+}
+
+type InventoryAnalytics struct {
+	LastUpdated   time.Time
+	TotalItems    int
+	LowStockItems int
+	TotalValue    float64
+	RecentChanges []InventoryChange
+}
+
+type InventoryChange struct {
+	ItemID       string
+	ItemName     string
+	ChangeAmount float64
+	ChangeType   string
+	ChangedAt    time.Time
+}
+
+type PlantHealthAnalytics struct {
+	LastUpdated        time.Time
+	HealthyPlants      int
+	UnhealthyPlants    int
+	CriticalPlants     int
+	RecentHealthIssues []PlantHealthIssue
+}
+
+type PlantHealthIssue struct {
+	PlantID      string
+	PlantName    string
+	HealthStatus string
+	AlertLevel   string
+	RecordedAt   time.Time
+}
+
+type FinancialAnalytics struct {
+	LastUpdated        time.Time
+	TotalRevenue       float64
+	TotalExpenses      float64
+	NetProfit          float64
+	RecentTransactions []TransactionSummary
+}
+
+type TransactionSummary struct {
+	TransactionID string
+	Type          string
+	Amount        float64
+	Status        string
+	CreatedAt     time.Time
+}
+
+type ProductionAnalytics struct {
+	LastUpdated     time.Time
+	TotalProduction float64
+	YieldRate       float64
+	HarvestForecast float64
+}
+
+type AnalyticsRepository interface {
+	GetFarmAnalytics(ctx context.Context, farmID string) (*FarmAnalytics, error)
+	SaveFarmAnalytics(ctx context.Context, farmID string, data interface{}) error
+}

diff --git a/backend/internal/domain/event.go b/backend/internal/domain/event.go
new file mode 100644
index 0000000..d7bb33c
--- /dev/null
+++ b/backend/internal/domain/event.go
@@ -0,0 +1,28 @@
+package domain
+
+import (
+	"context"
+	"time"
+)
+
+type Event struct {
+	ID          string
+	Type        string
+	Source      string
+	Timestamp   time.Time
+	Payload     interface{}
+	AggregateID string
+}
+
+type EventPublisher interface {
+	Publish(ctx context.Context, event Event) error
+}
+
+type EventSubscriber interface {
+	Subscribe(ctx context.Context, eventType string, handler func(Event) error) error
+}
+
+type EventBus interface {
+	EventPublisher
+	EventSubscriber
+}

diff --git a/backend/internal/domain/farm.go b/backend/internal/domain/farm.go
index 6de0cac..6201c8a 100644
--- a/backend/internal/domain/farm.go
+++ b/backend/internal/domain/farm.go
@@ -33,4 +33,5 @@
 	GetByOwnerID(context.Context, string) ([]Farm, error)
 	CreateOrUpdate(context.Context, *Farm) error
 	Delete(context.Context, string) error
+	SetEventPublisher(EventPublisher)
 }

diff --git a/backend/internal/event/aggregator.go b/backend/internal/event/aggregator.go
new file mode 100644
index 0000000..05d587e
--- /dev/null
+++ b/backend/internal/event/aggregator.go
@@ -0,0 +1,64 @@
+package event
+
+import (
+	"context"
+	"log/slog"
+	"time"
+
+	"github.com/forfarm/backend/internal/domain"
+)
+
+type EventAggregator struct {
+	sourceSubscriber domain.EventSubscriber
+	targetPublisher  domain.EventPublisher
+	logger           *slog.Logger
+}
+
+func NewEventAggregator(
+	sourceSubscriber domain.EventSubscriber,
+	targetPublisher domain.EventPublisher,
+	logger *slog.Logger,
+) *EventAggregator {
+	return &EventAggregator{
+		sourceSubscriber: sourceSubscriber,
+		targetPublisher:  targetPublisher,
+		logger:           logger,
+	}
+}
+
+func (a *EventAggregator) Start(ctx context.Context) error {
+	// Subscribe to fine-grained events
+	eventTypes := []string{
+		"farm.created", "farm.updated", "farm.deleted",
+		"weather.updated", "inventory.changed", "marketplace.transaction",
+	}
+
+	for _, eventType := range eventTypes {
+		if err := a.sourceSubscriber.Subscribe(ctx, eventType, a.handleEvent); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (a *EventAggregator) handleEvent(event domain.Event) error {
+	// Logic to aggregate events
+	// For example, combine farm and weather events into a farm status event
+
+	if event.Type == "farm.created" || event.Type == "farm.updated" {
+		// Create a coarse-grained event
+		aggregatedEvent := domain.Event{
+			ID:          event.ID,
+			Type:        "farm.status_changed",
+			Source:      "event-aggregator",
+			Timestamp:   time.Now(),
+			Payload:     event.Payload,
+			AggregateID: event.AggregateID,
+		}
+
+		return a.targetPublisher.Publish(context.Background(), aggregatedEvent)
+	}
+
+	return nil
+}

diff --git a/backend/internal/event/eventbus.go b/backend/internal/event/eventbus.go
new file mode 100644
index 0000000..bcb788e
--- /dev/null
+++ b/backend/internal/event/eventbus.go
@@ -0,0 +1,150 @@
+package event
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+
+	"github.com/forfarm/backend/internal/domain"
+	amqp "github.com/rabbitmq/amqp091-go"
+)
+
+type RabbitMQEventBus struct {
+	conn    *amqp.Connection
+	channel *amqp.Channel
+	logger  *slog.Logger
+}
+
+func NewRabbitMQEventBus(url string, logger *slog.Logger) (*RabbitMQEventBus, error) {
+	conn, err := amqp.Dial(url)
+	if err != nil {
+		return nil, err
+	}
+
+	ch, err := conn.Channel()
+	if err != nil {
+		conn.Close()
+		return nil, err
+	}
+
+	// Declare the exchange
+	err = ch.ExchangeDeclare(
+		"events", // name
+		"topic",  // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
+	)
+	if err != nil {
+		ch.Close()
+		conn.Close()
+		return nil, err
+	}
+
+	return &RabbitMQEventBus{
+		conn:    conn,
+		channel: ch,
+		logger:  logger,
+	}, nil
+}
+
+func (r *RabbitMQEventBus) Publish(ctx context.Context, event domain.Event) error {
+	data, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+
+	return r.channel.PublishWithContext(
+		ctx,
+		"events",             // exchange
+		"events."+event.Type, // routing key
+		false,                // mandatory
+		false,                // immediate
+		amqp.Publishing{
+			ContentType:  "application/json",
+			Body:         data,
+			DeliveryMode: amqp.Persistent,
+			MessageId:    event.ID,
+			Timestamp:    event.Timestamp,
+		},
+	)
+}
+
+func (r *RabbitMQEventBus) Subscribe(ctx context.Context, eventType string, handler func(domain.Event) error) error {
+	// Declare a queue for this consumer
+	q, err := r.channel.QueueDeclare(
+		"",    // name (empty = auto-generated)
+		false, // durable
+		true,  // delete when unused
+		true,  // exclusive
+		false, // no-wait
+		nil,   // arguments
+	)
+	if err != nil {
+		return err
+	}
+
+	// Bind the queue to the exchange
+	err = r.channel.QueueBind(
+		q.Name,              // queue name
+		"events."+eventType, // routing key
+		"events",            // exchange
+		false,               // no-wait
+		nil,                 // arguments
+	)
+	if err != nil {
+		return err
+	}
+
+	// Start consuming
+	msgs, err := r.channel.Consume(
+		q.Name, // queue
+		"",     // consumer
+		false,  // auto-ack
+		false,  // exclusive
+		false,  // no-local
+		false,  // no-wait
+		nil,    // args
+	)
+	if err != nil {
+		return err
+	}
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case msg, ok := <-msgs:
+				if !ok {
+					return
+				}
+
+				var event domain.Event
+				if err := json.Unmarshal(msg.Body, &event); err != nil {
+					r.logger.Error("Failed to unmarshal event", "error", err)
+					msg.Nack(false, false)
+					continue
+				}
+
+				if err := handler(event); err != nil {
+					r.logger.Error("Failed to handle event", "error", err)
+					msg.Nack(false, true) // requeue
+				} else {
+					msg.Ack(false)
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (r *RabbitMQEventBus) Close() error {
+	if err := r.channel.Close(); err != nil {
+		return err
+	}
+	return r.conn.Close()
+}

diff --git a/backend/internal/event/projection.go b/backend/internal/event/projection.go
new file mode 100644
index 0000000..971e4c5
--- /dev/null
+++ b/backend/internal/event/projection.go
@@ -0,0 +1,48 @@
+package event
+
+import (
+	"context"
+	"log/slog"
+
+	"github.com/forfarm/backend/internal/domain"
+)
+
+type AnalyticsProjection struct {
+	eventSubscriber domain.EventSubscriber
+	repository      domain.AnalyticsRepository
+	logger          *slog.Logger
+}
+
+func NewAnalyticsProjection(
+	subscriber domain.EventSubscriber,
+	repository domain.AnalyticsRepository,
+	logger *slog.Logger,
+) *AnalyticsProjection {
+	return &AnalyticsProjection{
+		eventSubscriber: subscriber,
+		repository:      repository,
+		logger:          logger,
+	}
+}
+
+func (p *AnalyticsProjection) Start(ctx context.Context) error {
+	// Subscribe to coarse-grained events
+	eventTypes := []string{"farm.status_changed"}
+
+	for _, eventType := range eventTypes {
+		if err := p.eventSubscriber.Subscribe(ctx, eventType, p.handleEvent); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (p *AnalyticsProjection) handleEvent(event domain.Event) error {
+	// Update materialized view based on event
+	if event.Type == "farm.status_changed" {
+		return p.repository.SaveFarmAnalytics(context.Background(), event.AggregateID, event.Payload)
+	}
+
+	return nil
+}

diff --git a/backend/internal/repository/postgres_analytic.go b/backend/internal/repository/postgres_analytic.go
new file mode 100644
index 0000000..e7e289c
--- /dev/null
+++ b/backend/internal/repository/postgres_analytic.go
@@ -0,0 +1,142 @@
+package repository
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/forfarm/backend/internal/domain"
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+)
+
+type PostgresAnalyticsRepository struct {
+	pool *pgxpool.Pool
+}
+
+func NewPostgresAnalyticsRepository(pool *pgxpool.Pool) domain.AnalyticsRepository {
+	return &PostgresAnalyticsRepository{pool: pool}
+}
+
+func (p *PostgresAnalyticsRepository) GetFarmAnalytics(ctx context.Context, farmID string) (*domain.FarmAnalytics, error) {
+	query := `
+		SELECT
+			farm_id,
+			farm_name,
+			owner_id,
+			last_updated,
+			weather_data,
+			inventory_data,
+			plant_health_data,
+			financial_data,
+			production_data
+		FROM
+			farm_analytics_view
+		WHERE
+			farm_id = $1`
+
+	var analytics domain.FarmAnalytics
+	var weatherJSON, inventoryJSON, plantHealthJSON, financialJSON, productionJSON []byte
+
+	err := p.pool.QueryRow(ctx, query, farmID).Scan(
+		&analytics.FarmID,
+		&analytics.Name,
+		&analytics.OwnerID,
+		&analytics.LastUpdated,
+		&weatherJSON,
+		&inventoryJSON,
+		&plantHealthJSON,
+		&financialJSON,
+		&productionJSON,
+	)
+
+	if err != nil {
+		if err == pgx.ErrNoRows {
+			return nil, fmt.Errorf("no analytics found for farm %s", farmID)
+		}
+		return nil, err
+	}
+
+	// Unmarshal JSON data into structs
+	if len(weatherJSON) > 0 {
+		var weather domain.WeatherAnalytics
+		if err := json.Unmarshal(weatherJSON, &weather); err == nil {
+			analytics.WeatherData = &weather
+		}
+	}
+
+	if len(inventoryJSON) > 0 {
+		var inventory domain.InventoryAnalytics
+		if err := json.Unmarshal(inventoryJSON, &inventory); err == nil {
+			analytics.InventoryData = &inventory
+		}
+	}
+
+	if len(plantHealthJSON) > 0 {
+		var plantHealth domain.PlantHealthAnalytics
+		if err := json.Unmarshal(plantHealthJSON, &plantHealth); err == nil {
+			analytics.PlantHealthData = &plantHealth
+		}
+	}
+
+	if len(financialJSON) > 0 {
+		var financial domain.FinancialAnalytics
+		if err := json.Unmarshal(financialJSON, &financial); err == nil {
+			analytics.FinancialData = &financial
+		}
+	}
+
+	if len(productionJSON) > 0 {
+		var production domain.ProductionAnalytics
+		if err := json.Unmarshal(productionJSON, &production); err == nil {
+			analytics.ProductionData = &production
+		}
+	}
+
+	return &analytics, nil
+}
+
+func (p *PostgresAnalyticsRepository) SaveFarmAnalytics(ctx context.Context, farmID string, data interface{}) error {
+	var jsonData []byte
+	var err error
+
+	// Handle different possible types of the data parameter
+	switch v := data.(type) {
+	case []byte:
+		jsonData = v
+	case string:
+		jsonData = []byte(v)
+	case map[string]interface{}:
+		jsonData, err = json.Marshal(v)
+	default:
+		jsonData, err = json.Marshal(v)
+	}
+
+	if err != nil {
+		return fmt.Errorf("failed to prepare JSON data: %w", err)
+	}
+
+	// Validate that we have valid JSON
+	var testObj interface{}
+	if err := json.Unmarshal(jsonData, &testObj); err != nil {
+		return fmt.Errorf("invalid JSON data: %w", err)
+	}
+
+	query := `
+		INSERT INTO analytics_events (
+			farm_id,
+			event_type,
+			event_data,
+			created_at
+		) VALUES ($1, $2, $3::jsonb, $4)`
+
+	eventType := "farm.status_changed"
+
+	_, err = p.pool.Exec(ctx, query, farmID, eventType, string(jsonData), time.Now())
+	if err != nil {
+		return fmt.Errorf("failed to insert analytics event: %w", err)
+	}
+
+	return nil
+}

diff --git a/backend/internal/repository/postgres_farm.go b/backend/internal/repository/postgres_farm.go
index 8910b08..a9af093 100644
--- a/backend/internal/repository/postgres_farm.go
+++ b/backend/internal/repository/postgres_farm.go
@@ -3,19 +3,25 @@ package repository
 import (
 	"context"
 	"strings"
+	"time"
 
 	"github.com/forfarm/backend/internal/domain"
 	"github.com/google/uuid"
 )
 
 type postgresFarmRepository struct {
-	conn Connection
+	conn           Connection
+	eventPublisher domain.EventPublisher
 }
 
 func NewPostgresFarm(conn Connection) domain.FarmRepository {
 	return &postgresFarmRepository{conn: conn}
 }
 
+func (p *postgresFarmRepository) SetEventPublisher(publisher domain.EventPublisher) {
+	p.eventPublisher = publisher
+}
+
 func (p *postgresFarmRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Farm, error) {
 	rows, err := p.conn.Query(ctx, query, args...)
 	if err != nil {
@@ -77,7 +83,9 @@ func (p *postgresFarmRepository) GetByOwnerID(ctx context.Context, ownerID strin
 }
 
 func (p *postgresFarmRepository) CreateOrUpdate(ctx context.Context, f *domain.Farm) error {
-	if strings.TrimSpace(f.UUID) == "" {
+	isNew := strings.TrimSpace(f.UUID) == ""
+
+	if isNew {
 		f.UUID = uuid.New().String()
 	}
 
@@ -93,8 +101,46 @@ func (p *postgresFarmRepository) CreateOrUpdate(ctx context.Context, f *domain.F
 			updated_at = NOW(),
 			owner_id = EXCLUDED.owner_id
 		RETURNING uuid, created_at, updated_at`
-	return p.conn.QueryRow(ctx, query, f.UUID, f.Name, f.Lat, f.Lon, f.FarmType, f.TotalSize, f.OwnerID).
+	err := p.conn.QueryRow(ctx, query, f.UUID, f.Name, f.Lat, f.Lon, f.FarmType, f.TotalSize, f.OwnerID).
 		Scan(&f.UUID, &f.CreatedAt, &f.UpdatedAt)
+
+	if err != nil {
+		return err
+	}
+
+	if p.eventPublisher != nil {
+		eventType := "farm.updated"
+		if isNew {
+			eventType = "farm.created"
+		}
+
+		event := domain.Event{
+			ID:          uuid.New().String(),
+			Type:        eventType,
+			Source:      "farm-repository",
+			Timestamp:   time.Now(),
+			AggregateID: f.UUID,
+			Payload: map[string]interface{}{
+				"farm_id":    f.UUID,
+				"name":       f.Name,
+				"location":   map[string]float64{"lat": f.Lat, "lon": f.Lon},
+				"farm_type":  f.FarmType,
+				"total_size": f.TotalSize,
+				"owner_id":   f.OwnerID,
+				"created_at": f.CreatedAt,
+				"updated_at": f.UpdatedAt,
+			},
+		}
+
+		go func() {
+			bgCtx := context.Background()
+			if err := p.eventPublisher.Publish(bgCtx, event); err != nil {
+				println("Failed to publish event", err.Error())
+			}
+		}()
+	}
+
+	return nil
 }
 
 func (p *postgresFarmRepository) Delete(ctx context.Context, uuid string) error {

diff --git a/backend/migrations/00005_create_analytic_table.sql b/backend/migrations/00005_create_analytic_table.sql
new file mode 100644
index 0000000..b415297
--- /dev/null
+++ b/backend/migrations/00005_create_analytic_table.sql
@@ -0,0 +1,77 @@
+-- +goose Up
+-- Create analytics_events table to store all events
+CREATE TABLE IF NOT EXISTS public.analytics_events (
+    id SERIAL PRIMARY KEY,
+    farm_id UUID NOT NULL,
+    event_type TEXT NOT NULL,
+    event_data JSONB NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    CONSTRAINT fk_analytics_farm FOREIGN KEY (farm_id) REFERENCES public.farms(uuid) ON DELETE CASCADE
+);
+
+-- Create indexes for faster queries
+CREATE INDEX idx_analytics_events_farm_id ON public.analytics_events(farm_id);
+CREATE INDEX idx_analytics_events_event_type ON public.analytics_events(event_type);
+CREATE INDEX idx_analytics_events_created_at ON public.analytics_events(created_at);
+
+-- Create a simple materialized view for farm analytics
+CREATE MATERIALIZED VIEW public.farm_analytics_view AS
+SELECT
+    f.uuid AS farm_id,
+    f.name AS farm_name,
+    f.owner_id,
+    f.farm_type,
+    f.total_size,
+    f.created_at,
+    f.updated_at,
+    COUNT(ae.id) AS total_events,
+    MAX(ae.created_at) AS last_event_at
+FROM
+    public.farms f
+LEFT JOIN
+    public.analytics_events ae ON f.uuid = ae.farm_id
+GROUP BY
+    f.uuid, f.name, f.owner_id, f.farm_type, f.total_size, f.created_at, f.updated_at;
+
+-- Create indexes for faster queries
+CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
+CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);
+
+-- Create function to refresh the materialized view
+-- +goose StatementBegin
+CREATE OR REPLACE FUNCTION public.refresh_farm_analytics_view()
+RETURNS TRIGGER AS $$
+BEGIN
+    REFRESH MATERIALIZED VIEW public.farm_analytics_view;
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+-- +goose StatementEnd
+
+-- Create trigger to refresh the view when new events are added
+CREATE TRIGGER refresh_farm_analytics_view_trigger
+AFTER INSERT ON public.analytics_events
+FOR EACH STATEMENT
+EXECUTE FUNCTION public.refresh_farm_analytics_view();
+
+-- Create trigger to refresh the view when farms are updated
+CREATE TRIGGER refresh_farm_analytics_view_farms_trigger
+AFTER INSERT OR UPDATE OR DELETE ON public.farms
+FOR EACH STATEMENT
+EXECUTE FUNCTION public.refresh_farm_analytics_view();
+
+-- +goose Down
+-- Drop triggers first
+DROP TRIGGER IF EXISTS refresh_farm_analytics_view_trigger ON public.analytics_events;
+DROP TRIGGER IF EXISTS refresh_farm_analytics_view_farms_trigger ON public.farms;
+
+-- Drop function
+-- +goose StatementBegin
+DROP FUNCTION IF EXISTS public.refresh_farm_analytics_view() CASCADE;
+-- +goose StatementEnd
+
+-- Drop materialized view
+DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view CASCADE;
+
+-- Drop table with CASCADE to ensure all dependencies are removed
+DROP TABLE IF EXISTS public.analytics_events CASCADE;
\ No newline at end of file

diff --git a/backend/migrations/00006_add_farm_analytic_view.sql b/backend/migrations/00006_add_farm_analytic_view.sql
new file mode 100644
index 0000000..6b4bcb2
--- /dev/null
+++ b/backend/migrations/00006_add_farm_analytic_view.sql
@@ -0,0 +1,106 @@
+-- +goose Up
+-- Drop the existing materialized view
+DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view;
+
+-- Create a new materialized view that matches the GetFarmAnalytics function
+CREATE MATERIALIZED VIEW public.farm_analytics_view AS
+SELECT
+    f.uuid AS farm_id,
+    f.name AS farm_name,
+    f.owner_id,
+    COALESCE(MAX(ae.created_at), f.updated_at) AS last_updated,
+
+    -- Weather data aggregation
+    (
+        SELECT jsonb_build_object(
+            'temperature', AVG((ae_w.event_data->>'temperature')::float) FILTER (WHERE ae_w.event_data ? 'temperature'),
+            'humidity', AVG((ae_w.event_data->>'humidity')::float) FILTER (WHERE ae_w.event_data ? 'humidity'),
+            'forecast', jsonb_agg(ae_w.event_data->'forecast') FILTER (WHERE ae_w.event_data ? 'forecast')
+        )
+        FROM analytics_events ae_w
+        WHERE ae_w.farm_id = f.uuid AND ae_w.event_type = 'weather.updated'
+        GROUP BY ae_w.farm_id
+    ) AS weather_data,
+
+    -- Inventory data aggregation
+    (
+        SELECT jsonb_build_object(
+            'items', COALESCE(jsonb_agg(ae_i.event_data->'items'), '[]'::jsonb),
+            'last_updated', MAX(ae_i.created_at)
+        )
+        FROM analytics_events ae_i
+        WHERE ae_i.farm_id = f.uuid AND ae_i.event_type = 'inventory.updated'
+        GROUP BY ae_i.farm_id
+    ) AS inventory_data,
+
+    -- Plant health data aggregation
+    (
+        SELECT jsonb_build_object(
+            'status', MAX(ae_p.event_data->>'status'),
+            'issues', COALESCE(jsonb_agg(ae_p.event_data->'issues') FILTER (WHERE ae_p.event_data ? 'issues'), '[]'::jsonb)
+        )
+        FROM analytics_events ae_p
+        WHERE ae_p.farm_id = f.uuid AND ae_p.event_type = 'plant_health.updated'
+        GROUP BY ae_p.farm_id
+    ) AS plant_health_data,
+
+    -- Financial data aggregation
+    (
+        SELECT jsonb_build_object(
+            'revenue', SUM((ae_f.event_data->>'revenue')::float) FILTER (WHERE ae_f.event_data ? 'revenue'),
+            'expenses', SUM((ae_f.event_data->>'expenses')::float) FILTER (WHERE ae_f.event_data ? 'expenses'),
+            'profit', SUM((ae_f.event_data->>'profit')::float) FILTER (WHERE ae_f.event_data ? 'profit')
+        )
+        FROM analytics_events ae_f
+        WHERE ae_f.farm_id = f.uuid AND ae_f.event_type = 'financial.updated'
+        GROUP BY ae_f.farm_id
+    ) AS financial_data,
+
+    -- Production data aggregation
+    (
+        SELECT jsonb_build_object(
+            'yield', SUM((ae_pr.event_data->>'yield')::float) FILTER (WHERE ae_pr.event_data ? 'yield'),
+            'forecast', MAX(ae_pr.event_data->'forecast')
+        )
+        FROM analytics_events ae_pr
+        WHERE ae_pr.farm_id = f.uuid AND ae_pr.event_type = 'production.updated'
+        GROUP BY ae_pr.farm_id
+    ) AS production_data
+
+FROM
+    public.farms f
+LEFT JOIN
+    public.analytics_events ae ON f.uuid = ae.farm_id
+GROUP BY
+    f.uuid, f.name, f.owner_id;
+
+-- Create indexes for faster queries
+CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
+CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);
+
+-- +goose Down
+-- Drop the new materialized view
+DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view;
+
+-- Restore the original materialized view
+CREATE MATERIALIZED VIEW public.farm_analytics_view AS
+SELECT
+    f.uuid AS farm_id,
+    f.name AS farm_name,
+    f.owner_id,
+    f.farm_type,
+    f.total_size,
+    f.created_at,
+    f.updated_at,
+    COUNT(ae.id) AS total_events,
+    MAX(ae.created_at) AS last_event_at
+FROM
+    public.farms f
+LEFT JOIN
+    public.analytics_events ae ON f.uuid = ae.farm_id
+GROUP BY
+    f.uuid, f.name, f.owner_id, f.farm_type, f.total_size, f.created_at, f.updated_at;
+
+-- Recreate indexes
+CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
+CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);
\ No newline at end of file
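
Usage note (commentary for reviewers, not part of the applied diff): the sketch below shows how a separate consumer process might subscribe to the coarse-grained "farm.status_changed" event that the aggregator publishes through the EventBus added in this patch. It is a minimal sketch, not part of the patch itself: it assumes the internal event and domain packages above and a RabbitMQ broker reachable at the config default URL; the standalone main package and the handler body are hypothetical, for illustration only.

    package main

    import (
        "context"
        "log/slog"
        "os"

        "github.com/forfarm/backend/internal/domain"
        "github.com/forfarm/backend/internal/event"
    )

    func main() {
        logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

        // Assumed broker URL; matches the RABBITMQ_URL default in config.go.
        bus, err := event.NewRabbitMQEventBus("amqp://user:password@localhost:5672/", logger)
        if err != nil {
            logger.Error("failed to connect to event bus", "err", err)
            os.Exit(1)
        }
        defer bus.Close()

        ctx := context.Background()

        // Subscribe to the coarse-grained event emitted by the aggregator.
        // Returning nil acks the delivery; returning an error nacks and requeues it
        // (see the Subscribe implementation in eventbus.go).
        err = bus.Subscribe(ctx, "farm.status_changed", func(e domain.Event) error {
            logger.Info("farm status changed", "farm_id", e.AggregateID, "source", e.Source)
            return nil
        })
        if err != nil {
            logger.Error("subscribe failed", "err", err)
            os.Exit(1)
        }

        <-ctx.Done() // block forever; a real service would wire up signal handling here
    }

Because Subscribe declares an exclusive auto-delete queue per consumer, each such process gets its own copy of every matching event; competing consumers sharing one queue would instead need a named durable queue.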