From 90ee8ff5298c07a4ad06d23d97ae02ae331c15e2 Mon Sep 17 00:00:00 2001 From: Sosokker Date: Wed, 2 Apr 2025 17:59:09 +0700 Subject: [PATCH] refactor(events): Ensure consistent event publishing across repositories --- .../internal/repository/postgres_cropland.go | 174 +++++++++++++----- .../internal/repository/postgres_inventory.go | 150 ++++++++++----- 2 files changed, 235 insertions(+), 89 deletions(-) diff --git a/backend/internal/repository/postgres_cropland.go b/backend/internal/repository/postgres_cropland.go index 5eb0ff7..7f6e3a7 100644 --- a/backend/internal/repository/postgres_cropland.go +++ b/backend/internal/repository/postgres_cropland.go @@ -2,7 +2,10 @@ package repository import ( "context" + "encoding/json" + "fmt" "strings" + "time" "github.com/google/uuid" @@ -10,34 +13,31 @@ import ( ) type postgresCroplandRepository struct { - conn Connection + conn Connection + eventPublisher domain.EventPublisher } func NewPostgresCropland(conn Connection) domain.CroplandRepository { return &postgresCroplandRepository{conn: conn} } +func (p *postgresCroplandRepository) SetEventPublisher(publisher domain.EventPublisher) { + p.eventPublisher = publisher +} + func (p *postgresCroplandRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Cropland, error) { rows, err := p.conn.Query(ctx, query, args...) if err != nil { return nil, err } defer rows.Close() - var croplands []domain.Cropland for rows.Next() { var c domain.Cropland if err := rows.Scan( - &c.UUID, - &c.Name, - &c.Status, - &c.Priority, - &c.LandSize, - &c.GrowthStage, - &c.PlantID, - &c.FarmID, - &c.CreatedAt, - &c.UpdatedAt, + &c.UUID, &c.Name, &c.Status, &c.Priority, &c.LandSize, + &c.GrowthStage, &c.PlantID, &c.FarmID, &c.GeoFeature, + &c.CreatedAt, &c.UpdatedAt, ); err != nil { return nil, err } @@ -46,9 +46,17 @@ func (p *postgresCroplandRepository) fetch(ctx context.Context, query string, ar return croplands, nil } +func (p *postgresCroplandRepository) GetAll(ctx context.Context) ([]domain.Cropland, error) { + query := ` + SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at + FROM croplands` + + return p.fetch(ctx, query) +} + func (p *postgresCroplandRepository) GetByID(ctx context.Context, uuid string) (domain.Cropland, error) { query := ` - SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at + SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at FROM croplands WHERE uuid = $1` @@ -64,7 +72,7 @@ func (p *postgresCroplandRepository) GetByID(ctx context.Context, uuid string) ( func (p *postgresCroplandRepository) GetByFarmID(ctx context.Context, farmID string) ([]domain.Cropland, error) { query := ` - SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at + SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at FROM croplands WHERE farm_id = $1` @@ -72,47 +80,119 @@ func (p *postgresCroplandRepository) GetByFarmID(ctx context.Context, farmID str } func (p *postgresCroplandRepository) CreateOrUpdate(ctx context.Context, c *domain.Cropland) error { + isNew := false if strings.TrimSpace(c.UUID) == "" { - c.UUID = uuid.New().String() + c.UUID = uuid.NewString() + isNew = true } - query := ` - INSERT INTO croplands (uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW(), NOW()) - ON CONFLICT (uuid) DO UPDATE - SET name = EXCLUDED.name, - status = EXCLUDED.status, - priority = EXCLUDED.priority, - land_size = EXCLUDED.land_size, - growth_stage = EXCLUDED.growth_stage, - plant_id = EXCLUDED.plant_id, - farm_id = EXCLUDED.farm_id, - updated_at = NOW() - RETURNING uuid, created_at, updated_at` + if c.GeoFeature != nil && len(c.GeoFeature) == 0 { + c.GeoFeature = nil + } - return p.conn.QueryRow( - ctx, - query, - c.UUID, - c.Name, - c.Status, - c.Priority, - c.LandSize, - c.GrowthStage, - c.PlantID, - c.FarmID, - ).Scan(&c.UUID, &c.CreatedAt, &c.UpdatedAt) // Fixed Scan call + query := ` + INSERT INTO croplands ( + uuid, name, status, priority, land_size, growth_stage, + plant_id, farm_id, geo_feature, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()) + ON CONFLICT (uuid) DO UPDATE + SET name = EXCLUDED.name, status = EXCLUDED.status, priority = EXCLUDED.priority, + land_size = EXCLUDED.land_size, growth_stage = EXCLUDED.growth_stage, + plant_id = EXCLUDED.plant_id, farm_id = EXCLUDED.farm_id, + geo_feature = EXCLUDED.geo_feature, updated_at = NOW() + RETURNING uuid, created_at, updated_at` + + err := p.conn.QueryRow( + ctx, query, + c.UUID, c.Name, c.Status, c.Priority, c.LandSize, c.GrowthStage, + c.PlantID, c.FarmID, c.GeoFeature, + ).Scan(&c.UUID, &c.CreatedAt, &c.UpdatedAt) + + if err != nil { + return err + } + + if p.eventPublisher != nil { + eventType := "cropland.updated" + if isNew { + eventType = "cropland.created" + } + + // Avoid sending raw json.RawMessage directly if possible + var geoFeatureMap interface{} + if c.GeoFeature != nil { + _ = json.Unmarshal(c.GeoFeature, &geoFeatureMap) + } + + payload := map[string]interface{}{ + "crop_id": c.UUID, + "name": c.Name, + "status": c.Status, + "priority": c.Priority, + "land_size": c.LandSize, + "growth_stage": c.GrowthStage, + "plant_id": c.PlantID, + "farm_id": c.FarmID, + "geo_feature": geoFeatureMap, + "created_at": c.CreatedAt, + "updated_at": c.UpdatedAt, + "event_type": eventType, + } + + event := domain.Event{ + ID: uuid.NewString(), + Type: eventType, + Source: "cropland-repository", + Timestamp: time.Now().UTC(), + AggregateID: c.UUID, + Payload: payload, + } + go func() { + bgCtx := context.Background() + if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil { + fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Replace with proper logging + } + }() + } + + return nil } + func (p *postgresCroplandRepository) Delete(ctx context.Context, uuid string) error { + // Optional: Fetch details before deleting if needed for event payload + // cropland, err := p.GetByID(ctx, uuid) // Might fail if already deleted, handle carefully + // if err != nil && !errors.Is(err, domain.ErrNotFound){ return err } // Return actual errors + query := `DELETE FROM croplands WHERE uuid = $1` _, err := p.conn.Exec(ctx, query, uuid) - return err -} + if err != nil { + return err + } -func (p *postgresCroplandRepository) GetAll(ctx context.Context) ([]domain.Cropland, error) { - query := ` - SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at - FROM croplands` + if p.eventPublisher != nil { + eventType := "cropland.deleted" + payload := map[string]interface{}{ + "crop_id": uuid, + // Include farm_id if easily available or fetched before delete + // "farm_id": cropland.FarmID + "event_type": eventType, + } + event := domain.Event{ + ID: uuid, + Type: eventType, + Source: "cropland-repository", + Timestamp: time.Now().UTC(), + AggregateID: uuid, + Payload: payload, + } + go func() { + bgCtx := context.Background() + if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil { + fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) + } + }() + } - return p.fetch(ctx, query) + return nil } diff --git a/backend/internal/repository/postgres_inventory.go b/backend/internal/repository/postgres_inventory.go index 1bd5d98..c0e1287 100644 --- a/backend/internal/repository/postgres_inventory.go +++ b/backend/internal/repository/postgres_inventory.go @@ -7,14 +7,16 @@ import ( "time" "github.com/forfarm/backend/internal/domain" + "github.com/google/uuid" ) type postgresInventoryRepository struct { - conn Connection + conn Connection + eventPublisher domain.EventPublisher } -func NewPostgresInventory(conn Connection) domain.InventoryRepository { - return &postgresInventoryRepository{conn: conn} +func NewPostgresInventory(conn Connection, publisher domain.EventPublisher) domain.InventoryRepository { + return &postgresInventoryRepository{conn: conn, eventPublisher: publisher} } func (p *postgresInventoryRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.InventoryItem, error) { @@ -219,60 +221,124 @@ func (p *postgresInventoryRepository) GetAll(ctx context.Context) ([]domain.Inve func (p *postgresInventoryRepository) CreateOrUpdate(ctx context.Context, item *domain.InventoryItem) error { now := time.Now() item.UpdatedAt = now + isNew := false if item.ID == "" { + isNew = true item.CreatedAt = now - query := ` - INSERT INTO inventory_items - (id, user_id, name, category_id, quantity, unit_id, date_added, status_id, created_at, updated_at) + query := ` + INSERT INTO inventory_items + (id, user_id, name, category_id, quantity, unit_id, date_added, status_id, created_at, updated_at) VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id` - return p.conn.QueryRow( + err := p.conn.QueryRow( ctx, query, - item.UserID, - item.Name, - item.CategoryID, - item.Quantity, - item.UnitID, - item.DateAdded, - item.StatusID, - item.CreatedAt, - item.UpdatedAt, + item.UserID, item.Name, item.CategoryID, item.Quantity, + item.UnitID, item.DateAdded, item.StatusID, item.CreatedAt, item.UpdatedAt, ).Scan(&item.ID) + if err != nil { + // Log error + return err + } + } else { + query := ` + UPDATE inventory_items + SET name = $1, category_id = $2, quantity = $3, unit_id = $4, + date_added = $5, status_id = $6, updated_at = $7 + WHERE id = $8 AND user_id = $9 + RETURNING id` // Ensure RETURNING id exists or handle differently + err := p.conn.QueryRow( + ctx, + query, + item.Name, item.CategoryID, item.Quantity, item.UnitID, + item.DateAdded, item.StatusID, item.UpdatedAt, item.ID, item.UserID, + ).Scan(&item.ID) // Scan to confirm update happened or handle potential ErrNoRows + if err != nil { + // Log error + return err + } } - query := ` - UPDATE inventory_items - SET name = $1, - category_id = $2, - quantity = $3, - unit_id = $4, - date_added = $5, - status_id = $6, - updated_at = $7 - WHERE id = $8 AND user_id = $9 - RETURNING id` + // --- Publish Event --- + if p.eventPublisher != nil { + eventType := "inventory.item.updated" + if isNew { + eventType = "inventory.item.created" + } - return p.conn.QueryRow( - ctx, - query, - item.Name, - item.CategoryID, - item.Quantity, - item.UnitID, - item.DateAdded, - item.StatusID, - item.UpdatedAt, - item.ID, - item.UserID, - ).Scan(&item.ID) + payload := map[string]interface{}{ + "item_id": item.ID, + "user_id": item.UserID, // Include user ID for potential farm lookup in projection + "name": item.Name, + "category_id": item.CategoryID, + "quantity": item.Quantity, + "unit_id": item.UnitID, + "status_id": item.StatusID, + "date_added": item.DateAdded, + "updated_at": item.UpdatedAt, + // NO farm_id easily available here without extra lookup + } + + event := domain.Event{ + ID: uuid.NewString(), + Type: eventType, + Source: "inventory-repository", + Timestamp: time.Now().UTC(), + AggregateID: item.UserID, // Use UserID as AggregateID for inventory? Or item.ID? Let's use item.ID. + Payload: payload, + } + // Use AggregateID = item.ID for consistency if item is the aggregate root + event.AggregateID = item.ID + + go func() { + bgCtx := context.Background() + if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil { + fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Use proper logging + } + }() + } + // --- End Publish Event --- + + return nil } func (p *postgresInventoryRepository) Delete(ctx context.Context, id, userID string) error { query := `DELETE FROM inventory_items WHERE id = $1 AND user_id = $2` - _, err := p.conn.Exec(ctx, query, id, userID) - return err + cmdTag, err := p.conn.Exec(ctx, query, id, userID) + if err != nil { + // Log error + return err + } + if cmdTag.RowsAffected() == 0 { + return domain.ErrNotFound // Or a permission error if user doesn't match + } + + // --- Publish Event --- + if p.eventPublisher != nil { + eventType := "inventory.item.deleted" + payload := map[string]interface{}{ + "item_id": id, + "user_id": userID, // Include user ID + } + event := domain.Event{ + ID: uuid.NewString(), + Type: eventType, + Source: "inventory-repository", + Timestamp: time.Now().UTC(), + AggregateID: id, // Use item ID as aggregate ID + Payload: payload, + } + go func() { + bgCtx := context.Background() + if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil { + fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Use proper logging + } + }() + } + // --- End Publish Event --- + + return nil } func (p *postgresInventoryRepository) GetStatuses(ctx context.Context) ([]domain.InventoryStatus, error) {