feat(api): Update analytics endpoint and wire new components

This commit is contained in:
Sosokker 2025-04-02 18:04:24 +07:00
parent 90ee8ff529
commit 009cfb10ff
5 changed files with 202 additions and 77 deletions

View File

@ -0,0 +1,66 @@
package api
import (
"context"
"errors"
"net/http"
"github.com/danielgtaylor/huma/v2"
"github.com/forfarm/backend/internal/domain"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
)
func (a *api) registerAnalyticsRoutes(_ chi.Router, api huma.API) {
tags := []string{"analytics"}
prefix := "/analytics"
huma.Register(api, huma.Operation{
OperationID: "getFarmAnalytics",
Method: http.MethodGet,
Path: prefix + "/farm/{farm_id}",
Tags: tags,
Summary: "Get aggregated analytics data for a specific farm",
Description: "Retrieves various analytics metrics for a farm, requiring user ownership.",
}, a.getFarmAnalyticsHandler)
}
type GetFarmAnalyticsInput struct {
Header string `header:"Authorization" required:"true" example:"Bearer token"`
FarmID string `path:"farm_id" required:"true" doc:"UUID of the farm to get analytics for" example:"a1b2c3d4-e5f6-7890-1234-567890abcdef"`
}
type GetFarmAnalyticsOutput struct {
Body domain.FarmAnalytics `json:"body"`
}
func (a *api) getFarmAnalyticsHandler(ctx context.Context, input *GetFarmAnalyticsInput) (*GetFarmAnalyticsOutput, error) {
userID, err := a.getUserIDFromHeader(input.Header)
if err != nil {
return nil, huma.Error401Unauthorized("Authentication failed: " + err.Error())
}
if _, err := uuid.Parse(input.FarmID); err != nil {
return nil, huma.Error400BadRequest("Invalid Farm ID format.")
}
analyticsData, err := a.analyticsRepo.GetFarmAnalytics(ctx, input.FarmID)
if err != nil {
if errors.Is(err, domain.ErrNotFound) {
a.logger.Info("Analytics data not found for farm", "farm_id", input.FarmID)
return nil, huma.Error404NotFound("Analytics data not found for this farm.")
}
a.logger.Error("Failed to retrieve farm analytics", "farm_id", input.FarmID, "error", err)
return nil, huma.Error500InternalServerError("Failed to retrieve analytics data.")
}
if analyticsData.OwnerID != userID {
a.logger.Warn("User attempted to access analytics for farm they do not own", "user_id", userID, "farm_id", input.FarmID, "owner_id", analyticsData.OwnerID)
return nil, huma.Error403Forbidden("You are not authorized to view analytics for this farm.")
}
resp := &GetFarmAnalyticsOutput{
Body: *analyticsData,
}
return resp, nil
}

View File

@ -7,6 +7,7 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"strings" "strings"
"time"
"github.com/danielgtaylor/huma/v2" "github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/adapters/humachi" "github.com/danielgtaylor/huma/v2/adapters/humachi"
@ -15,9 +16,11 @@ import (
"github.com/go-chi/cors" "github.com/go-chi/cors"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/forfarm/backend/internal/config"
"github.com/forfarm/backend/internal/domain" "github.com/forfarm/backend/internal/domain"
m "github.com/forfarm/backend/internal/middlewares" m "github.com/forfarm/backend/internal/middlewares"
"github.com/forfarm/backend/internal/repository" "github.com/forfarm/backend/internal/repository"
"github.com/forfarm/backend/internal/services/weather"
"github.com/forfarm/backend/internal/utilities" "github.com/forfarm/backend/internal/utilities"
) )
@ -32,20 +35,46 @@ type api struct {
plantRepo domain.PlantRepository plantRepo domain.PlantRepository
inventoryRepo domain.InventoryRepository inventoryRepo domain.InventoryRepository
harvestRepo domain.HarvestRepository harvestRepo domain.HarvestRepository
analyticsRepo domain.AnalyticsRepository
weatherFetcher domain.WeatherFetcher
} }
func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventPublisher domain.EventPublisher) *api { var weatherFetcherInstance domain.WeatherFetcher
func GetWeatherFetcher() domain.WeatherFetcher {
return weatherFetcherInstance
}
func NewAPI(
ctx context.Context,
logger *slog.Logger,
pool *pgxpool.Pool,
eventPublisher domain.EventPublisher,
analyticsRepo domain.AnalyticsRepository,
inventoryRepo domain.InventoryRepository,
croplandRepo domain.CroplandRepository,
farmRepo domain.FarmRepository,
) *api {
client := &http.Client{} client := &http.Client{}
userRepository := repository.NewPostgresUser(pool) userRepository := repository.NewPostgresUser(pool)
croplandRepository := repository.NewPostgresCropland(pool)
farmRepository := repository.NewPostgresFarm(pool)
plantRepository := repository.NewPostgresPlant(pool) plantRepository := repository.NewPostgresPlant(pool)
inventoryRepository := repository.NewPostgresInventory(pool)
harvestRepository := repository.NewPostgresHarvest(pool) harvestRepository := repository.NewPostgresHarvest(pool)
farmRepository.SetEventPublisher(eventPublisher) owmFetcher := weather.NewOpenWeatherMapFetcher(config.OPENWEATHER_API_KEY, client, logger)
cacheTTL, err := time.ParseDuration(config.OPENWEATHER_CACHE_TTL)
if err != nil {
logger.Warn("Invalid OPENWEATHER_CACHE_TTL format, using default 15m", "value", config.OPENWEATHER_CACHE_TTL, "error", err)
cacheTTL = 15 * time.Minute
}
cleanupInterval := cacheTTL * 2
if cleanupInterval < 5*time.Minute {
cleanupInterval = 5 * time.Minute
}
cachedWeatherFetcher := weather.NewCachedWeatherFetcher(owmFetcher, cacheTTL, cleanupInterval, logger)
weatherFetcherInstance = cachedWeatherFetcher
return &api{ return &api{
logger: logger, logger: logger,
@ -53,11 +82,14 @@ func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventP
eventPublisher: eventPublisher, eventPublisher: eventPublisher,
userRepo: userRepository, userRepo: userRepository,
cropRepo: croplandRepository, cropRepo: croplandRepo,
farmRepo: farmRepository, farmRepo: farmRepo,
plantRepo: plantRepository, plantRepo: plantRepository,
inventoryRepo: inventoryRepository, inventoryRepo: inventoryRepo,
harvestRepo: harvestRepository, harvestRepo: harvestRepository,
analyticsRepo: analyticsRepo,
weatherFetcher: cachedWeatherFetcher,
} }
} }
@ -107,6 +139,7 @@ func (a *api) Routes() *chi.Mux {
a.registerFarmRoutes(r, api) a.registerFarmRoutes(r, api)
a.registerUserRoutes(r, api) a.registerUserRoutes(r, api)
a.registerInventoryRoutes(r, api) a.registerInventoryRoutes(r, api)
a.registerAnalyticsRoutes(r, api)
}) })
return router return router

View File

@ -2,6 +2,7 @@ package api
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"net/http" "net/http"
@ -11,7 +12,6 @@ import (
"github.com/gofrs/uuid" "github.com/gofrs/uuid"
) )
// Register the crop routes
func (a *api) registerCropRoutes(_ chi.Router, api huma.API) { func (a *api) registerCropRoutes(_ chi.Router, api huma.API) {
tags := []string{"crop"} tags := []string{"crop"}
@ -50,47 +50,42 @@ func (a *api) registerCropRoutes(_ chi.Router, api huma.API) {
}, a.createOrUpdateCroplandHandler) }, a.createOrUpdateCroplandHandler)
} }
// Response structure for all croplands
type GetCroplandsOutput struct { type GetCroplandsOutput struct {
Body struct { Body struct {
Croplands []domain.Cropland `json:"croplands"` Croplands []domain.Cropland `json:"croplands"`
} `json:"body"` } `json:"body"`
} }
// Response structure for single cropland by ID
type GetCroplandByIDOutput struct { type GetCroplandByIDOutput struct {
Body struct { Body struct {
Cropland domain.Cropland `json:"cropland"` Cropland domain.Cropland `json:"cropland"`
} `json:"body"` } `json:"body"`
} }
// Request structure for creating or updating a cropland
type CreateOrUpdateCroplandInput struct { type CreateOrUpdateCroplandInput struct {
Body struct { Body struct {
UUID string `json:"uuid,omitempty"` // Optional for create, required for update UUID string `json:"UUID,omitempty"`
Name string `json:"name"` Name string `json:"Name"`
Status string `json:"status"` Status string `json:"Status"`
Priority int `json:"priority"` Priority int `json:"Priority"`
LandSize float64 `json:"land_size"` LandSize float64 `json:"LandSize"`
GrowthStage string `json:"growth_stage"` GrowthStage string `json:"GrowthStage"`
PlantID string `json:"plant_id"` PlantID string `json:"PlantID"`
FarmID string `json:"farm_id"` FarmID string `json:"FarmID"`
GeoFeature json.RawMessage `json:"GeoFeature,omitempty" doc:"GeoJSON-like feature object (marker, polygon, etc.)" example:"{\"type\":\"marker\",\"position\":{\"lat\":13.84,\"lng\":100.48}}"`
} `json:"body"` } `json:"body"`
} }
// Response structure for creating or updating a cropland
type CreateOrUpdateCroplandOutput struct { type CreateOrUpdateCroplandOutput struct {
Body struct { Body struct {
Cropland domain.Cropland `json:"cropland"` Cropland domain.Cropland `json:"cropland"`
} `json:"body"` } `json:"body"`
} }
// GetAllCroplands handles GET /crop endpoint
func (a *api) getAllCroplandsHandler(ctx context.Context, input *struct{}) (*GetCroplandsOutput, error) { func (a *api) getAllCroplandsHandler(ctx context.Context, input *struct{}) (*GetCroplandsOutput, error) {
resp := &GetCroplandsOutput{} resp := &GetCroplandsOutput{}
// Fetch all croplands without filtering by farmID croplands, err := a.cropRepo.GetAll(ctx)
croplands, err := a.cropRepo.GetAll(ctx) // Use the GetAll method
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -99,24 +94,20 @@ func (a *api) getAllCroplandsHandler(ctx context.Context, input *struct{}) (*Get
return resp, nil return resp, nil
} }
// GetCroplandByID handles GET /crop/{uuid} endpoint
func (a *api) getCroplandByIDHandler(ctx context.Context, input *struct { func (a *api) getCroplandByIDHandler(ctx context.Context, input *struct {
UUID string `path:"uuid" example:"550e8400-e29b-41d4-a716-446655440000"` UUID string `path:"uuid" example:"550e8400-e29b-41d4-a716-446655440000"`
}) (*GetCroplandByIDOutput, error) { }) (*GetCroplandByIDOutput, error) {
resp := &GetCroplandByIDOutput{} resp := &GetCroplandByIDOutput{}
// Validate the UUID format
if input.UUID == "" { if input.UUID == "" {
return nil, huma.Error400BadRequest("UUID parameter is required") return nil, huma.Error400BadRequest("UUID parameter is required")
} }
// Check if the UUID is in a valid format
_, err := uuid.FromString(input.UUID) _, err := uuid.FromString(input.UUID)
if err != nil { if err != nil {
return nil, huma.Error400BadRequest("invalid UUID format") return nil, huma.Error400BadRequest("invalid UUID format")
} }
// Fetch cropland by ID
cropland, err := a.cropRepo.GetByID(ctx, input.UUID) cropland, err := a.cropRepo.GetByID(ctx, input.UUID)
if err != nil { if err != nil {
if errors.Is(err, domain.ErrNotFound) { if errors.Is(err, domain.ErrNotFound) {
@ -129,24 +120,20 @@ func (a *api) getCroplandByIDHandler(ctx context.Context, input *struct {
return resp, nil return resp, nil
} }
// GetAllCroplandsByFarmID handles GET /crop/farm/{farm_id} endpoint
func (a *api) getAllCroplandsByFarmIDHandler(ctx context.Context, input *struct { func (a *api) getAllCroplandsByFarmIDHandler(ctx context.Context, input *struct {
FarmID string `path:"farm_id" example:"550e8400-e29b-41d4-a716-446655440000"` FarmID string `path:"farm_id" example:"550e8400-e29b-41d4-a716-446655440000"`
}) (*GetCroplandsOutput, error) { }) (*GetCroplandsOutput, error) {
resp := &GetCroplandsOutput{} resp := &GetCroplandsOutput{}
// Validate the FarmID format
if input.FarmID == "" { if input.FarmID == "" {
return nil, huma.Error400BadRequest("FarmID parameter is required") return nil, huma.Error400BadRequest("FarmID parameter is required")
} }
// Check if the FarmID is in a valid format
_, err := uuid.FromString(input.FarmID) _, err := uuid.FromString(input.FarmID)
if err != nil { if err != nil {
return nil, huma.Error400BadRequest("invalid FarmID format") return nil, huma.Error400BadRequest("invalid FarmID format")
} }
// Fetch croplands by FarmID
croplands, err := a.cropRepo.GetByFarmID(ctx, input.FarmID) croplands, err := a.cropRepo.GetByFarmID(ctx, input.FarmID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -156,11 +143,9 @@ func (a *api) getAllCroplandsByFarmIDHandler(ctx context.Context, input *struct
return resp, nil return resp, nil
} }
// CreateOrUpdateCropland handles POST /crop endpoint
func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOrUpdateCroplandInput) (*CreateOrUpdateCroplandOutput, error) { func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOrUpdateCroplandInput) (*CreateOrUpdateCroplandOutput, error) {
resp := &CreateOrUpdateCroplandOutput{} resp := &CreateOrUpdateCroplandOutput{}
// Validate required fields
if input.Body.Name == "" { if input.Body.Name == "" {
return nil, huma.Error400BadRequest("name is required") return nil, huma.Error400BadRequest("name is required")
} }
@ -177,15 +162,22 @@ func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOr
return nil, huma.Error400BadRequest("farm_id is required") return nil, huma.Error400BadRequest("farm_id is required")
} }
// Validate UUID if provided
if input.Body.UUID != "" { if input.Body.UUID != "" {
_, err := uuid.FromString(input.Body.UUID) if _, err := uuid.FromString(input.Body.UUID); err != nil {
if err != nil { return nil, huma.Error400BadRequest("invalid cropland UUID format")
return nil, huma.Error400BadRequest("invalid UUID format")
} }
} }
if _, err := uuid.FromString(input.Body.PlantID); err != nil {
return nil, huma.Error400BadRequest("invalid plant_id UUID format")
}
if _, err := uuid.FromString(input.Body.FarmID); err != nil {
return nil, huma.Error400BadRequest("invalid farm_id UUID format")
}
if input.Body.GeoFeature != nil && !json.Valid(input.Body.GeoFeature) {
return nil, huma.Error400BadRequest("invalid JSON format for geo_feature")
}
// Map input to domain.Cropland
cropland := &domain.Cropland{ cropland := &domain.Cropland{
UUID: input.Body.UUID, UUID: input.Body.UUID,
Name: input.Body.Name, Name: input.Body.Name,
@ -195,15 +187,14 @@ func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOr
GrowthStage: input.Body.GrowthStage, GrowthStage: input.Body.GrowthStage,
PlantID: input.Body.PlantID, PlantID: input.Body.PlantID,
FarmID: input.Body.FarmID, FarmID: input.Body.FarmID,
GeoFeature: input.Body.GeoFeature,
} }
// Create or update the cropland
err := a.cropRepo.CreateOrUpdate(ctx, cropland) err := a.cropRepo.CreateOrUpdate(ctx, cropland)
if err != nil { if err != nil {
return nil, err return nil, huma.Error500InternalServerError("failed to save cropland")
} }
// Return the created/updated cropland
resp.Body.Cropland = *cropland resp.Body.Cropland = *cropland
return resp, nil return resp, nil
} }

View File

@ -95,11 +95,11 @@ type UpdateFarmInput struct {
Header string `header:"Authorization" required:"true" example:"Bearer token"` Header string `header:"Authorization" required:"true" example:"Bearer token"`
FarmID string `path:"farm_id"` FarmID string `path:"farm_id"`
Body struct { Body struct {
Name string `json:"name,omitempty"` Name string `json:"Name,omitempty"`
Lat *float64 `json:"lat,omitempty"` Lat *float64 `json:"Lat,omitempty"`
Lon *float64 `json:"lon,omitempty"` Lon *float64 `json:"Lon,omitempty"`
FarmType *string `json:"farm_type,omitempty"` FarmType *string `json:"FarmType,omitempty"`
TotalSize *string `json:"total_size,omitempty"` TotalSize *string `json:"TotalSize,omitempty"`
} }
} }

View File

@ -1,11 +1,13 @@
// backend/internal/cmd/api.go
package cmd package cmd
import ( import (
"context" "context"
"os" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"os"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -14,6 +16,7 @@ import (
"github.com/forfarm/backend/internal/config" "github.com/forfarm/backend/internal/config"
"github.com/forfarm/backend/internal/event" "github.com/forfarm/backend/internal/event"
"github.com/forfarm/backend/internal/repository" "github.com/forfarm/backend/internal/repository"
"github.com/forfarm/backend/internal/workers"
) )
func APICmd(ctx context.Context) *cobra.Command { func APICmd(ctx context.Context) *cobra.Command {
@ -28,57 +31,89 @@ func APICmd(ctx context.Context) *cobra.Command {
pool, err := cmdutil.NewDatabasePool(ctx, 16) pool, err := cmdutil.NewDatabasePool(ctx, 16)
if err != nil { if err != nil {
logger.Error("failed to create database pool", "error", err)
return err return err
} }
defer pool.Close() defer pool.Close()
logger.Info("connected to database") logger.Info("connected to database")
// Events // --- Event Bus ---
eventBus, err := event.NewRabbitMQEventBus(config.RABBITMQ_URL, logger) eventBus, err := event.NewRabbitMQEventBus(config.RABBITMQ_URL, logger)
if err != nil { if err != nil {
logger.Error("failed to connect to event bus", "err", err) logger.Error("failed to connect to event bus", "url", config.RABBITMQ_URL, "error", err)
os.Exit(1) return fmt.Errorf("event bus connection failed: %w", err)
} }
logger.Info("connecting to event bus", "url", config.RABBITMQ_URL) defer eventBus.Close()
logger.Info("connected to event bus", "url", config.RABBITMQ_URL)
aggregator := event.NewEventAggregator(eventBus, eventBus, logger) analyticsRepo := repository.NewPostgresFarmAnalyticsRepository(pool, logger)
if err := aggregator.Start(ctx); err != nil {
logger.Error("failed to start event aggregator", "err", err)
os.Exit(1)
}
logger.Info("Start event aggregator")
analyticRepo := repository.NewPostgresAnalyticsRepository(pool) farmRepo := repository.NewPostgresFarm(pool)
projection := event.NewAnalyticsProjection(eventBus, analyticRepo, logger) farmRepo.SetEventPublisher(eventBus)
if err := projection.Start(ctx); err != nil {
logger.Error("failed to start analytics projection", "err", err)
os.Exit(1)
}
logger.Info("Start analytics projection")
//
api := api.NewAPI(ctx, logger, pool, eventBus) inventoryRepo := repository.NewPostgresInventory(pool, eventBus)
server := api.Server(port)
croplandRepo := repository.NewPostgresCropland(pool)
croplandRepo.SetEventPublisher(eventBus)
projection := event.NewFarmAnalyticsProjection(eventBus, analyticsRepo, logger)
go func() { go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { if err := projection.Start(ctx); err != nil {
logger.Error("failed to start server", "err", err) logger.Error("FarmAnalyticsProjection failed to start listening", "error", err)
} }
}() }()
logger.Info("Farm Analytics Projection started")
logger.Info("started API", "port", port) weatherFetcher := api.GetWeatherFetcher() // Get fetcher instance from API setup
weatherInterval, err := time.ParseDuration(config.WEATHER_FETCH_INTERVAL)
if err != nil {
logger.Warn("Invalid WEATHER_FETCH_INTERVAL, using default 15m", "value", config.WEATHER_FETCH_INTERVAL, "error", err)
weatherInterval = 15 * time.Minute
}
weatherUpdater := workers.NewWeatherUpdater(farmRepo, weatherFetcher, eventBus, logger, weatherInterval)
weatherUpdater.Start(ctx) // Pass the main context
logger.Info("Weather Updater worker started", "interval", weatherInterval)
<-ctx.Done() apiInstance := api.NewAPI(ctx, logger, pool, eventBus, analyticsRepo, inventoryRepo, croplandRepo, farmRepo) // Pass new repo
server := apiInstance.Server(port)
serverErrChan := make(chan error, 1)
go func() {
logger.Info("starting API server", "port", port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("API server failed", "error", err)
serverErrChan <- err // Send error to channel
}
close(serverErrChan)
}()
select {
case err := <-serverErrChan:
logger.Error("Server error received, initiating shutdown.", "error", err)
case <-ctx.Done():
logger.Info("Shutdown signal received, initiating graceful shutdown...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) // 15-second grace period
defer cancel()
weatherUpdater.Stop() // Signal and wait
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP server graceful shutdown failed", "error", err)
} else {
logger.Info("HTTP server shutdown complete.")
}
if err := server.Shutdown(ctx); err != nil {
logger.Error("failed to gracefully shutdown server", "err", err)
} }
logger.Info("Application shutdown complete.")
return nil return nil
}, },
} }
// Add flags if needed (e.g., --port)
// cmd.Flags().IntVarP(&port, "port", "p", config.PORT, "Port for the API server")
return cmd return cmd
} }