From 1e6c631be30ca5d093b35145cf05fd3ed7238dfa Mon Sep 17 00:00:00 2001 From: Sosokker Date: Fri, 4 Apr 2025 15:30:21 +0700 Subject: [PATCH] fix: fix farm openweather projection --- backend/internal/domain/farm.go | 1 + backend/internal/domain/plant.go | 1 + backend/internal/event/projection.go | 50 ++--- .../internal/repository/postgres_cropland.go | 25 +-- backend/internal/repository/postgres_farm.go | 82 ++++++- .../repository/postgres_farm_analytics.go | 208 +++++++----------- .../internal/repository/postgres_inventory.go | 18 +- backend/internal/repository/postgres_plant.go | 9 + .../internal/services/analytics_service.go | 46 ++-- .../weather/openweathermap_fetcher.go | 159 +++++++------ backend/internal/workers/weather_updater.go | 68 ++++-- 11 files changed, 343 insertions(+), 324 deletions(-) diff --git a/backend/internal/domain/farm.go b/backend/internal/domain/farm.go index c2da327..4b929b6 100644 --- a/backend/internal/domain/farm.go +++ b/backend/internal/domain/farm.go @@ -32,6 +32,7 @@ func (f *Farm) Validate() error { type FarmRepository interface { GetByID(context.Context, string) (*Farm, error) GetByOwnerID(context.Context, string) ([]Farm, error) + GetAll(context.Context) ([]Farm, error) CreateOrUpdate(context.Context, *Farm) error Delete(context.Context, string) error SetEventPublisher(EventPublisher) diff --git a/backend/internal/domain/plant.go b/backend/internal/domain/plant.go index 289bca7..3a70696 100644 --- a/backend/internal/domain/plant.go +++ b/backend/internal/domain/plant.go @@ -45,6 +45,7 @@ func (p *Plant) Validate() error { type PlantRepository interface { GetByUUID(context.Context, string) (Plant, error) GetAll(context.Context) ([]Plant, error) + GetByName(context.Context, string) (Plant, error) Create(context.Context, *Plant) error Update(context.Context, *Plant) error Delete(context.Context, string) error diff --git a/backend/internal/event/projection.go b/backend/internal/event/projection.go index a5d57f1..1834d0b 100644 --- a/backend/internal/event/projection.go +++ b/backend/internal/event/projection.go @@ -1,4 +1,3 @@ -// backend/internal/event/projection.go package event import ( @@ -35,11 +34,10 @@ func NewFarmAnalyticsProjection( func (p *FarmAnalyticsProjection) Start(ctx context.Context) error { eventTypes := []string{ - "farm.created", "farm.updated", "farm.deleted", // Farm lifecycle - "weather.updated", // Weather updates - "cropland.created", "cropland.updated", "cropland.deleted", // Crop changes trigger count recalc - "inventory.item.created", "inventory.item.updated", "inventory.item.deleted", // Inventory changes trigger timestamp update - // Add other events that might influence FarmAnalytics, e.g., "pest.detected", "yield.recorded" + "farm.created", "farm.updated", "farm.deleted", + "weather.updated", + "cropland.created", "cropland.updated", "cropland.deleted", + "inventory.item.created", "inventory.item.updated", "inventory.item.deleted", } p.logger.Info("FarmAnalyticsProjection starting, subscribing to events", "types", eventTypes) @@ -49,8 +47,6 @@ func (p *FarmAnalyticsProjection) Start(ctx context.Context) error { if err := p.eventSubscriber.Subscribe(ctx, eventType, p.handleEvent); err != nil { p.logger.Error("Failed to subscribe to event type", "type", eventType, "error", err) errs = append(errs, fmt.Errorf("failed to subscribe to %s: %w", eventType, err)) - // TODO: Decide if we should continue subscribing or fail hard - // return errors.Join(errs...) // Fail hard } else { p.logger.Info("Successfully subscribed to event type", "type", eventType) } @@ -65,33 +61,30 @@ func (p *FarmAnalyticsProjection) Start(ctx context.Context) error { } func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // 10-second timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() p.logger.Debug("Handling event in FarmAnalyticsProjection", "type", event.Type, "aggregate_id", event.AggregateID, "event_id", event.ID) - farmID := event.AggregateID // Assume AggregateID is the Farm UUID for relevant events + farmID := event.AggregateID - // Special case: inventory events might use UserID as AggregateID. - // Need a way to map UserID to FarmID if necessary, or adjust event publishing. - // For now, we assume farmID can be derived or is directly in the payload for inventory events. - - if farmID == "" { + // Try to get farmID from payload if AggregateID is empty or potentially not the farmID (e.g., user events) + if farmID == "" || event.Type == "inventory.item.created" || event.Type == "inventory.item.updated" || event.Type == "inventory.item.deleted" || event.Type == "cropland.created" || event.Type == "cropland.updated" || event.Type == "cropland.deleted" { payloadMap, ok := event.Payload.(map[string]interface{}) if ok { if idVal, ok := payloadMap["farm_id"].(string); ok && idVal != "" { farmID = idVal - } else if idVal, ok := payloadMap["user_id"].(string); ok && idVal != "" { - // !! WARNING: Need mapping from user_id to farm_id here !! - // This is a temp - requires adding userRepo or similar lookup - p.logger.Warn("Inventory event received without direct farm_id, cannot update stats", "event_id", event.ID, "user_id", idVal) - // Skip inventory stats update if farm_id is missing + } else if event.Type != "farm.deleted" && event.Type != "farm.created" { + p.logger.Warn("Could not determine farm_id from event payload or AggregateID", "event_type", event.Type, "event_id", event.ID, "aggregate_id", event.AggregateID) return nil } + } else if event.Type != "farm.deleted" && event.Type != "farm.created" { + p.logger.Error("Event payload is not a map, cannot extract farm_id", "event_type", event.Type, "event_id", event.ID) + return nil } } - if farmID == "" && event.Type != "farm.deleted" { // farm.deleted uses AggregateID which is the farmID being deleted + if farmID == "" && event.Type != "farm.deleted" { p.logger.Error("Cannot process event, missing farm_id", "event_type", event.Type, "event_id", event.ID, "aggregate_id", event.AggregateID) return nil } @@ -99,22 +92,21 @@ func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error { var err error switch event.Type { case "farm.created", "farm.updated": - // Need to get the full Farm domain object from the payload var farmData domain.Farm - jsonData, _ := json.Marshal(event.Payload) // Convert payload map back to JSON + jsonData, _ := json.Marshal(event.Payload) if err = json.Unmarshal(jsonData, &farmData); err != nil { p.logger.Error("Failed to unmarshal farm data from event payload", "event_id", event.ID, "error", err) - // Nack or Ack based on error strategy? Ack for now. return nil } - // Ensure UUID is set from AggregateID if missing in payload itself if farmData.UUID == "" { farmData.UUID = event.AggregateID } + + p.logger.Info("Processing farm event", "event_type", event.Type, "farm_id", farmData.UUID, "owner_id", farmData.OwnerID) err = p.repository.CreateOrUpdateFarmBaseData(ctx, &farmData) case "farm.deleted": - farmID = event.AggregateID // Use AggregateID directly for delete + farmID = event.AggregateID if farmID == "" { p.logger.Error("Cannot process farm.deleted event, missing farm_id in AggregateID", "event_id", event.ID) return nil @@ -122,12 +114,11 @@ func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error { err = p.repository.DeleteFarmAnalytics(ctx, farmID) case "weather.updated": - // Extract weather data from payload var weatherData domain.WeatherData jsonData, _ := json.Marshal(event.Payload) if err = json.Unmarshal(jsonData, &weatherData); err != nil { p.logger.Error("Failed to unmarshal weather data from event payload", "event_id", event.ID, "error", err) - return nil // Acknowledge bad data + return nil } err = p.repository.UpdateFarmAnalyticsWeather(ctx, farmID, &weatherData) @@ -146,8 +137,6 @@ func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error { err = p.repository.UpdateFarmAnalyticsCropStats(ctx, farmID) case "inventory.item.created", "inventory.item.updated", "inventory.item.deleted": - // farmID needs to be looked up or present in payload - // For now, we only touch the timestamp if farmID != "" { err = p.repository.UpdateFarmAnalyticsInventoryStats(ctx, farmID) } else { @@ -162,7 +151,6 @@ func (p *FarmAnalyticsProjection) handleEvent(event domain.Event) error { if err != nil { p.logger.Error("Failed to update farm analytics", "event_type", event.Type, "farm_id", farmID, "error", err) - // Decide whether to return the error (potentially causing requeue) or nil (ack) return nil } diff --git a/backend/internal/repository/postgres_cropland.go b/backend/internal/repository/postgres_cropland.go index 7f6e3a7..5c6e55d 100644 --- a/backend/internal/repository/postgres_cropland.go +++ b/backend/internal/repository/postgres_cropland.go @@ -124,20 +124,19 @@ func (p *postgresCroplandRepository) CreateOrUpdate(ctx context.Context, c *doma if c.GeoFeature != nil { _ = json.Unmarshal(c.GeoFeature, &geoFeatureMap) } - payload := map[string]interface{}{ - "crop_id": c.UUID, - "name": c.Name, - "status": c.Status, - "priority": c.Priority, - "land_size": c.LandSize, - "growth_stage": c.GrowthStage, - "plant_id": c.PlantID, - "farm_id": c.FarmID, - "geo_feature": geoFeatureMap, - "created_at": c.CreatedAt, - "updated_at": c.UpdatedAt, - "event_type": eventType, + "uuid": c.UUID, + "name": c.Name, + "status": c.Status, + "priority": c.Priority, + "landSize": c.LandSize, + "growthStage": c.GrowthStage, + "plantId": c.PlantID, + "farmId": c.FarmID, + "geoFeature": geoFeatureMap, + "createdAt": c.CreatedAt, + "updatedAt": c.UpdatedAt, + "event_type": eventType, } event := domain.Event{ diff --git a/backend/internal/repository/postgres_farm.go b/backend/internal/repository/postgres_farm.go index ba72476..7f05dc5 100644 --- a/backend/internal/repository/postgres_farm.go +++ b/backend/internal/repository/postgres_farm.go @@ -2,11 +2,13 @@ package repository import ( "context" + "errors" "strings" "time" "github.com/forfarm/backend/internal/domain" "github.com/google/uuid" + "github.com/jackc/pgx/v5" ) type postgresFarmRepository struct { @@ -94,11 +96,52 @@ func (p *postgresFarmRepository) fetchCroplandsByFarmIDs(ctx context.Context, fa return croplandsByFarmID, nil } +func (p *postgresFarmRepository) GetAll(ctx context.Context) ([]domain.Farm, error) { + // Query to select all farms, ordered by creation date for consistency + query := ` + SELECT uuid, name, lat, lon, farm_type, total_size, created_at, updated_at, owner_id + FROM farms + ORDER BY created_at DESC` + + // Use the existing fetch method without specific arguments for filtering + farms, err := p.fetch(ctx, query) + if err != nil { + return nil, err + } + if len(farms) == 0 { + return []domain.Farm{}, nil // Return empty slice, not nil + } + + // --- Fetch associated crops (optional but good for consistency) --- + farmIDs := make([]string, 0, len(farms)) + farmMap := make(map[string]*domain.Farm, len(farms)) + for i := range farms { + farmIDs = append(farmIDs, farms[i].UUID) + farmMap[farms[i].UUID] = &farms[i] + } + + croplandsByFarmID, err := p.fetchCroplandsByFarmIDs(ctx, farmIDs) + if err != nil { + // Log the warning but return the farms fetched so far + // Depending on requirements, you might want to return the error instead + println("Warning: Failed to fetch associated croplands during GetAll:", err.Error()) + } else { + for farmID, croplands := range croplandsByFarmID { + if farm, ok := farmMap[farmID]; ok { + farm.Crops = croplands + } + } + } + // --- End Fetch associated crops --- + + return farms, nil +} + func (p *postgresFarmRepository) GetByID(ctx context.Context, farmId string) (*domain.Farm, error) { query := ` - SELECT uuid, name, lat, lon, farm_type, total_size, created_at, updated_at, owner_id - FROM farms - WHERE uuid = $1` + SELECT uuid, name, lat, lon, farm_type, total_size, created_at, updated_at, owner_id + FROM farms + WHERE uuid = $1` var f domain.Farm err := p.conn.QueryRow(ctx, query, farmId).Scan( &f.UUID, @@ -112,8 +155,21 @@ func (p *postgresFarmRepository) GetByID(ctx context.Context, farmId string) (*d &f.OwnerID, ) if err != nil { - return nil, err + if errors.Is(err, pgx.ErrNoRows) { // Check for pgx specific error + return nil, domain.ErrNotFound + } + return nil, err // Return other errors } + + // Fetch associated crops (optional, depends if GetByID needs them) + cropsMap, err := p.fetchCroplandsByFarmIDs(ctx, []string{f.UUID}) + if err != nil { + println("Warning: Failed to fetch croplands for GetByID:", err.Error()) + // Decide whether to return the farm without crops or return the error + } else if crops, ok := cropsMap[f.UUID]; ok { + f.Crops = crops + } + return &f, nil } @@ -192,14 +248,16 @@ func (p *postgresFarmRepository) CreateOrUpdate(ctx context.Context, f *domain.F Timestamp: time.Now(), AggregateID: f.UUID, Payload: map[string]interface{}{ - "farm_id": f.UUID, - "name": f.Name, - "location": map[string]float64{"lat": f.Lat, "lon": f.Lon}, - "farm_type": f.FarmType, - "total_size": f.TotalSize, - "owner_id": f.OwnerID, - "created_at": f.CreatedAt, - "updated_at": f.UpdatedAt, + "uuid": f.UUID, + "name": f.Name, + "lat": f.Lat, + "lon": f.Lon, + "location": map[string]float64{"lat": f.Lat, "lon": f.Lon}, + "farmType": f.FarmType, + "totalSize": f.TotalSize, + "ownerId": f.OwnerID, + "createdAt": f.CreatedAt, + "updatedAt": f.UpdatedAt, }, } diff --git a/backend/internal/repository/postgres_farm_analytics.go b/backend/internal/repository/postgres_farm_analytics.go index 4b234e5..977162b 100644 --- a/backend/internal/repository/postgres_farm_analytics.go +++ b/backend/internal/repository/postgres_farm_analytics.go @@ -59,7 +59,7 @@ func (r *postgresFarmAnalyticsRepository) GetFarmAnalytics(ctx context.Context, &analytics.OwnerID, &farmType, &totalSize, - &analytics.Latitude, // Scan directly into the struct fields + &analytics.Latitude, &analytics.Longitude, &weatherJSON, &inventoryJSON, @@ -228,16 +228,16 @@ func (r *postgresFarmAnalyticsRepository) GetCropAnalytics(ctx context.Context, func (r *postgresFarmAnalyticsRepository) CreateOrUpdateFarmBaseData(ctx context.Context, farm *domain.Farm) error { query := ` - INSERT INTO farm_analytics (farm_id, farm_name, owner_id, farm_type, total_size, lat, lon, last_updated) + INSERT INTO farm_analytics (farm_id, farm_name, owner_id, farm_type, total_size, latitude, longitude, analytics_last_updated) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (farm_id) DO UPDATE SET farm_name = EXCLUDED.farm_name, owner_id = EXCLUDED.owner_id, farm_type = EXCLUDED.farm_type, total_size = EXCLUDED.total_size, - lat = EXCLUDED.lat, - lon = EXCLUDED.lon, - last_updated = EXCLUDED.last_updated;` + latitude = EXCLUDED.latitude, + longitude = EXCLUDED.longitude, + analytics_last_updated = EXCLUDED.analytics_last_updated;` _, err := r.conn.Exec(ctx, query, farm.UUID, @@ -259,158 +259,100 @@ func (r *postgresFarmAnalyticsRepository) CreateOrUpdateFarmBaseData(ctx context func (r *postgresFarmAnalyticsRepository) UpdateFarmAnalyticsWeather(ctx context.Context, farmID string, weatherData *domain.WeatherData) error { if weatherData == nil { - return fmt.Errorf("weather data cannot be nil") + return errors.New("weather data cannot be nil for update") } - - weatherJSON, err := json.Marshal(weatherData) - if err != nil { - r.logger.Error("Failed to marshal weather data for analytics update", "farm_id", farmID, "error", err) - return fmt.Errorf("failed to marshal weather data: %w", err) - } - query := ` - UPDATE farm_analytics - SET weather_data = $1, - last_updated = $2 - WHERE farm_id = $3;` + UPDATE public.farm_analytics SET + weather_temp_celsius = $2, + weather_humidity = $3, + weather_description = $4, + weather_icon = $5, + weather_wind_speed = $6, + weather_rain_1h = $7, + weather_observed_at = $8, + weather_last_updated = NOW(), -- Use current time for the update time + analytics_last_updated = NOW() + WHERE farm_id = $1` - cmdTag, err := r.conn.Exec(ctx, query, weatherJSON, time.Now().UTC(), farmID) + _, err := r.conn.Exec(ctx, query, + farmID, + weatherData.TempCelsius, + weatherData.Humidity, + weatherData.Description, + weatherData.Icon, + weatherData.WindSpeed, + weatherData.RainVolume1h, + weatherData.ObservedAt, + ) if err != nil { - r.logger.Error("Failed to update farm analytics weather data", "farm_id", farmID, "error", err) - return fmt.Errorf("database update failed for weather data: %w", err) + r.logger.Error("Error updating farm weather analytics", "farm_id", farmID, "error", err) + return fmt.Errorf("failed to update weather analytics for farm %s: %w", farmID, err) } - if cmdTag.RowsAffected() == 0 { - r.logger.Warn("No farm analytics record found to update weather data", "farm_id", farmID) - // Optionally, create the base record here if it should always exist - return domain.ErrNotFound // Or handle as appropriate - } - - r.logger.Debug("Updated farm analytics weather data", "farm_id", farmID) + r.logger.Debug("Updated farm weather analytics", "farm_id", farmID) return nil } // UpdateFarmAnalyticsCropStats needs to query the croplands table for the farm func (r *postgresFarmAnalyticsRepository) UpdateFarmAnalyticsCropStats(ctx context.Context, farmID string) error { + countQuery := ` + SELECT + COUNT(*), + COUNT(*) FILTER (WHERE lower(status) = 'growing') + FROM public.croplands + WHERE farm_id = $1 + ` var totalCount, growingCount int - - // Query to count total and growing crops for the farm - query := ` - SELECT - COUNT(*), - COUNT(*) FILTER (WHERE status = 'growing') -- Case-insensitive comparison if needed: LOWER(status) = 'growing' - FROM croplands - WHERE farm_id = $1;` - - err := r.conn.QueryRow(ctx, query, farmID).Scan(&totalCount, &growingCount) + err := r.conn.QueryRow(ctx, countQuery, farmID).Scan(&totalCount, &growingCount) if err != nil { - // Log error but don't fail the projection if stats can't be calculated temporarily - r.logger.Error("Failed to calculate crop stats for analytics", "farm_id", farmID, "error", err) - return fmt.Errorf("failed to calculate crop stats: %w", err) + if !errors.Is(err, pgx.ErrNoRows) { + r.logger.Error("Error calculating crop counts", "farm_id", farmID, "error", err) + return fmt.Errorf("failed to calculate crop stats for farm %s: %w", farmID, err) + } } - // Construct the JSONB object for crop_data - cropInfo := map[string]interface{}{ - "totalCount": totalCount, - "growingCount": growingCount, - "lastUpdated": time.Now().UTC(), // Timestamp of this calculation - } - cropJSON, err := json.Marshal(cropInfo) - if err != nil { - r.logger.Error("Failed to marshal crop stats data", "farm_id", farmID, "error", err) - return fmt.Errorf("failed to marshal crop stats: %w", err) - } - - // Update the farm_analytics table updateQuery := ` - UPDATE farm_analytics - SET crop_data = $1, - last_updated = $2 -- Also update the main last_updated timestamp - WHERE farm_id = $3;` + UPDATE public.farm_analytics SET + crop_total_count = $2, + crop_growing_count = $3, + crop_last_updated = NOW(), + analytics_last_updated = NOW() + WHERE farm_id = $1` - cmdTag, err := r.conn.Exec(ctx, updateQuery, cropJSON, time.Now().UTC(), farmID) + cmdTag, err := r.conn.Exec(ctx, updateQuery, farmID, totalCount, growingCount) if err != nil { - r.logger.Error("Failed to update farm analytics crop stats", "farm_id", farmID, "error", err) - return fmt.Errorf("database update failed for crop stats: %w", err) + r.logger.Error("Error updating farm crop stats", "farm_id", farmID, "error", err) + return fmt.Errorf("failed to update crop stats for farm %s: %w", farmID, err) } if cmdTag.RowsAffected() == 0 { r.logger.Warn("No farm analytics record found to update crop stats", "farm_id", farmID) - // Optionally, create the base record here - } else { - r.logger.Debug("Updated farm analytics crop stats", "farm_id", farmID, "total", totalCount, "growing", growingCount) + // Optionally, create the base record here if it should always exist + return r.CreateOrUpdateFarmBaseData(ctx, &domain.Farm{UUID: farmID /* Fetch other details */}) } + + r.logger.Debug("Updated farm crop stats", "farm_id", farmID, "total", totalCount, "growing", growingCount) return nil } // UpdateFarmAnalyticsInventoryStats needs to query inventory_items func (r *postgresFarmAnalyticsRepository) UpdateFarmAnalyticsInventoryStats(ctx context.Context, farmID string) error { - var totalItems, lowStockCount int - var lastUpdated sql.NullTime - - // Query to get inventory stats for the user owning the farm - // NOTE: This assumes inventory is linked by user_id, and we need the user_id for the farm owner. - // Step 1: Get Owner ID from farm_analytics table - var ownerID string - ownerQuery := `SELECT owner_id FROM farm_analytics WHERE farm_id = $1` - err := r.conn.QueryRow(ctx, ownerQuery, farmID).Scan(&ownerID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) { - r.logger.Warn("Cannot update inventory stats, farm analytics record not found", "farm_id", farmID) - return nil // Or return ErrNotFound if critical - } - r.logger.Error("Failed to get owner ID for inventory stats update", "farm_id", farmID, "error", err) - return fmt.Errorf("failed to get owner ID: %w", err) - } - - // Step 2: Query inventory based on owner ID query := ` - SELECT - COUNT(*), - COUNT(*) FILTER (WHERE status_id = (SELECT id FROM inventory_status WHERE name = 'Low Stock')), -- Assumes 'Low Stock' status name - MAX(updated_at) -- Get the latest update timestamp from inventory items - FROM inventory_items - WHERE user_id = $1;` + UPDATE public.farm_analytics SET + -- inventory_total_items = (SELECT COUNT(*) FROM ... WHERE farm_id = $1), -- Example future logic + -- inventory_low_stock_count = (SELECT COUNT(*) FROM ... WHERE farm_id = $1 AND status = 'low'), -- Example + inventory_last_updated = NOW(), + analytics_last_updated = NOW() + WHERE farm_id = $1` - err = r.conn.QueryRow(ctx, query, ownerID).Scan(&totalItems, &lowStockCount, &lastUpdated) + cmdTag, err := r.conn.Exec(ctx, query, farmID) if err != nil { - // Log error but don't fail the projection if stats can't be calculated temporarily - r.logger.Error("Failed to calculate inventory stats for analytics", "farm_id", farmID, "owner_id", ownerID, "error", err) - return fmt.Errorf("failed to calculate inventory stats: %w", err) - } - - // Construct the JSONB object for inventory_data - inventoryInfo := map[string]interface{}{ - "totalItems": totalItems, - "lowStockCount": lowStockCount, - "lastUpdated": nil, // Initialize as nil - } - // Only set lastUpdated if the MAX(updated_at) query returned a valid time - if lastUpdated.Valid { - inventoryInfo["lastUpdated"] = lastUpdated.Time.UTC() - } - - inventoryJSON, err := json.Marshal(inventoryInfo) - if err != nil { - r.logger.Error("Failed to marshal inventory stats data", "farm_id", farmID, "error", err) - return fmt.Errorf("failed to marshal inventory stats: %w", err) - } - - // Update the farm_analytics table - updateQuery := ` - UPDATE farm_analytics - SET inventory_data = $1, - last_updated = $2 -- Also update the main last_updated timestamp - WHERE farm_id = $3;` - - cmdTag, err := r.conn.Exec(ctx, updateQuery, inventoryJSON, time.Now().UTC(), farmID) - if err != nil { - r.logger.Error("Failed to update farm analytics inventory stats", "farm_id", farmID, "error", err) - return fmt.Errorf("database update failed for inventory stats: %w", err) + r.logger.Error("Error touching inventory timestamp in farm analytics", "farm_id", farmID, "error", err) + return fmt.Errorf("failed to update inventory stats timestamp for farm %s: %w", farmID, err) } if cmdTag.RowsAffected() == 0 { - r.logger.Warn("No farm analytics record found to update inventory stats", "farm_id", farmID) - } else { - r.logger.Debug("Updated farm analytics inventory stats", "farm_id", farmID, "total", totalItems, "lowStock", lowStockCount) + r.logger.Warn("No farm analytics record found to update inventory timestamp", "farm_id", farmID) } + + r.logger.Debug("Updated farm inventory timestamp", "farm_id", farmID) return nil } @@ -432,20 +374,18 @@ func (r *postgresFarmAnalyticsRepository) DeleteFarmAnalytics(ctx context.Contex func (r *postgresFarmAnalyticsRepository) UpdateFarmOverallStatus(ctx context.Context, farmID string, status string) error { query := ` - UPDATE farm_analytics - SET overall_status = $1, - last_updated = $2 - WHERE farm_id = $3;` + UPDATE public.farm_analytics SET + overall_status = $2, + analytics_last_updated = NOW() + WHERE farm_id = $1` - cmdTag, err := r.conn.Exec(ctx, query, status, time.Now().UTC(), farmID) + cmdTag, err := r.conn.Exec(ctx, query, farmID, status) if err != nil { - r.logger.Error("Failed to update farm overall status", "farm_id", farmID, "status", status, "error", err) - return fmt.Errorf("database update failed for overall status: %w", err) + r.logger.Error("Error updating farm overall status", "farm_id", farmID, "status", status, "error", err) + return fmt.Errorf("failed to update overall status for farm %s: %w", farmID, err) } if cmdTag.RowsAffected() == 0 { r.logger.Warn("No farm analytics record found to update overall status", "farm_id", farmID) - // Optionally, create the base record here if needed - return domain.ErrNotFound } r.logger.Debug("Updated farm overall status", "farm_id", farmID, "status", status) return nil diff --git a/backend/internal/repository/postgres_inventory.go b/backend/internal/repository/postgres_inventory.go index c0e1287..f5ef4bf 100644 --- a/backend/internal/repository/postgres_inventory.go +++ b/backend/internal/repository/postgres_inventory.go @@ -268,15 +268,15 @@ func (p *postgresInventoryRepository) CreateOrUpdate(ctx context.Context, item * } payload := map[string]interface{}{ - "item_id": item.ID, - "user_id": item.UserID, // Include user ID for potential farm lookup in projection - "name": item.Name, - "category_id": item.CategoryID, - "quantity": item.Quantity, - "unit_id": item.UnitID, - "status_id": item.StatusID, - "date_added": item.DateAdded, - "updated_at": item.UpdatedAt, + "id": item.ID, + "userId": item.UserID, // Include user ID for potential farm lookup in projection + "name": item.Name, + "categoryId": item.CategoryID, + "quantity": item.Quantity, + "unitId": item.UnitID, + "statusId": item.StatusID, + "dateAdded": item.DateAdded, + "updatedAt": item.UpdatedAt, // NO farm_id easily available here without extra lookup } diff --git a/backend/internal/repository/postgres_plant.go b/backend/internal/repository/postgres_plant.go index 963c31a..bb4eb38 100644 --- a/backend/internal/repository/postgres_plant.go +++ b/backend/internal/repository/postgres_plant.go @@ -51,6 +51,15 @@ func (p *postgresPlantRepository) GetByUUID(ctx context.Context, uuid string) (d return plants[0], nil } +func (p *postgresPlantRepository) GetByName(ctx context.Context, name string) (domain.Plant, error) { + query := `SELECT * FROM plants WHERE name = $1` + plants, err := p.fetch(ctx, query, name) + if err != nil || len(plants) == 0 { + return domain.Plant{}, domain.ErrNotFound + } + return plants[0], nil +} + func (p *postgresPlantRepository) GetAll(ctx context.Context) ([]domain.Plant, error) { query := `SELECT * FROM plants` return p.fetch(ctx, query) diff --git a/backend/internal/services/analytics_service.go b/backend/internal/services/analytics_service.go index d412dc8..fa79dfa 100644 --- a/backend/internal/services/analytics_service.go +++ b/backend/internal/services/analytics_service.go @@ -7,41 +7,31 @@ import ( "github.com/forfarm/backend/internal/domain" ) -// AnalyticsService provides methods for calculating or deriving analytics data. -// For now, it contains dummy implementations. type AnalyticsService struct { - // Add dependencies like repositories if needed for real logic later } -// NewAnalyticsService creates a new AnalyticsService. func NewAnalyticsService() *AnalyticsService { return &AnalyticsService{} } -// CalculatePlantHealth provides a dummy health status. -// TODO: Implement real health calculation based on status, weather, events, etc. func (s *AnalyticsService) CalculatePlantHealth(status string, growthStage string) string { - // Simple dummy logic switch status { case "Problem", "Diseased", "Infested": return "warning" case "Fallow", "Harvested": - return "n/a" // Or maybe 'good' if fallow is considered healthy state + return "n/a" default: - // Slightly randomize for demo purposes - if rand.Intn(10) < 2 { // 20% chance of warning even if status is 'growing' + // 20% chance of warning even if status is 'growing' + if rand.Intn(10) < 2 { return "warning" } return "good" } } -// SuggestNextAction provides a dummy next action based on growth stage. -// TODO: Implement real suggestion logic based on stage, weather, history, plant type etc. func (s *AnalyticsService) SuggestNextAction(growthStage string, lastUpdated time.Time) (action *string, dueDate *time.Time) { - // Default action nextActionStr := "Monitor crop health" - nextDueDate := time.Now().Add(24 * time.Hour) // Check tomorrow + nextDueDate := time.Now().Add(24 * time.Hour) switch growthStage { case "Planned", "Planting": @@ -58,30 +48,27 @@ func (s *AnalyticsService) SuggestNextAction(growthStage string, lastUpdated tim nextDueDate = time.Now().Add(48 * time.Hour) case "Fruiting", "Ripening": nextActionStr = "Monitor fruit development and prepare for harvest" - nextDueDate = time.Now().Add(7 * 24 * time.Hour) // Check in a week + nextDueDate = time.Now().Add(7 * 24 * time.Hour) case "Harvesting": nextActionStr = "Proceed with harvest" nextDueDate = time.Now().Add(24 * time.Hour) } - // Only return if the suggestion is "newer" than the last update to avoid constant same suggestion - // This is basic logic, real implementation would be more complex - if nextDueDate.After(lastUpdated.Add(1 * time.Hour)) { // Only suggest if due date is >1hr after last update + // Only suggest if due date is >1hr after last update + if nextDueDate.After(lastUpdated.Add(1 * time.Hour)) { return &nextActionStr, &nextDueDate } - return nil, nil // No immediate action needed or suggestion is old + return nil, nil } -// GetNutrientLevels provides dummy nutrient levels. -// TODO: Implement real nutrient level fetching (e.g., from soil sensors, lab results events). func (s *AnalyticsService) GetNutrientLevels(cropID string) *struct { Nitrogen *float64 `json:"nitrogen,omitempty"` Phosphorus *float64 `json:"phosphorus,omitempty"` Potassium *float64 `json:"potassium,omitempty"` } { - // Return dummy data or nil if unavailable - if rand.Intn(10) < 7 { // 70% chance of having dummy data + // 70% chance of having dummy data + if rand.Intn(10) < 7 { n := float64(50 + rand.Intn(40)) // 50-89 p := float64(40 + rand.Intn(40)) // 40-79 k := float64(45 + rand.Intn(40)) // 45-84 @@ -95,26 +82,20 @@ func (s *AnalyticsService) GetNutrientLevels(cropID string) *struct { Potassium: &k, } } - return nil // Simulate data not available + return nil } -// GetEnvironmentalData attempts to retrieve relevant environmental data. -// TODO: Enhance this - Could query specific weather events for the crop location/timeframe. -// Currently relies on potentially stale FarmAnalytics weather. func (s *AnalyticsService) GetEnvironmentalData(farmAnalytics *domain.FarmAnalytics) (temp, humidity, wind, rain, sunlight, soilMoisture *float64) { - // Initialize with nil temp, humidity, wind, rain, sunlight, soilMoisture = nil, nil, nil, nil, nil, nil - // Try to get from FarmAnalytics if farmAnalytics != nil && farmAnalytics.Weather != nil { temp = farmAnalytics.Weather.TempCelsius humidity = farmAnalytics.Weather.Humidity wind = farmAnalytics.Weather.WindSpeed rain = farmAnalytics.Weather.RainVolume1h - // Note: Sunlight and SoilMoisture are not typically in basic WeatherData } - // Provide dummy values ONLY if still nil (ensures real data isn't overwritten) + // Provide dummy values only if data is missing if temp == nil { t := float64(18 + rand.Intn(15)) // 18-32 C temp = &t @@ -128,7 +109,6 @@ func (s *AnalyticsService) GetEnvironmentalData(farmAnalytics *domain.FarmAnalyt wind = &w } if rain == nil { - // Simulate less frequent rain r := 0.0 if rand.Intn(10) < 2 { // 20% chance of rain r = float64(rand.Intn(5)) // 0-4 mm @@ -144,5 +124,5 @@ func (s *AnalyticsService) GetEnvironmentalData(farmAnalytics *domain.FarmAnalyt soilMoisture = &sm } - return // Named return values + return } diff --git a/backend/internal/services/weather/openweathermap_fetcher.go b/backend/internal/services/weather/openweathermap_fetcher.go index 6b5b676..6297564 100644 --- a/backend/internal/services/weather/openweathermap_fetcher.go +++ b/backend/internal/services/weather/openweathermap_fetcher.go @@ -1,4 +1,3 @@ -// backend/internal/services/weather/openweathermap_fetcher.go package weather import ( @@ -14,45 +13,57 @@ import ( "github.com/forfarm/backend/internal/domain" ) -const openWeatherMapOneCallAPIURL = "https://api.openweathermap.org/data/3.0/onecall" +const openWeatherMapCurrentAPIURL = "https://api.openweathermap.org/data/2.5/weather" -type openWeatherMapOneCallResponse struct { - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - Timezone string `json:"timezone"` - TimezoneOffset int `json:"timezone_offset"` - Current *struct { - Dt int64 `json:"dt"` // Current time, Unix, UTC - Sunrise int64 `json:"sunrise"` - Sunset int64 `json:"sunset"` - Temp float64 `json:"temp"` // Kelvin by default, 'units=metric' for Celsius - FeelsLike float64 `json:"feels_like"` // Kelvin by default - Pressure int `json:"pressure"` // hPa - Humidity int `json:"humidity"` // % - DewPoint float64 `json:"dew_point"` - Uvi float64 `json:"uvi"` - Clouds int `json:"clouds"` // % - Visibility int `json:"visibility"` // meters - WindSpeed float64 `json:"wind_speed"` // meter/sec by default - WindDeg int `json:"wind_deg"` - WindGust float64 `json:"wind_gust,omitempty"` - Rain *struct { - OneH float64 `json:"1h"` // Rain volume for the last 1 hour, mm - } `json:"rain,omitempty"` - Snow *struct { - OneH float64 `json:"1h"` // Snow volume for the last 1 hour, mm - } `json:"snow,omitempty"` - Weather []struct { - ID int `json:"id"` - Main string `json:"main"` - Description string `json:"description"` - Icon string `json:"icon"` - } `json:"weather"` - } `json:"current,omitempty"` - // Minutely []... - // Hourly []... - // Daily []... - // Alerts []... +type openWeatherMapCurrentResponse struct { + Coord struct { + Lon float64 `json:"lon"` + Lat float64 `json:"lat"` + } `json:"coord"` + Weather []struct { + ID int `json:"id"` + Main string `json:"main"` + Description string `json:"description"` + Icon string `json:"icon"` + } `json:"weather"` + Base string `json:"base"` + Main *struct { + Temp float64 `json:"temp"` + FeelsLike float64 `json:"feels_like"` + TempMin float64 `json:"temp_min"` + TempMax float64 `json:"temp_max"` + Pressure int `json:"pressure"` + Humidity int `json:"humidity"` + SeaLevel int `json:"sea_level,omitempty"` + GrndLevel int `json:"grnd_level,omitempty"` + } `json:"main"` + Visibility int `json:"visibility"` + Wind *struct { + Speed float64 `json:"speed"` + Deg int `json:"deg"` + Gust float64 `json:"gust,omitempty"` + } `json:"wind"` + Rain *struct { + OneH float64 `json:"1h"` + } `json:"rain,omitempty"` + Snow *struct { + OneH float64 `json:"1h"` + } `json:"snow,omitempty"` + Clouds *struct { + All int `json:"all"` + } `json:"clouds"` + Dt int64 `json:"dt"` + Sys *struct { + Type int `json:"type,omitempty"` + ID int `json:"id,omitempty"` + Country string `json:"country"` + Sunrise int64 `json:"sunrise"` + Sunset int64 `json:"sunset"` + } `json:"sys"` + Timezone int `json:"timezone"` + ID int `json:"id"` + Name string `json:"name"` + Cod int `json:"cod"` } type OpenWeatherMapFetcher struct { @@ -80,11 +91,10 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l queryParams.Set("lat", fmt.Sprintf("%.4f", lat)) queryParams.Set("lon", fmt.Sprintf("%.4f", lon)) queryParams.Set("appid", f.apiKey) - queryParams.Set("units", "metric") // Request Celsius and m/s - queryParams.Set("exclude", "minutely,hourly,daily,alerts") // Exclude parts we don't need now + queryParams.Set("units", "metric") - fullURL := fmt.Sprintf("%s?%s", openWeatherMapOneCallAPIURL, queryParams.Encode()) - f.logger.Debug("Fetching weather from OpenWeatherMap OneCall API", "url", fullURL) + fullURL := fmt.Sprintf("%s?%s", openWeatherMapCurrentAPIURL, queryParams.Encode()) + f.logger.Debug("Fetching weather from OpenWeatherMap Current API", "url", fullURL) req, err := http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil) if err != nil { @@ -100,7 +110,6 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - // TODO: Read resp.Body to get error message from OpenWeatherMap bodyBytes, _ := io.ReadAll(resp.Body) f.logger.Error("OpenWeatherMap API returned non-OK status", "url", fullURL, @@ -109,46 +118,60 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l return nil, fmt.Errorf("weather API request failed with status: %s", resp.Status) } - var owmResp openWeatherMapOneCallResponse + var owmResp openWeatherMapCurrentResponse if err := json.NewDecoder(resp.Body).Decode(&owmResp); err != nil { - f.logger.Error("Failed to decode OpenWeatherMap OneCall response", "error", err) + f.logger.Error("Failed to decode OpenWeatherMap Current response", "error", err) return nil, fmt.Errorf("failed to decode weather response: %w", err) } - if owmResp.Current == nil { - f.logger.Warn("OpenWeatherMap OneCall response missing 'current' weather data", "lat", lat, "lon", lon) - return nil, fmt.Errorf("current weather data not found in API response") - } - current := owmResp.Current + // --- Data Mapping from openWeatherMapCurrentResponse to domain.WeatherData --- - if len(current.Weather) == 0 { - f.logger.Warn("OpenWeatherMap response missing weather description details", "lat", lat, "lon", lon) - return nil, fmt.Errorf("weather data description not found in response") + if owmResp.Main == nil { + f.logger.Error("OpenWeatherMap Current response missing 'main' data block", "lat", lat, "lon", lon) + return nil, fmt.Errorf("main weather data block not found in API response") } - // Create domain object using pointers for optional fields - weatherData := &domain.WeatherData{} // Initialize empty struct first + weatherData := &domain.WeatherData{} - // Assign values using pointers, checking for nil where appropriate - weatherData.TempCelsius = ¤t.Temp - humidityFloat := float64(current.Humidity) + weatherData.TempCelsius = &owmResp.Main.Temp + humidityFloat := float64(owmResp.Main.Humidity) weatherData.Humidity = &humidityFloat - weatherData.Description = ¤t.Weather[0].Description - weatherData.Icon = ¤t.Weather[0].Icon - weatherData.WindSpeed = ¤t.WindSpeed - if current.Rain != nil { - weatherData.RainVolume1h = ¤t.Rain.OneH + + if len(owmResp.Weather) > 0 { + weatherData.Description = &owmResp.Weather[0].Description + weatherData.Icon = &owmResp.Weather[0].Icon + } else { + f.logger.Warn("OpenWeatherMap Current response missing 'weather' description details", "lat", lat, "lon", lon) } - observedTime := time.Unix(current.Dt, 0).UTC() + + if owmResp.Wind != nil { + weatherData.WindSpeed = &owmResp.Wind.Speed + } else { + f.logger.Warn("OpenWeatherMap Current response missing 'wind' data block", "lat", lat, "lon", lon) + } + + if owmResp.Rain != nil { + weatherData.RainVolume1h = &owmResp.Rain.OneH + } + + observedTime := time.Unix(owmResp.Dt, 0).UTC() weatherData.ObservedAt = &observedTime now := time.Now().UTC() weatherData.WeatherLastUpdated = &now - f.logger.Debug("Successfully fetched weather data", + logTemp := "nil" + if weatherData.TempCelsius != nil { + logTemp = fmt.Sprintf("%.2f", *weatherData.TempCelsius) + } + logDesc := "nil" + if weatherData.Description != nil { + logDesc = *weatherData.Description + } + f.logger.Debug("Successfully fetched and mapped weather data", "lat", lat, "lon", lon, - "temp", *weatherData.TempCelsius, - "description", *weatherData.Description) + "temp", logTemp, + "description", logDesc) return weatherData, nil } diff --git a/backend/internal/workers/weather_updater.go b/backend/internal/workers/weather_updater.go index 1b48880..42d843e 100644 --- a/backend/internal/workers/weather_updater.go +++ b/backend/internal/workers/weather_updater.go @@ -3,6 +3,7 @@ package workers import ( "context" + "fmt" "log/slog" "sync" "time" @@ -27,13 +28,23 @@ func NewWeatherUpdater( eventPublisher domain.EventPublisher, logger *slog.Logger, fetchInterval time.Duration, -) *WeatherUpdater { +) (*WeatherUpdater, error) { if logger == nil { logger = slog.Default() } if fetchInterval <= 0 { - fetchInterval = 15 * time.Minute + fetchInterval = 60 * time.Minute } + if farmRepo == nil { + return nil, fmt.Errorf("farmRepo cannot be nil") + } + if weatherFetcher == nil { + return nil, fmt.Errorf("weatherFetcher cannot be nil") + } + if eventPublisher == nil { + return nil, fmt.Errorf("eventPublisher cannot be nil") + } + return &WeatherUpdater{ farmRepo: farmRepo, weatherFetcher: weatherFetcher, @@ -41,7 +52,7 @@ func NewWeatherUpdater( logger: logger, fetchInterval: fetchInterval, stopChan: make(chan struct{}), - } + }, nil } func (w *WeatherUpdater) Start(ctx context.Context) { @@ -75,20 +86,22 @@ func (w *WeatherUpdater) Start(ctx context.Context) { func (w *WeatherUpdater) Stop() { w.logger.Info("Attempting to stop Weather Updater worker...") - close(w.stopChan) - w.wg.Wait() + select { + case <-w.stopChan: + default: + close(w.stopChan) + } + w.wg.Wait() // Wait for the goroutine to finish w.logger.Info("Weather Updater worker stopped") } func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) { - // Use a background context for the repository call if the main context might cancel prematurely - // repoCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout - // defer cancel() + repoCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Use separate context for DB query + defer cancel() - // TODO: Need a GetAllFarms method in the FarmRepository or a way to efficiently get all farm locations. - farms, err := w.farmRepo.GetByOwnerID(ctx, "") // !! REPLACE with a proper GetAll method !! + farms, err := w.farmRepo.GetAll(repoCtx) // <-- Changed method call if err != nil { - w.logger.Error("Failed to get farms for weather update", "error", err) + w.logger.Error("Failed to get all farms for weather update", "error", err) return } if len(farms) == 0 { @@ -96,12 +109,15 @@ func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) { return } - w.logger.Info("Found farms for weather update", "count", len(farms)) + w.logger.Info("Processing farms for weather update", "count", len(farms)) var fetchWg sync.WaitGroup fetchCtx, cancelFetches := context.WithCancel(ctx) defer cancelFetches() + concurrencyLimit := 5 + sem := make(chan struct{}, concurrencyLimit) + for _, farm := range farms { if farm.Lat == 0 && farm.Lon == 0 { w.logger.Warn("Skipping farm with zero coordinates", "farm_id", farm.UUID, "farm_name", farm.Name) @@ -109,10 +125,14 @@ func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) { } fetchWg.Add(1) + sem <- struct{}{} go func(f domain.Farm) { defer fetchWg.Done() + defer func() { <-sem }() + select { case <-fetchCtx.Done(): + w.logger.Info("Weather fetch cancelled for farm", "farm_id", f.UUID, "reason", fetchCtx.Err()) return default: w.fetchAndPublishWeather(fetchCtx, f) @@ -121,7 +141,7 @@ func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) { } fetchWg.Wait() - w.logger.Info("Finished weather fetch cycle for farms", "count", len(farms)) + w.logger.Debug("Finished weather fetch cycle for farms", "count", len(farms)) // Use Debug for cycle completion } func (w *WeatherUpdater) fetchAndPublishWeather(ctx context.Context, farm domain.Farm) { @@ -136,17 +156,17 @@ func (w *WeatherUpdater) fetchAndPublishWeather(ctx context.Context, farm domain } payloadMap := map[string]interface{}{ - "farm_id": farm.UUID, - "lat": farm.Lat, - "lon": farm.Lon, - "temp_celsius": weatherData.TempCelsius, - "humidity": weatherData.Humidity, - "description": weatherData.Description, - "icon": weatherData.Icon, - "wind_speed": weatherData.WindSpeed, - "rain_volume_1h": weatherData.RainVolume1h, - "observed_at": weatherData.ObservedAt, - "weather_last_updated": weatherData.WeatherLastUpdated, + "farm_id": farm.UUID, + "lat": farm.Lat, + "lon": farm.Lon, + "tempCelsius": weatherData.TempCelsius, + "humidity": weatherData.Humidity, + "description": weatherData.Description, + "icon": weatherData.Icon, + "windSpeed": weatherData.WindSpeed, + "rainVolume1h": weatherData.RainVolume1h, + "observedAt": weatherData.ObservedAt, + "weatherLastUpdated": weatherData.WeatherLastUpdated, } event := domain.Event{