refactor(events): Ensure consistent event publishing across repositories

This commit is contained in:
Sosokker 2025-04-02 17:59:09 +07:00
parent 89a80f7352
commit 90ee8ff529
2 changed files with 235 additions and 89 deletions

View File

@ -2,7 +2,10 @@ package repository
import ( import (
"context" "context"
"encoding/json"
"fmt"
"strings" "strings"
"time"
"github.com/google/uuid" "github.com/google/uuid"
@ -10,34 +13,31 @@ import (
) )
type postgresCroplandRepository struct { type postgresCroplandRepository struct {
conn Connection conn Connection
eventPublisher domain.EventPublisher
} }
func NewPostgresCropland(conn Connection) domain.CroplandRepository { func NewPostgresCropland(conn Connection) domain.CroplandRepository {
return &postgresCroplandRepository{conn: conn} return &postgresCroplandRepository{conn: conn}
} }
func (p *postgresCroplandRepository) SetEventPublisher(publisher domain.EventPublisher) {
p.eventPublisher = publisher
}
func (p *postgresCroplandRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Cropland, error) { func (p *postgresCroplandRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.Cropland, error) {
rows, err := p.conn.Query(ctx, query, args...) rows, err := p.conn.Query(ctx, query, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var croplands []domain.Cropland var croplands []domain.Cropland
for rows.Next() { for rows.Next() {
var c domain.Cropland var c domain.Cropland
if err := rows.Scan( if err := rows.Scan(
&c.UUID, &c.UUID, &c.Name, &c.Status, &c.Priority, &c.LandSize,
&c.Name, &c.GrowthStage, &c.PlantID, &c.FarmID, &c.GeoFeature,
&c.Status, &c.CreatedAt, &c.UpdatedAt,
&c.Priority,
&c.LandSize,
&c.GrowthStage,
&c.PlantID,
&c.FarmID,
&c.CreatedAt,
&c.UpdatedAt,
); err != nil { ); err != nil {
return nil, err return nil, err
} }
@ -46,9 +46,17 @@ func (p *postgresCroplandRepository) fetch(ctx context.Context, query string, ar
return croplands, nil return croplands, nil
} }
func (p *postgresCroplandRepository) GetAll(ctx context.Context) ([]domain.Cropland, error) {
query := `
SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at
FROM croplands`
return p.fetch(ctx, query)
}
func (p *postgresCroplandRepository) GetByID(ctx context.Context, uuid string) (domain.Cropland, error) { func (p *postgresCroplandRepository) GetByID(ctx context.Context, uuid string) (domain.Cropland, error) {
query := ` query := `
SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at
FROM croplands FROM croplands
WHERE uuid = $1` WHERE uuid = $1`
@ -64,7 +72,7 @@ func (p *postgresCroplandRepository) GetByID(ctx context.Context, uuid string) (
func (p *postgresCroplandRepository) GetByFarmID(ctx context.Context, farmID string) ([]domain.Cropland, error) { func (p *postgresCroplandRepository) GetByFarmID(ctx context.Context, farmID string) ([]domain.Cropland, error) {
query := ` query := `
SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, geo_feature, created_at, updated_at
FROM croplands FROM croplands
WHERE farm_id = $1` WHERE farm_id = $1`
@ -72,47 +80,119 @@ func (p *postgresCroplandRepository) GetByFarmID(ctx context.Context, farmID str
} }
func (p *postgresCroplandRepository) CreateOrUpdate(ctx context.Context, c *domain.Cropland) error { func (p *postgresCroplandRepository) CreateOrUpdate(ctx context.Context, c *domain.Cropland) error {
isNew := false
if strings.TrimSpace(c.UUID) == "" { if strings.TrimSpace(c.UUID) == "" {
c.UUID = uuid.New().String() c.UUID = uuid.NewString()
isNew = true
} }
query := ` if c.GeoFeature != nil && len(c.GeoFeature) == 0 {
INSERT INTO croplands (uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at) c.GeoFeature = nil
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW(), NOW()) }
ON CONFLICT (uuid) DO UPDATE
SET name = EXCLUDED.name,
status = EXCLUDED.status,
priority = EXCLUDED.priority,
land_size = EXCLUDED.land_size,
growth_stage = EXCLUDED.growth_stage,
plant_id = EXCLUDED.plant_id,
farm_id = EXCLUDED.farm_id,
updated_at = NOW()
RETURNING uuid, created_at, updated_at`
return p.conn.QueryRow( query := `
ctx, INSERT INTO croplands (
query, uuid, name, status, priority, land_size, growth_stage,
c.UUID, plant_id, farm_id, geo_feature, created_at, updated_at
c.Name, )
c.Status, VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW())
c.Priority, ON CONFLICT (uuid) DO UPDATE
c.LandSize, SET name = EXCLUDED.name, status = EXCLUDED.status, priority = EXCLUDED.priority,
c.GrowthStage, land_size = EXCLUDED.land_size, growth_stage = EXCLUDED.growth_stage,
c.PlantID, plant_id = EXCLUDED.plant_id, farm_id = EXCLUDED.farm_id,
c.FarmID, geo_feature = EXCLUDED.geo_feature, updated_at = NOW()
).Scan(&c.UUID, &c.CreatedAt, &c.UpdatedAt) // Fixed Scan call RETURNING uuid, created_at, updated_at`
err := p.conn.QueryRow(
ctx, query,
c.UUID, c.Name, c.Status, c.Priority, c.LandSize, c.GrowthStage,
c.PlantID, c.FarmID, c.GeoFeature,
).Scan(&c.UUID, &c.CreatedAt, &c.UpdatedAt)
if err != nil {
return err
}
if p.eventPublisher != nil {
eventType := "cropland.updated"
if isNew {
eventType = "cropland.created"
}
// Avoid sending raw json.RawMessage directly if possible
var geoFeatureMap interface{}
if c.GeoFeature != nil {
_ = json.Unmarshal(c.GeoFeature, &geoFeatureMap)
}
payload := map[string]interface{}{
"crop_id": c.UUID,
"name": c.Name,
"status": c.Status,
"priority": c.Priority,
"land_size": c.LandSize,
"growth_stage": c.GrowthStage,
"plant_id": c.PlantID,
"farm_id": c.FarmID,
"geo_feature": geoFeatureMap,
"created_at": c.CreatedAt,
"updated_at": c.UpdatedAt,
"event_type": eventType,
}
event := domain.Event{
ID: uuid.NewString(),
Type: eventType,
Source: "cropland-repository",
Timestamp: time.Now().UTC(),
AggregateID: c.UUID,
Payload: payload,
}
go func() {
bgCtx := context.Background()
if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil {
fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Replace with proper logging
}
}()
}
return nil
} }
func (p *postgresCroplandRepository) Delete(ctx context.Context, uuid string) error { func (p *postgresCroplandRepository) Delete(ctx context.Context, uuid string) error {
// Optional: Fetch details before deleting if needed for event payload
// cropland, err := p.GetByID(ctx, uuid) // Might fail if already deleted, handle carefully
// if err != nil && !errors.Is(err, domain.ErrNotFound){ return err } // Return actual errors
query := `DELETE FROM croplands WHERE uuid = $1` query := `DELETE FROM croplands WHERE uuid = $1`
_, err := p.conn.Exec(ctx, query, uuid) _, err := p.conn.Exec(ctx, query, uuid)
return err if err != nil {
} return err
}
func (p *postgresCroplandRepository) GetAll(ctx context.Context) ([]domain.Cropland, error) { if p.eventPublisher != nil {
query := ` eventType := "cropland.deleted"
SELECT uuid, name, status, priority, land_size, growth_stage, plant_id, farm_id, created_at, updated_at payload := map[string]interface{}{
FROM croplands` "crop_id": uuid,
// Include farm_id if easily available or fetched before delete
// "farm_id": cropland.FarmID
"event_type": eventType,
}
event := domain.Event{
ID: uuid,
Type: eventType,
Source: "cropland-repository",
Timestamp: time.Now().UTC(),
AggregateID: uuid,
Payload: payload,
}
go func() {
bgCtx := context.Background()
if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil {
fmt.Printf("Error publishing %s event: %v\n", eventType, errPub)
}
}()
}
return p.fetch(ctx, query) return nil
} }

View File

@ -7,14 +7,16 @@ import (
"time" "time"
"github.com/forfarm/backend/internal/domain" "github.com/forfarm/backend/internal/domain"
"github.com/google/uuid"
) )
type postgresInventoryRepository struct { type postgresInventoryRepository struct {
conn Connection conn Connection
eventPublisher domain.EventPublisher
} }
func NewPostgresInventory(conn Connection) domain.InventoryRepository { func NewPostgresInventory(conn Connection, publisher domain.EventPublisher) domain.InventoryRepository {
return &postgresInventoryRepository{conn: conn} return &postgresInventoryRepository{conn: conn, eventPublisher: publisher}
} }
func (p *postgresInventoryRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.InventoryItem, error) { func (p *postgresInventoryRepository) fetch(ctx context.Context, query string, args ...interface{}) ([]domain.InventoryItem, error) {
@ -219,60 +221,124 @@ func (p *postgresInventoryRepository) GetAll(ctx context.Context) ([]domain.Inve
func (p *postgresInventoryRepository) CreateOrUpdate(ctx context.Context, item *domain.InventoryItem) error { func (p *postgresInventoryRepository) CreateOrUpdate(ctx context.Context, item *domain.InventoryItem) error {
now := time.Now() now := time.Now()
item.UpdatedAt = now item.UpdatedAt = now
isNew := false
if item.ID == "" { if item.ID == "" {
isNew = true
item.CreatedAt = now item.CreatedAt = now
query := ` query := `
INSERT INTO inventory_items INSERT INTO inventory_items
(id, user_id, name, category_id, quantity, unit_id, date_added, status_id, created_at, updated_at) (id, user_id, name, category_id, quantity, unit_id, date_added, status_id, created_at, updated_at)
VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9) VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id` RETURNING id`
return p.conn.QueryRow( err := p.conn.QueryRow(
ctx, ctx,
query, query,
item.UserID, item.UserID, item.Name, item.CategoryID, item.Quantity,
item.Name, item.UnitID, item.DateAdded, item.StatusID, item.CreatedAt, item.UpdatedAt,
item.CategoryID,
item.Quantity,
item.UnitID,
item.DateAdded,
item.StatusID,
item.CreatedAt,
item.UpdatedAt,
).Scan(&item.ID) ).Scan(&item.ID)
if err != nil {
// Log error
return err
}
} else {
query := `
UPDATE inventory_items
SET name = $1, category_id = $2, quantity = $3, unit_id = $4,
date_added = $5, status_id = $6, updated_at = $7
WHERE id = $8 AND user_id = $9
RETURNING id` // Ensure RETURNING id exists or handle differently
err := p.conn.QueryRow(
ctx,
query,
item.Name, item.CategoryID, item.Quantity, item.UnitID,
item.DateAdded, item.StatusID, item.UpdatedAt, item.ID, item.UserID,
).Scan(&item.ID) // Scan to confirm update happened or handle potential ErrNoRows
if err != nil {
// Log error
return err
}
} }
query := ` // --- Publish Event ---
UPDATE inventory_items if p.eventPublisher != nil {
SET name = $1, eventType := "inventory.item.updated"
category_id = $2, if isNew {
quantity = $3, eventType = "inventory.item.created"
unit_id = $4, }
date_added = $5,
status_id = $6,
updated_at = $7
WHERE id = $8 AND user_id = $9
RETURNING id`
return p.conn.QueryRow( payload := map[string]interface{}{
ctx, "item_id": item.ID,
query, "user_id": item.UserID, // Include user ID for potential farm lookup in projection
item.Name, "name": item.Name,
item.CategoryID, "category_id": item.CategoryID,
item.Quantity, "quantity": item.Quantity,
item.UnitID, "unit_id": item.UnitID,
item.DateAdded, "status_id": item.StatusID,
item.StatusID, "date_added": item.DateAdded,
item.UpdatedAt, "updated_at": item.UpdatedAt,
item.ID, // NO farm_id easily available here without extra lookup
item.UserID, }
).Scan(&item.ID)
event := domain.Event{
ID: uuid.NewString(),
Type: eventType,
Source: "inventory-repository",
Timestamp: time.Now().UTC(),
AggregateID: item.UserID, // Use UserID as AggregateID for inventory? Or item.ID? Let's use item.ID.
Payload: payload,
}
// Use AggregateID = item.ID for consistency if item is the aggregate root
event.AggregateID = item.ID
go func() {
bgCtx := context.Background()
if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil {
fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Use proper logging
}
}()
}
// --- End Publish Event ---
return nil
} }
func (p *postgresInventoryRepository) Delete(ctx context.Context, id, userID string) error { func (p *postgresInventoryRepository) Delete(ctx context.Context, id, userID string) error {
query := `DELETE FROM inventory_items WHERE id = $1 AND user_id = $2` query := `DELETE FROM inventory_items WHERE id = $1 AND user_id = $2`
_, err := p.conn.Exec(ctx, query, id, userID) cmdTag, err := p.conn.Exec(ctx, query, id, userID)
return err if err != nil {
// Log error
return err
}
if cmdTag.RowsAffected() == 0 {
return domain.ErrNotFound // Or a permission error if user doesn't match
}
// --- Publish Event ---
if p.eventPublisher != nil {
eventType := "inventory.item.deleted"
payload := map[string]interface{}{
"item_id": id,
"user_id": userID, // Include user ID
}
event := domain.Event{
ID: uuid.NewString(),
Type: eventType,
Source: "inventory-repository",
Timestamp: time.Now().UTC(),
AggregateID: id, // Use item ID as aggregate ID
Payload: payload,
}
go func() {
bgCtx := context.Background()
if errPub := p.eventPublisher.Publish(bgCtx, event); errPub != nil {
fmt.Printf("Error publishing %s event: %v\n", eventType, errPub) // Use proper logging
}
}()
}
// --- End Publish Event ---
return nil
} }
func (p *postgresInventoryRepository) GetStatuses(ctx context.Context) ([]domain.InventoryStatus, error) { func (p *postgresInventoryRepository) GetStatuses(ctx context.Context) ([]domain.InventoryStatus, error) {