feat: add analytic event aggregator

This commit is contained in:
Sosokker 2025-03-27 23:45:39 +07:00
parent a5108e9990
commit 21559f3b55
14 changed files with 797 additions and 12 deletions

View File

@ -2,8 +2,8 @@ root = "."
tmp_dir = "tmp"
[build]
cmd = "go build -o ./tmp/api ./cmd/forfarm"
bin = "./tmp/api"
cmd = "go build -o ./tmp/api.exe ./cmd/forfarm"
bin = "./tmp/api.exe"
args_bin = ["api"]
include_ext = ["go", "tpl", "tmpl", "html"]
exclude_dir = ["assets", "tmp", "vendor"]

View File

@ -22,8 +22,9 @@ import (
)
type api struct {
logger *slog.Logger
httpClient *http.Client
logger *slog.Logger
httpClient *http.Client
eventPublisher domain.EventPublisher
userRepo domain.UserRepository
cropRepo domain.CroplandRepository
@ -31,7 +32,7 @@ type api struct {
plantRepo domain.PlantRepository
}
func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool) *api {
func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventPublisher domain.EventPublisher) *api {
client := &http.Client{}
@ -40,9 +41,12 @@ func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool) *api {
farmRepository := repository.NewPostgresFarm(pool)
plantRepository := repository.NewPostgresPlant(pool)
farmRepository.SetEventPublisher(eventPublisher)
return &api{
logger: logger,
httpClient: client,
logger: logger,
httpClient: client,
eventPublisher: eventPublisher,
userRepo: userRepository,
cropRepo: croplandRepository,
@ -72,7 +76,7 @@ func (a *api) Routes() *chi.Mux {
router.Use(cors.Handler(cors.Options{
// AllowedOrigins: []string{"https://foo.com"}, // Use this to allow specific origin hosts
AllowedOrigins: []string{"https://*", "http://*"},
AllowedOrigins: []string{"https://*", "http://*", "http://localhost:3000"},
// AllowOriginFunc: func(r *http.Request, origin string) bool { return true },
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},

View File

@ -12,6 +12,8 @@ import (
"github.com/forfarm/backend/internal/api"
"github.com/forfarm/backend/internal/cmdutil"
"github.com/forfarm/backend/internal/config"
"github.com/forfarm/backend/internal/event"
"github.com/forfarm/backend/internal/repository"
)
func APICmd(ctx context.Context) *cobra.Command {
@ -32,7 +34,32 @@ func APICmd(ctx context.Context) *cobra.Command {
logger.Info("connected to database")
api := api.NewAPI(ctx, logger, pool)
// Events
eventBus, err := event.NewRabbitMQEventBus(config.RABBITMQ_URL, logger)
if err != nil {
logger.Error("failed to connect to event bus", "err", err)
os.Exit(1)
}
logger.Info("connecting to event bus", "url", config.RABBITMQ_URL)
aggregator := event.NewEventAggregator(eventBus, eventBus, logger)
if err := aggregator.Start(ctx); err != nil {
logger.Error("failed to start event aggregator", "err", err)
os.Exit(1)
}
logger.Info("Start event aggregator")
analyticRepo := repository.NewPostgresAnalyticsRepository(pool)
projection := event.NewAnalyticsProjection(eventBus, analyticRepo, logger)
if err := projection.Start(ctx); err != nil {
logger.Error("failed to start analytics projection", "err", err)
os.Exit(1)
}
logger.Info("Start analytics projection")
//
api := api.NewAPI(ctx, logger, pool, eventBus)
server := api.Server(port)
go func() {

View File

@ -16,6 +16,7 @@ var (
GOOGLE_CLIENT_SECRET string
GOOGLE_REDIRECT_URL string
JWT_SECRET_KEY string
RABBITMQ_URL string
)
func Load() {
@ -28,6 +29,7 @@ func Load() {
viper.SetDefault("GOOGLE_CLIENT_SECRET", "google_client_secret")
viper.SetDefault("JWT_SECRET_KEY", "jwt_secret_key")
viper.SetDefault("GOOGLE_REDIRECT_URL", "http://localhost:8000/auth/login/google")
viper.SetDefault("RABBITMQ_URL", "amqp://user:password@localhost:5672/")
viper.SetConfigFile(".env")
viper.AddConfigPath("../../.")
@ -47,4 +49,5 @@ func Load() {
GOOGLE_CLIENT_SECRET = viper.GetString("GOOGLE_CLIENT_SECRET")
GOOGLE_REDIRECT_URL = viper.GetString("GOOGLE_REDIRECT_URL")
JWT_SECRET_KEY = viper.GetString("JWT_SECRET_KEY")
RABBITMQ_URL = viper.GetString("RABBITMQ_URL")
}

View File

@ -0,0 +1,89 @@
package domain
import (
"context"
"time"
)
type FarmAnalytics struct {
FarmID string
Name string
OwnerID string
LastUpdated time.Time
WeatherData *WeatherAnalytics `json:"weather_data,omitempty"`
InventoryData *InventoryAnalytics `json:"inventory_data,omitempty"`
PlantHealthData *PlantHealthAnalytics `json:"plant_health_data,omitempty"`
FinancialData *FinancialAnalytics `json:"financial_data,omitempty"`
ProductionData *ProductionAnalytics `json:"production_data,omitempty"`
}
type WeatherAnalytics struct {
LastUpdated time.Time
Temperature float64
Humidity float64
Rainfall float64
WindSpeed float64
WeatherStatus string
AlertLevel string
ForecastSummary string
}
type InventoryAnalytics struct {
LastUpdated time.Time
TotalItems int
LowStockItems int
TotalValue float64
RecentChanges []InventoryChange
}
type InventoryChange struct {
ItemID string
ItemName string
ChangeAmount float64
ChangeType string
ChangedAt time.Time
}
type PlantHealthAnalytics struct {
LastUpdated time.Time
HealthyPlants int
UnhealthyPlants int
CriticalPlants int
RecentHealthIssues []PlantHealthIssue
}
type PlantHealthIssue struct {
PlantID string
PlantName string
HealthStatus string
AlertLevel string
RecordedAt time.Time
}
type FinancialAnalytics struct {
LastUpdated time.Time
TotalRevenue float64
TotalExpenses float64
NetProfit float64
RecentTransactions []TransactionSummary
}
type TransactionSummary struct {
TransactionID string
Type string
Amount float64
Status string
CreatedAt time.Time
}
type ProductionAnalytics struct {
LastUpdated time.Time
TotalProduction float64
YieldRate float64
HarvestForecast float64
}
type AnalyticsRepository interface {
GetFarmAnalytics(ctx context.Context, farmID string) (*FarmAnalytics, error)
SaveFarmAnalytics(ctx context.Context, farmID string, data interface{}) error
}

View File

@ -0,0 +1,28 @@
package domain
import (
"context"
"time"
)
type Event struct {
ID string
Type string
Source string
Timestamp time.Time
Payload interface{}
AggregateID string
}
type EventPublisher interface {
Publish(ctx context.Context, event Event) error
}
type EventSubscriber interface {
Subscribe(ctx context.Context, eventType string, handler func(Event) error) error
}
type EventBus interface {
EventPublisher
EventSubscriber
}

View File

@ -33,4 +33,5 @@ type FarmRepository interface {
GetByOwnerID(context.Context, string) ([]Farm, error)
CreateOrUpdate(context.Context, *Farm) error
Delete(context.Context, string) error
SetEventPublisher(EventPublisher)
}

View File

@ -0,0 +1,64 @@
package event
import (
"context"
"log/slog"
"time"
"github.com/forfarm/backend/internal/domain"
)
type EventAggregator struct {
sourceSubscriber domain.EventSubscriber
targetPublisher domain.EventPublisher
logger *slog.Logger
}
func NewEventAggregator(
sourceSubscriber domain.EventSubscriber,
targetPublisher domain.EventPublisher,
logger *slog.Logger,
) *EventAggregator {
return &EventAggregator{
sourceSubscriber: sourceSubscriber,
targetPublisher: targetPublisher,
logger: logger,
}
}
func (a *EventAggregator) Start(ctx context.Context) error {
// Subscribe to fine-grained events
eventTypes := []string{
"farm.created", "farm.updated", "farm.deleted",
"weather.updated", "inventory.changed", "marketplace.transaction",
}
for _, eventType := range eventTypes {
if err := a.sourceSubscriber.Subscribe(ctx, eventType, a.handleEvent); err != nil {
return err
}
}
return nil
}
func (a *EventAggregator) handleEvent(event domain.Event) error {
// Logic to aggregate events
// For example, combine farm and weather events into a farm status event
if event.Type == "farm.created" || event.Type == "farm.updated" {
// Create a coarse-grained event
aggregatedEvent := domain.Event{
ID: event.ID,
Type: "farm.status_changed",
Source: "event-aggregator",
Timestamp: time.Now(),
Payload: event.Payload,
AggregateID: event.AggregateID,
}
return a.targetPublisher.Publish(context.Background(), aggregatedEvent)
}
return nil
}

View File

@ -0,0 +1,150 @@
package event
import (
"context"
"encoding/json"
"log/slog"
"github.com/forfarm/backend/internal/domain"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQEventBus struct {
conn *amqp.Connection
channel *amqp.Channel
logger *slog.Logger
}
func NewRabbitMQEventBus(url string, logger *slog.Logger) (*RabbitMQEventBus, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, err
}
// Declare the exchange
err = ch.ExchangeDeclare(
"events", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
ch.Close()
conn.Close()
return nil, err
}
return &RabbitMQEventBus{
conn: conn,
channel: ch,
logger: logger,
}, nil
}
func (r *RabbitMQEventBus) Publish(ctx context.Context, event domain.Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
return r.channel.PublishWithContext(
ctx,
"events", // exchange
"events."+event.Type, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
DeliveryMode: amqp.Persistent,
MessageId: event.ID,
Timestamp: event.Timestamp,
},
)
}
func (r *RabbitMQEventBus) Subscribe(ctx context.Context, eventType string, handler func(domain.Event) error) error {
// Declare a queue for this consumer
q, err := r.channel.QueueDeclare(
"", // name (empty = auto-generated)
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
// Bind the queue to the exchange
err = r.channel.QueueBind(
q.Name, // queue name
"events."+eventType, // routing key
"events", // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
// Start consuming
msgs, err := r.channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
go func() {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-msgs:
if !ok {
return
}
var event domain.Event
if err := json.Unmarshal(msg.Body, &event); err != nil {
r.logger.Error("Failed to unmarshal event", "error", err)
msg.Nack(false, false)
continue
}
if err := handler(event); err != nil {
r.logger.Error("Failed to handle event", "error", err)
msg.Nack(false, true) // requeue
} else {
msg.Ack(false)
}
}
}
}()
return nil
}
func (r *RabbitMQEventBus) Close() error {
if err := r.channel.Close(); err != nil {
return err
}
return r.conn.Close()
}

View File

@ -0,0 +1,48 @@
package event
import (
"context"
"log/slog"
"github.com/forfarm/backend/internal/domain"
)
type AnalyticsProjection struct {
eventSubscriber domain.EventSubscriber
repository domain.AnalyticsRepository
logger *slog.Logger
}
func NewAnalyticsProjection(
subscriber domain.EventSubscriber,
repository domain.AnalyticsRepository,
logger *slog.Logger,
) *AnalyticsProjection {
return &AnalyticsProjection{
eventSubscriber: subscriber,
repository: repository,
logger: logger,
}
}
func (p *AnalyticsProjection) Start(ctx context.Context) error {
// Subscribe to coarse-grained events
eventTypes := []string{"farm.status_changed"}
for _, eventType := range eventTypes {
if err := p.eventSubscriber.Subscribe(ctx, eventType, p.handleEvent); err != nil {
return err
}
}
return nil
}
func (p *AnalyticsProjection) handleEvent(event domain.Event) error {
// Update materialized view based on event
if event.Type == "farm.status_changed" {
return p.repository.SaveFarmAnalytics(context.Background(), event.AggregateID, event.Payload)
}
return nil
}

View File

@ -0,0 +1,142 @@
package repository
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/forfarm/backend/internal/domain"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type PostgresAnalyticsRepository struct {
pool *pgxpool.Pool
}
func NewPostgresAnalyticsRepository(pool *pgxpool.Pool) domain.AnalyticsRepository {
return &PostgresAnalyticsRepository{pool: pool}
}
func (p *PostgresAnalyticsRepository) GetFarmAnalytics(ctx context.Context, farmID string) (*domain.FarmAnalytics, error) {
query := `
SELECT
farm_id,
farm_name,
owner_id,
last_updated,
weather_data,
inventory_data,
plant_health_data,
financial_data,
production_data
FROM
farm_analytics_view
WHERE
farm_id = $1`
var analytics domain.FarmAnalytics
var weatherJSON, inventoryJSON, plantHealthJSON, financialJSON, productionJSON []byte
err := p.pool.QueryRow(ctx, query, farmID).Scan(
&analytics.FarmID,
&analytics.Name,
&analytics.OwnerID,
&analytics.LastUpdated,
&weatherJSON,
&inventoryJSON,
&plantHealthJSON,
&financialJSON,
&productionJSON,
)
if err != nil {
if err == pgx.ErrNoRows {
return nil, fmt.Errorf("no analytics found for farm %s", farmID)
}
return nil, err
}
// Unmarshal JSON data into structs
if len(weatherJSON) > 0 {
var weather domain.WeatherAnalytics
if err := json.Unmarshal(weatherJSON, &weather); err == nil {
analytics.WeatherData = &weather
}
}
if len(inventoryJSON) > 0 {
var inventory domain.InventoryAnalytics
if err := json.Unmarshal(inventoryJSON, &inventory); err == nil {
analytics.InventoryData = &inventory
}
}
if len(plantHealthJSON) > 0 {
var plantHealth domain.PlantHealthAnalytics
if err := json.Unmarshal(plantHealthJSON, &plantHealth); err == nil {
analytics.PlantHealthData = &plantHealth
}
}
if len(financialJSON) > 0 {
var financial domain.FinancialAnalytics
if err := json.Unmarshal(financialJSON, &financial); err == nil {
analytics.FinancialData = &financial
}
}
if len(productionJSON) > 0 {
var production domain.ProductionAnalytics
if err := json.Unmarshal(productionJSON, &production); err == nil {
analytics.ProductionData = &production
}
}
return &analytics, nil
}
func (p *PostgresAnalyticsRepository) SaveFarmAnalytics(ctx context.Context, farmID string, data interface{}) error {
var jsonData []byte
var err error
// Handle different possible types of the data parameter
switch v := data.(type) {
case []byte:
jsonData = v
case string:
jsonData = []byte(v)
case map[string]interface{}:
jsonData, err = json.Marshal(v)
default:
jsonData, err = json.Marshal(v)
}
if err != nil {
return fmt.Errorf("failed to prepare JSON data: %w", err)
}
// Validate that we have valid JSON
var testObj interface{}
if err := json.Unmarshal(jsonData, &testObj); err != nil {
return fmt.Errorf("invalid JSON data: %w", err)
}
query := `
INSERT INTO analytics_events (
farm_id,
event_type,
event_data,
created_at
) VALUES ($1, $2, $3::jsonb, $4)`
eventType := "farm.status_changed"
_, err = p.pool.Exec(ctx, query, farmID, eventType, string(jsonData), time.Now())
if err != nil {
return fmt.Errorf("failed to insert analytics event: %w", err)
}
return nil
}

View File

@ -3,19 +3,25 @@ package repository
import (
"context"
"strings"
"time"
"github.com/forfarm/backend/internal/domain"
"github.com/google/uuid"
)
type postgresFarmRepository struct {
conn Connection
conn Connection
eventPublisher domain.EventPublisher
}
func NewPostgresFarm(conn Connection) domain.FarmRepository {
return &postgresFarmRepository{conn: conn}
}
func (p *postgresFarmRepository) SetEventPublisher(publisher domain.EventPublisher) {
p.eventPublisher = publisher
}
func (p *postgresFarmRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Farm, error) {
rows, err := p.conn.Query(ctx, query, args...)
if err != nil {
@ -77,7 +83,9 @@ func (p *postgresFarmRepository) GetByOwnerID(ctx context.Context, ownerID strin
}
func (p *postgresFarmRepository) CreateOrUpdate(ctx context.Context, f *domain.Farm) error {
if strings.TrimSpace(f.UUID) == "" {
isNew := strings.TrimSpace(f.UUID) == ""
if isNew {
f.UUID = uuid.New().String()
}
@ -93,8 +101,46 @@ func (p *postgresFarmRepository) CreateOrUpdate(ctx context.Context, f *domain.F
updated_at = NOW(),
owner_id = EXCLUDED.owner_id
RETURNING uuid, created_at, updated_at`
return p.conn.QueryRow(ctx, query, f.UUID, f.Name, f.Lat, f.Lon, f.FarmType, f.TotalSize, f.OwnerID).
err := p.conn.QueryRow(ctx, query, f.UUID, f.Name, f.Lat, f.Lon, f.FarmType, f.TotalSize, f.OwnerID).
Scan(&f.UUID, &f.CreatedAt, &f.UpdatedAt)
if err != nil {
return err
}
if p.eventPublisher != nil {
eventType := "farm.updated"
if isNew {
eventType = "farm.created"
}
event := domain.Event{
ID: uuid.New().String(),
Type: eventType,
Source: "farm-repository",
Timestamp: time.Now(),
AggregateID: f.UUID,
Payload: map[string]interface{}{
"farm_id": f.UUID,
"name": f.Name,
"location": map[string]float64{"lat": f.Lat, "lon": f.Lon},
"farm_type": f.FarmType,
"total_size": f.TotalSize,
"owner_id": f.OwnerID,
"created_at": f.CreatedAt,
"updated_at": f.UpdatedAt,
},
}
go func() {
bgCtx := context.Background()
if err := p.eventPublisher.Publish(bgCtx, event); err != nil {
println("Failed to publish event", err.Error())
}
}()
}
return nil
}
func (p *postgresFarmRepository) Delete(ctx context.Context, uuid string) error {

View File

@ -0,0 +1,77 @@
-- +goose Up
-- Create analytics_events table to store all events
CREATE TABLE IF NOT EXISTS public.analytics_events (
id SERIAL PRIMARY KEY,
farm_id UUID NOT NULL,
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT fk_analytics_farm FOREIGN KEY (farm_id) REFERENCES public.farms(uuid) ON DELETE CASCADE
);
-- Create index for faster queries
CREATE INDEX idx_analytics_events_farm_id ON public.analytics_events(farm_id);
CREATE INDEX idx_analytics_events_event_type ON public.analytics_events(event_type);
CREATE INDEX idx_analytics_events_created_at ON public.analytics_events(created_at);
-- Create a simple materialized view for farm analytics
CREATE MATERIALIZED VIEW public.farm_analytics_view AS
SELECT
f.uuid AS farm_id,
f.name AS farm_name,
f.owner_id,
f.farm_type,
f.total_size,
f.created_at,
f.updated_at,
COUNT(ae.id) AS total_events,
MAX(ae.created_at) AS last_event_at
FROM
public.farms f
LEFT JOIN
public.analytics_events ae ON f.uuid = ae.farm_id
GROUP BY
f.uuid, f.name, f.owner_id, f.farm_type, f.total_size, f.created_at, f.updated_at;
-- Create index for faster queries
CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);
-- Create function to refresh the materialized view
-- +goose StatementBegin
CREATE OR REPLACE FUNCTION public.refresh_farm_analytics_view()
RETURNS TRIGGER AS $$
BEGIN
REFRESH MATERIALIZED VIEW public.farm_analytics_view;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- +goose StatementEnd
-- Create trigger to refresh the view when new events are added
CREATE TRIGGER refresh_farm_analytics_view_trigger
AFTER INSERT ON public.analytics_events
FOR EACH STATEMENT
EXECUTE FUNCTION public.refresh_farm_analytics_view();
-- Create trigger to refresh the view when farms are updated
CREATE TRIGGER refresh_farm_analytics_view_farms_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.farms
FOR EACH STATEMENT
EXECUTE FUNCTION public.refresh_farm_analytics_view();
-- +goose Down
-- Drop triggers first
DROP TRIGGER IF EXISTS refresh_farm_analytics_view_trigger ON public.analytics_events;
DROP TRIGGER IF EXISTS refresh_farm_analytics_view_farms_trigger ON public.farms;
-- Drop function
-- +goose StatementBegin
DROP FUNCTION IF EXISTS public.refresh_farm_analytics_view() CASCADE;
-- +goose StatementEnd
-- Drop materialized view
DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view CASCADE;
-- Drop table with CASCADE to ensure all dependencies are removed
DROP TABLE IF EXISTS public.analytics_events CASCADE;

View File

@ -0,0 +1,106 @@
-- +goose Up
-- Drop the existing materialized view
DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view;
-- Create a new materialized view that matches the GetFarmAnalytics function
CREATE MATERIALIZED VIEW public.farm_analytics_view AS
SELECT
f.uuid AS farm_id,
f.name AS farm_name,
f.owner_id,
COALESCE(MAX(ae.created_at), f.updated_at) AS last_updated,
-- Weather data aggregation
(
SELECT jsonb_build_object(
'temperature', AVG((ae_w.event_data->>'temperature')::float) FILTER (WHERE ae_w.event_data ? 'temperature'),
'humidity', AVG((ae_w.event_data->>'humidity')::float) FILTER (WHERE ae_w.event_data ? 'humidity'),
'forecast', jsonb_agg(ae_w.event_data->'forecast') FILTER (WHERE ae_w.event_data ? 'forecast')
)
FROM analytics_events ae_w
WHERE ae_w.farm_id = f.uuid AND ae_w.event_type = 'weather.updated'
GROUP BY ae_w.farm_id
) AS weather_data,
-- Inventory data aggregation
(
SELECT jsonb_build_object(
'items', COALESCE(jsonb_agg(ae_i.event_data->'items'), '[]'::jsonb),
'last_updated', MAX(ae_i.created_at)
)
FROM analytics_events ae_i
WHERE ae_i.farm_id = f.uuid AND ae_i.event_type = 'inventory.updated'
GROUP BY ae_i.farm_id
) AS inventory_data,
-- Plant health data aggregation
(
SELECT jsonb_build_object(
'status', MAX(ae_p.event_data->>'status'),
'issues', COALESCE(jsonb_agg(ae_p.event_data->'issues') FILTER (WHERE ae_p.event_data ? 'issues'), '[]'::jsonb)
)
FROM analytics_events ae_p
WHERE ae_p.farm_id = f.uuid AND ae_p.event_type = 'plant_health.updated'
GROUP BY ae_p.farm_id
) AS plant_health_data,
-- Financial data aggregation
(
SELECT jsonb_build_object(
'revenue', SUM((ae_f.event_data->>'revenue')::float) FILTER (WHERE ae_f.event_data ? 'revenue'),
'expenses', SUM((ae_f.event_data->>'expenses')::float) FILTER (WHERE ae_f.event_data ? 'expenses'),
'profit', SUM((ae_f.event_data->>'profit')::float) FILTER (WHERE ae_f.event_data ? 'profit')
)
FROM analytics_events ae_f
WHERE ae_f.farm_id = f.uuid AND ae_f.event_type = 'financial.updated'
GROUP BY ae_f.farm_id
) AS financial_data,
-- Production data aggregation
(
SELECT jsonb_build_object(
'yield', SUM((ae_pr.event_data->>'yield')::float) FILTER (WHERE ae_pr.event_data ? 'yield'),
'forecast', MAX(ae_pr.event_data->'forecast')
)
FROM analytics_events ae_pr
WHERE ae_pr.farm_id = f.uuid AND ae_pr.event_type = 'production.updated'
GROUP BY ae_pr.farm_id
) AS production_data
FROM
public.farms f
LEFT JOIN
public.analytics_events ae ON f.uuid = ae.farm_id
GROUP BY
f.uuid, f.name, f.owner_id;
-- Create indexes for faster queries
CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);
-- +goose Down
-- Drop the new materialized view
DROP MATERIALIZED VIEW IF EXISTS public.farm_analytics_view;
-- Restore the original materialized view
CREATE MATERIALIZED VIEW public.farm_analytics_view AS
SELECT
f.uuid AS farm_id,
f.name AS farm_name,
f.owner_id,
f.farm_type,
f.total_size,
f.created_at,
f.updated_at,
COUNT(ae.id) AS total_events,
MAX(ae.created_at) AS last_event_at
FROM
public.farms f
LEFT JOIN
public.analytics_events ae ON f.uuid = ae.farm_id
GROUP BY
f.uuid, f.name, f.owner_id, f.farm_type, f.total_size, f.created_at, f.updated_at;
-- Recreate indexes
CREATE UNIQUE INDEX idx_farm_analytics_view_farm_id ON public.farm_analytics_view(farm_id);
CREATE INDEX idx_farm_analytics_view_owner_id ON public.farm_analytics_view(owner_id);