From 009cfb10ff78e705d1e52a3ab2545cb5be3444cc Mon Sep 17 00:00:00 2001 From: Sosokker Date: Wed, 2 Apr 2025 18:04:24 +0700 Subject: [PATCH] feat(api): Update analytics endpoint and wire new components --- backend/internal/api/analytic.go | 66 ++++++++++++++++++++++ backend/internal/api/api.go | 49 +++++++++++++--- backend/internal/api/crop.go | 59 +++++++++----------- backend/internal/api/farm.go | 10 ++-- backend/internal/cmd/api.go | 95 ++++++++++++++++++++++---------- 5 files changed, 202 insertions(+), 77 deletions(-) create mode 100644 backend/internal/api/analytic.go diff --git a/backend/internal/api/analytic.go b/backend/internal/api/analytic.go new file mode 100644 index 0000000..ebb7228 --- /dev/null +++ b/backend/internal/api/analytic.go @@ -0,0 +1,66 @@ +package api + +import ( + "context" + "errors" + "net/http" + + "github.com/danielgtaylor/huma/v2" + "github.com/forfarm/backend/internal/domain" + "github.com/go-chi/chi/v5" + "github.com/google/uuid" +) + +func (a *api) registerAnalyticsRoutes(_ chi.Router, api huma.API) { + tags := []string{"analytics"} + prefix := "/analytics" + + huma.Register(api, huma.Operation{ + OperationID: "getFarmAnalytics", + Method: http.MethodGet, + Path: prefix + "/farm/{farm_id}", + Tags: tags, + Summary: "Get aggregated analytics data for a specific farm", + Description: "Retrieves various analytics metrics for a farm, requiring user ownership.", + }, a.getFarmAnalyticsHandler) +} + +type GetFarmAnalyticsInput struct { + Header string `header:"Authorization" required:"true" example:"Bearer token"` + FarmID string `path:"farm_id" required:"true" doc:"UUID of the farm to get analytics for" example:"a1b2c3d4-e5f6-7890-1234-567890abcdef"` +} + +type GetFarmAnalyticsOutput struct { + Body domain.FarmAnalytics `json:"body"` +} + +func (a *api) getFarmAnalyticsHandler(ctx context.Context, input *GetFarmAnalyticsInput) (*GetFarmAnalyticsOutput, error) { + userID, err := a.getUserIDFromHeader(input.Header) + if err != nil { + return nil, huma.Error401Unauthorized("Authentication failed: " + err.Error()) + } + + if _, err := uuid.Parse(input.FarmID); err != nil { + return nil, huma.Error400BadRequest("Invalid Farm ID format.") + } + + analyticsData, err := a.analyticsRepo.GetFarmAnalytics(ctx, input.FarmID) + if err != nil { + if errors.Is(err, domain.ErrNotFound) { + a.logger.Info("Analytics data not found for farm", "farm_id", input.FarmID) + return nil, huma.Error404NotFound("Analytics data not found for this farm.") + } + a.logger.Error("Failed to retrieve farm analytics", "farm_id", input.FarmID, "error", err) + return nil, huma.Error500InternalServerError("Failed to retrieve analytics data.") + } + + if analyticsData.OwnerID != userID { + a.logger.Warn("User attempted to access analytics for farm they do not own", "user_id", userID, "farm_id", input.FarmID, "owner_id", analyticsData.OwnerID) + return nil, huma.Error403Forbidden("You are not authorized to view analytics for this farm.") + } + + resp := &GetFarmAnalyticsOutput{ + Body: *analyticsData, + } + return resp, nil +} diff --git a/backend/internal/api/api.go b/backend/internal/api/api.go index 1875a5b..dc97556 100644 --- a/backend/internal/api/api.go +++ b/backend/internal/api/api.go @@ -7,6 +7,7 @@ import ( "log/slog" "net/http" "strings" + "time" "github.com/danielgtaylor/huma/v2" "github.com/danielgtaylor/huma/v2/adapters/humachi" @@ -15,9 +16,11 @@ import ( "github.com/go-chi/cors" "github.com/jackc/pgx/v5/pgxpool" + "github.com/forfarm/backend/internal/config" "github.com/forfarm/backend/internal/domain" m "github.com/forfarm/backend/internal/middlewares" "github.com/forfarm/backend/internal/repository" + "github.com/forfarm/backend/internal/services/weather" "github.com/forfarm/backend/internal/utilities" ) @@ -32,20 +35,46 @@ type api struct { plantRepo domain.PlantRepository inventoryRepo domain.InventoryRepository harvestRepo domain.HarvestRepository + analyticsRepo domain.AnalyticsRepository + + weatherFetcher domain.WeatherFetcher } -func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventPublisher domain.EventPublisher) *api { +var weatherFetcherInstance domain.WeatherFetcher + +func GetWeatherFetcher() domain.WeatherFetcher { + return weatherFetcherInstance +} + +func NewAPI( + ctx context.Context, + logger *slog.Logger, + pool *pgxpool.Pool, + eventPublisher domain.EventPublisher, + analyticsRepo domain.AnalyticsRepository, + inventoryRepo domain.InventoryRepository, + croplandRepo domain.CroplandRepository, + farmRepo domain.FarmRepository, +) *api { client := &http.Client{} userRepository := repository.NewPostgresUser(pool) - croplandRepository := repository.NewPostgresCropland(pool) - farmRepository := repository.NewPostgresFarm(pool) plantRepository := repository.NewPostgresPlant(pool) - inventoryRepository := repository.NewPostgresInventory(pool) harvestRepository := repository.NewPostgresHarvest(pool) - farmRepository.SetEventPublisher(eventPublisher) + owmFetcher := weather.NewOpenWeatherMapFetcher(config.OPENWEATHER_API_KEY, client, logger) + cacheTTL, err := time.ParseDuration(config.OPENWEATHER_CACHE_TTL) + if err != nil { + logger.Warn("Invalid OPENWEATHER_CACHE_TTL format, using default 15m", "value", config.OPENWEATHER_CACHE_TTL, "error", err) + cacheTTL = 15 * time.Minute + } + cleanupInterval := cacheTTL * 2 + if cleanupInterval < 5*time.Minute { + cleanupInterval = 5 * time.Minute + } + cachedWeatherFetcher := weather.NewCachedWeatherFetcher(owmFetcher, cacheTTL, cleanupInterval, logger) + weatherFetcherInstance = cachedWeatherFetcher return &api{ logger: logger, @@ -53,11 +82,14 @@ func NewAPI(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, eventP eventPublisher: eventPublisher, userRepo: userRepository, - cropRepo: croplandRepository, - farmRepo: farmRepository, + cropRepo: croplandRepo, + farmRepo: farmRepo, plantRepo: plantRepository, - inventoryRepo: inventoryRepository, + inventoryRepo: inventoryRepo, harvestRepo: harvestRepository, + analyticsRepo: analyticsRepo, + + weatherFetcher: cachedWeatherFetcher, } } @@ -107,6 +139,7 @@ func (a *api) Routes() *chi.Mux { a.registerFarmRoutes(r, api) a.registerUserRoutes(r, api) a.registerInventoryRoutes(r, api) + a.registerAnalyticsRoutes(r, api) }) return router diff --git a/backend/internal/api/crop.go b/backend/internal/api/crop.go index 8ff12c7..8a3053b 100644 --- a/backend/internal/api/crop.go +++ b/backend/internal/api/crop.go @@ -2,6 +2,7 @@ package api import ( "context" + "encoding/json" "errors" "net/http" @@ -11,7 +12,6 @@ import ( "github.com/gofrs/uuid" ) -// Register the crop routes func (a *api) registerCropRoutes(_ chi.Router, api huma.API) { tags := []string{"crop"} @@ -50,47 +50,42 @@ func (a *api) registerCropRoutes(_ chi.Router, api huma.API) { }, a.createOrUpdateCroplandHandler) } -// Response structure for all croplands type GetCroplandsOutput struct { Body struct { Croplands []domain.Cropland `json:"croplands"` } `json:"body"` } -// Response structure for single cropland by ID type GetCroplandByIDOutput struct { Body struct { Cropland domain.Cropland `json:"cropland"` } `json:"body"` } -// Request structure for creating or updating a cropland type CreateOrUpdateCroplandInput struct { Body struct { - UUID string `json:"uuid,omitempty"` // Optional for create, required for update - Name string `json:"name"` - Status string `json:"status"` - Priority int `json:"priority"` - LandSize float64 `json:"land_size"` - GrowthStage string `json:"growth_stage"` - PlantID string `json:"plant_id"` - FarmID string `json:"farm_id"` + UUID string `json:"UUID,omitempty"` + Name string `json:"Name"` + Status string `json:"Status"` + Priority int `json:"Priority"` + LandSize float64 `json:"LandSize"` + GrowthStage string `json:"GrowthStage"` + PlantID string `json:"PlantID"` + FarmID string `json:"FarmID"` + GeoFeature json.RawMessage `json:"GeoFeature,omitempty" doc:"GeoJSON-like feature object (marker, polygon, etc.)" example:"{\"type\":\"marker\",\"position\":{\"lat\":13.84,\"lng\":100.48}}"` } `json:"body"` } -// Response structure for creating or updating a cropland type CreateOrUpdateCroplandOutput struct { Body struct { Cropland domain.Cropland `json:"cropland"` } `json:"body"` } -// GetAllCroplands handles GET /crop endpoint func (a *api) getAllCroplandsHandler(ctx context.Context, input *struct{}) (*GetCroplandsOutput, error) { resp := &GetCroplandsOutput{} - // Fetch all croplands without filtering by farmID - croplands, err := a.cropRepo.GetAll(ctx) // Use the GetAll method + croplands, err := a.cropRepo.GetAll(ctx) if err != nil { return nil, err } @@ -99,24 +94,20 @@ func (a *api) getAllCroplandsHandler(ctx context.Context, input *struct{}) (*Get return resp, nil } -// GetCroplandByID handles GET /crop/{uuid} endpoint func (a *api) getCroplandByIDHandler(ctx context.Context, input *struct { UUID string `path:"uuid" example:"550e8400-e29b-41d4-a716-446655440000"` }) (*GetCroplandByIDOutput, error) { resp := &GetCroplandByIDOutput{} - // Validate the UUID format if input.UUID == "" { return nil, huma.Error400BadRequest("UUID parameter is required") } - // Check if the UUID is in a valid format _, err := uuid.FromString(input.UUID) if err != nil { return nil, huma.Error400BadRequest("invalid UUID format") } - // Fetch cropland by ID cropland, err := a.cropRepo.GetByID(ctx, input.UUID) if err != nil { if errors.Is(err, domain.ErrNotFound) { @@ -129,24 +120,20 @@ func (a *api) getCroplandByIDHandler(ctx context.Context, input *struct { return resp, nil } -// GetAllCroplandsByFarmID handles GET /crop/farm/{farm_id} endpoint func (a *api) getAllCroplandsByFarmIDHandler(ctx context.Context, input *struct { FarmID string `path:"farm_id" example:"550e8400-e29b-41d4-a716-446655440000"` }) (*GetCroplandsOutput, error) { resp := &GetCroplandsOutput{} - // Validate the FarmID format if input.FarmID == "" { return nil, huma.Error400BadRequest("FarmID parameter is required") } - // Check if the FarmID is in a valid format _, err := uuid.FromString(input.FarmID) if err != nil { return nil, huma.Error400BadRequest("invalid FarmID format") } - // Fetch croplands by FarmID croplands, err := a.cropRepo.GetByFarmID(ctx, input.FarmID) if err != nil { return nil, err @@ -156,11 +143,9 @@ func (a *api) getAllCroplandsByFarmIDHandler(ctx context.Context, input *struct return resp, nil } -// CreateOrUpdateCropland handles POST /crop endpoint func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOrUpdateCroplandInput) (*CreateOrUpdateCroplandOutput, error) { resp := &CreateOrUpdateCroplandOutput{} - // Validate required fields if input.Body.Name == "" { return nil, huma.Error400BadRequest("name is required") } @@ -177,15 +162,22 @@ func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOr return nil, huma.Error400BadRequest("farm_id is required") } - // Validate UUID if provided if input.Body.UUID != "" { - _, err := uuid.FromString(input.Body.UUID) - if err != nil { - return nil, huma.Error400BadRequest("invalid UUID format") + if _, err := uuid.FromString(input.Body.UUID); err != nil { + return nil, huma.Error400BadRequest("invalid cropland UUID format") } } + if _, err := uuid.FromString(input.Body.PlantID); err != nil { + return nil, huma.Error400BadRequest("invalid plant_id UUID format") + } + if _, err := uuid.FromString(input.Body.FarmID); err != nil { + return nil, huma.Error400BadRequest("invalid farm_id UUID format") + } + + if input.Body.GeoFeature != nil && !json.Valid(input.Body.GeoFeature) { + return nil, huma.Error400BadRequest("invalid JSON format for geo_feature") + } - // Map input to domain.Cropland cropland := &domain.Cropland{ UUID: input.Body.UUID, Name: input.Body.Name, @@ -195,15 +187,14 @@ func (a *api) createOrUpdateCroplandHandler(ctx context.Context, input *CreateOr GrowthStage: input.Body.GrowthStage, PlantID: input.Body.PlantID, FarmID: input.Body.FarmID, + GeoFeature: input.Body.GeoFeature, } - // Create or update the cropland err := a.cropRepo.CreateOrUpdate(ctx, cropland) if err != nil { - return nil, err + return nil, huma.Error500InternalServerError("failed to save cropland") } - // Return the created/updated cropland resp.Body.Cropland = *cropland return resp, nil } diff --git a/backend/internal/api/farm.go b/backend/internal/api/farm.go index 647e72d..89afbcc 100644 --- a/backend/internal/api/farm.go +++ b/backend/internal/api/farm.go @@ -95,11 +95,11 @@ type UpdateFarmInput struct { Header string `header:"Authorization" required:"true" example:"Bearer token"` FarmID string `path:"farm_id"` Body struct { - Name string `json:"name,omitempty"` - Lat *float64 `json:"lat,omitempty"` - Lon *float64 `json:"lon,omitempty"` - FarmType *string `json:"farm_type,omitempty"` - TotalSize *string `json:"total_size,omitempty"` + Name string `json:"Name,omitempty"` + Lat *float64 `json:"Lat,omitempty"` + Lon *float64 `json:"Lon,omitempty"` + FarmType *string `json:"FarmType,omitempty"` + TotalSize *string `json:"TotalSize,omitempty"` } } diff --git a/backend/internal/cmd/api.go b/backend/internal/cmd/api.go index c20414d..5bcbe1d 100644 --- a/backend/internal/cmd/api.go +++ b/backend/internal/cmd/api.go @@ -1,11 +1,13 @@ +// backend/internal/cmd/api.go package cmd import ( "context" - "os" - + "fmt" "log/slog" "net/http" + "os" + "time" "github.com/spf13/cobra" @@ -14,6 +16,7 @@ import ( "github.com/forfarm/backend/internal/config" "github.com/forfarm/backend/internal/event" "github.com/forfarm/backend/internal/repository" + "github.com/forfarm/backend/internal/workers" ) func APICmd(ctx context.Context) *cobra.Command { @@ -28,57 +31,89 @@ func APICmd(ctx context.Context) *cobra.Command { pool, err := cmdutil.NewDatabasePool(ctx, 16) if err != nil { + logger.Error("failed to create database pool", "error", err) return err } defer pool.Close() - logger.Info("connected to database") - // Events - + // --- Event Bus --- eventBus, err := event.NewRabbitMQEventBus(config.RABBITMQ_URL, logger) if err != nil { - logger.Error("failed to connect to event bus", "err", err) - os.Exit(1) + logger.Error("failed to connect to event bus", "url", config.RABBITMQ_URL, "error", err) + return fmt.Errorf("event bus connection failed: %w", err) } - logger.Info("connecting to event bus", "url", config.RABBITMQ_URL) + defer eventBus.Close() + logger.Info("connected to event bus", "url", config.RABBITMQ_URL) - aggregator := event.NewEventAggregator(eventBus, eventBus, logger) - if err := aggregator.Start(ctx); err != nil { - logger.Error("failed to start event aggregator", "err", err) - os.Exit(1) - } - logger.Info("Start event aggregator") + analyticsRepo := repository.NewPostgresFarmAnalyticsRepository(pool, logger) - analyticRepo := repository.NewPostgresAnalyticsRepository(pool) - projection := event.NewAnalyticsProjection(eventBus, analyticRepo, logger) - if err := projection.Start(ctx); err != nil { - logger.Error("failed to start analytics projection", "err", err) - os.Exit(1) - } - logger.Info("Start analytics projection") - // + farmRepo := repository.NewPostgresFarm(pool) + farmRepo.SetEventPublisher(eventBus) - api := api.NewAPI(ctx, logger, pool, eventBus) - server := api.Server(port) + inventoryRepo := repository.NewPostgresInventory(pool, eventBus) + croplandRepo := repository.NewPostgresCropland(pool) + croplandRepo.SetEventPublisher(eventBus) + + projection := event.NewFarmAnalyticsProjection(eventBus, analyticsRepo, logger) go func() { - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Error("failed to start server", "err", err) + if err := projection.Start(ctx); err != nil { + logger.Error("FarmAnalyticsProjection failed to start listening", "error", err) } }() + logger.Info("Farm Analytics Projection started") - logger.Info("started API", "port", port) + weatherFetcher := api.GetWeatherFetcher() // Get fetcher instance from API setup + weatherInterval, err := time.ParseDuration(config.WEATHER_FETCH_INTERVAL) + if err != nil { + logger.Warn("Invalid WEATHER_FETCH_INTERVAL, using default 15m", "value", config.WEATHER_FETCH_INTERVAL, "error", err) + weatherInterval = 15 * time.Minute + } + weatherUpdater := workers.NewWeatherUpdater(farmRepo, weatherFetcher, eventBus, logger, weatherInterval) + weatherUpdater.Start(ctx) // Pass the main context + logger.Info("Weather Updater worker started", "interval", weatherInterval) - <-ctx.Done() + apiInstance := api.NewAPI(ctx, logger, pool, eventBus, analyticsRepo, inventoryRepo, croplandRepo, farmRepo) // Pass new repo + + server := apiInstance.Server(port) + + serverErrChan := make(chan error, 1) + go func() { + logger.Info("starting API server", "port", port) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("API server failed", "error", err) + serverErrChan <- err // Send error to channel + } + close(serverErrChan) + }() + + select { + case err := <-serverErrChan: + logger.Error("Server error received, initiating shutdown.", "error", err) + case <-ctx.Done(): + logger.Info("Shutdown signal received, initiating graceful shutdown...") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) // 15-second grace period + defer cancel() + + weatherUpdater.Stop() // Signal and wait + + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Error("HTTP server graceful shutdown failed", "error", err) + } else { + logger.Info("HTTP server shutdown complete.") + } - if err := server.Shutdown(ctx); err != nil { - logger.Error("failed to gracefully shutdown server", "err", err) } + logger.Info("Application shutdown complete.") return nil }, } + // Add flags if needed (e.g., --port) + // cmd.Flags().IntVarP(&port, "port", "p", config.PORT, "Port for the API server") + return cmd }