diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 856c82b..9b9dbb2 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -7,16 +7,19 @@ import ( ) var ( - PORT int - POSTGRES_USER string - POSTGRES_PASSWORD string - POSTGRES_DB string - DATABASE_URL string - GOOGLE_CLIENT_ID string - GOOGLE_CLIENT_SECRET string - GOOGLE_REDIRECT_URL string - JWT_SECRET_KEY string - RABBITMQ_URL string + PORT int + POSTGRES_USER string + POSTGRES_PASSWORD string + POSTGRES_DB string + DATABASE_URL string + GOOGLE_CLIENT_ID string + GOOGLE_CLIENT_SECRET string + GOOGLE_REDIRECT_URL string + JWT_SECRET_KEY string + RABBITMQ_URL string + OPENWEATHER_API_KEY string + OPENWEATHER_CACHE_TTL string + WEATHER_FETCH_INTERVAL string ) func Load() { @@ -30,6 +33,9 @@ func Load() { viper.SetDefault("JWT_SECRET_KEY", "jwt_secret_key") viper.SetDefault("GOOGLE_REDIRECT_URL", "http://localhost:8000/auth/login/google") viper.SetDefault("RABBITMQ_URL", "amqp://user:password@localhost:5672/") + viper.SetDefault("OPENWEATHER_API_KEY", "openweather_api_key") + viper.SetDefault("OPENWEATHER_CACHE_TTL", "15m") + viper.SetDefault("WEATHER_FETCH_INTERVAL", "15m") viper.SetConfigFile(".env") viper.AddConfigPath("../../.") @@ -50,4 +56,7 @@ func Load() { GOOGLE_REDIRECT_URL = viper.GetString("GOOGLE_REDIRECT_URL") JWT_SECRET_KEY = viper.GetString("JWT_SECRET_KEY") RABBITMQ_URL = viper.GetString("RABBITMQ_URL") + OPENWEATHER_API_KEY = viper.GetString("OPENWEATHER_API_KEY") + OPENWEATHER_CACHE_TTL = viper.GetString("OPENWEATHER_CACHE_TTL") + WEATHER_FETCH_INTERVAL = viper.GetString("WEATHER_FETCH_INTERVAL") } diff --git a/backend/internal/services/weather/openweathermap_fetcher.go b/backend/internal/services/weather/openweathermap_fetcher.go index 9aaba48..6b5b676 100644 --- a/backend/internal/services/weather/openweathermap_fetcher.go +++ b/backend/internal/services/weather/openweathermap_fetcher.go @@ -1,9 +1,11 @@ +// backend/internal/services/weather/openweathermap_fetcher.go package weather import ( "context" "encoding/json" "fmt" + "io" "log/slog" "net/http" "net/url" @@ -12,54 +14,45 @@ import ( "github.com/forfarm/backend/internal/domain" ) -const openWeatherMapAPIURL = "https://api.openweathermap.org/data/2.5/weather" +const openWeatherMapOneCallAPIURL = "https://api.openweathermap.org/data/3.0/onecall" -type openWeatherMapResponse struct { - Coord struct { - Lon float64 `json:"lon"` - Lat float64 `json:"lat"` - } `json:"coord"` - Weather []struct { - ID int `json:"id"` - Main string `json:"main"` - Description string `json:"description"` - Icon string `json:"icon"` - } `json:"weather"` - Base string `json:"base"` - Main struct { - Temp float64 `json:"temp"` // Kelvin by default - FeelsLike float64 `json:"feels_like"` // Kelvin by default - TempMin float64 `json:"temp_min"` // Kelvin by default - TempMax float64 `json:"temp_max"` // Kelvin by default - Pressure int `json:"pressure"` // hPa - Humidity int `json:"humidity"` // % - SeaLevel int `json:"sea_level"` // hPa - GrndLevel int `json:"grnd_level"` // hPa - } `json:"main"` - Visibility int `json:"visibility"` // meters - Wind struct { - Speed float64 `json:"speed"` // meter/sec - Deg int `json:"deg"` // degrees (meteorological) - Gust float64 `json:"gust"` // meter/sec - } `json:"wind"` - Rain struct { - OneH float64 `json:"1h"` // Rain volume for the last 1 hour, mm - } `json:"rain"` - Clouds struct { - All int `json:"all"` // % - } `json:"clouds"` - Dt int64 `json:"dt"` // Time of data calculation, unix, UTC - Sys struct { - Type int `json:"type"` - ID int `json:"id"` - Country string `json:"country"` - Sunrise int64 `json:"sunrise"` // unix, UTC - Sunset int64 `json:"sunset"` // unix, UTC - } `json:"sys"` - Timezone int `json:"timezone"` // Shift in seconds from UTC - ID int `json:"id"` // City ID - Name string `json:"name"` // City name - Cod int `json:"cod"` // Internal parameter +type openWeatherMapOneCallResponse struct { + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + Timezone string `json:"timezone"` + TimezoneOffset int `json:"timezone_offset"` + Current *struct { + Dt int64 `json:"dt"` // Current time, Unix, UTC + Sunrise int64 `json:"sunrise"` + Sunset int64 `json:"sunset"` + Temp float64 `json:"temp"` // Kelvin by default, 'units=metric' for Celsius + FeelsLike float64 `json:"feels_like"` // Kelvin by default + Pressure int `json:"pressure"` // hPa + Humidity int `json:"humidity"` // % + DewPoint float64 `json:"dew_point"` + Uvi float64 `json:"uvi"` + Clouds int `json:"clouds"` // % + Visibility int `json:"visibility"` // meters + WindSpeed float64 `json:"wind_speed"` // meter/sec by default + WindDeg int `json:"wind_deg"` + WindGust float64 `json:"wind_gust,omitempty"` + Rain *struct { + OneH float64 `json:"1h"` // Rain volume for the last 1 hour, mm + } `json:"rain,omitempty"` + Snow *struct { + OneH float64 `json:"1h"` // Snow volume for the last 1 hour, mm + } `json:"snow,omitempty"` + Weather []struct { + ID int `json:"id"` + Main string `json:"main"` + Description string `json:"description"` + Icon string `json:"icon"` + } `json:"weather"` + } `json:"current,omitempty"` + // Minutely []... + // Hourly []... + // Daily []... + // Alerts []... } type OpenWeatherMapFetcher struct { @@ -87,9 +80,11 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l queryParams.Set("lat", fmt.Sprintf("%.4f", lat)) queryParams.Set("lon", fmt.Sprintf("%.4f", lon)) queryParams.Set("appid", f.apiKey) - queryParams.Set("units", "metric") + queryParams.Set("units", "metric") // Request Celsius and m/s + queryParams.Set("exclude", "minutely,hourly,daily,alerts") // Exclude parts we don't need now - fullURL := fmt.Sprintf("%s?%s", openWeatherMapAPIURL, queryParams.Encode()) + fullURL := fmt.Sprintf("%s?%s", openWeatherMapOneCallAPIURL, queryParams.Encode()) + f.logger.Debug("Fetching weather from OpenWeatherMap OneCall API", "url", fullURL) req, err := http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil) if err != nil { @@ -105,36 +100,55 @@ func (f *OpenWeatherMapFetcher) GetCurrentWeatherByCoords(ctx context.Context, l defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - f.logger.Error("OpenWeatherMap API returned non-OK status", "url", fullURL, "status_code", resp.StatusCode) + // TODO: Read resp.Body to get error message from OpenWeatherMap + bodyBytes, _ := io.ReadAll(resp.Body) + f.logger.Error("OpenWeatherMap API returned non-OK status", + "url", fullURL, + "status_code", resp.StatusCode, + "body", string(bodyBytes)) return nil, fmt.Errorf("weather API request failed with status: %s", resp.Status) } - var owmResp openWeatherMapResponse + var owmResp openWeatherMapOneCallResponse if err := json.NewDecoder(resp.Body).Decode(&owmResp); err != nil { - f.logger.Error("Failed to decode OpenWeatherMap response", "error", err) + f.logger.Error("Failed to decode OpenWeatherMap OneCall response", "error", err) return nil, fmt.Errorf("failed to decode weather response: %w", err) } - if len(owmResp.Weather) == 0 { - f.logger.Warn("OpenWeatherMap response missing weather details", "lat", lat, "lon", lon) + if owmResp.Current == nil { + f.logger.Warn("OpenWeatherMap OneCall response missing 'current' weather data", "lat", lat, "lon", lon) + return nil, fmt.Errorf("current weather data not found in API response") + } + current := owmResp.Current + + if len(current.Weather) == 0 { + f.logger.Warn("OpenWeatherMap response missing weather description details", "lat", lat, "lon", lon) return nil, fmt.Errorf("weather data description not found in response") } - weatherData := &domain.WeatherData{ - Timestamp: time.Unix(owmResp.Dt, 0).UTC(), - TempCelsius: owmResp.Main.Temp, - Humidity: float64(owmResp.Main.Humidity), - Description: owmResp.Weather[0].Description, - Icon: owmResp.Weather[0].Icon, - WindSpeed: owmResp.Wind.Speed, - RainVolume1h: owmResp.Rain.OneH, + // Create domain object using pointers for optional fields + weatherData := &domain.WeatherData{} // Initialize empty struct first + + // Assign values using pointers, checking for nil where appropriate + weatherData.TempCelsius = ¤t.Temp + humidityFloat := float64(current.Humidity) + weatherData.Humidity = &humidityFloat + weatherData.Description = ¤t.Weather[0].Description + weatherData.Icon = ¤t.Weather[0].Icon + weatherData.WindSpeed = ¤t.WindSpeed + if current.Rain != nil { + weatherData.RainVolume1h = ¤t.Rain.OneH } + observedTime := time.Unix(current.Dt, 0).UTC() + weatherData.ObservedAt = &observedTime + now := time.Now().UTC() + weatherData.WeatherLastUpdated = &now f.logger.Debug("Successfully fetched weather data", "lat", lat, "lon", lon, - "temp", weatherData.TempCelsius, - "description", weatherData.Description) + "temp", *weatherData.TempCelsius, + "description", *weatherData.Description) return weatherData, nil } diff --git a/backend/internal/workers/weather_updater.go b/backend/internal/workers/weather_updater.go new file mode 100644 index 0000000..1b48880 --- /dev/null +++ b/backend/internal/workers/weather_updater.go @@ -0,0 +1,170 @@ +// backend/internal/workers/weather_updater.go +package workers + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/forfarm/backend/internal/domain" + "github.com/google/uuid" +) + +type WeatherUpdater struct { + farmRepo domain.FarmRepository + weatherFetcher domain.WeatherFetcher + eventPublisher domain.EventPublisher + logger *slog.Logger + fetchInterval time.Duration + stopChan chan struct{} // Channel to signal stopping + wg sync.WaitGroup +} + +func NewWeatherUpdater( + farmRepo domain.FarmRepository, + weatherFetcher domain.WeatherFetcher, + eventPublisher domain.EventPublisher, + logger *slog.Logger, + fetchInterval time.Duration, +) *WeatherUpdater { + if logger == nil { + logger = slog.Default() + } + if fetchInterval <= 0 { + fetchInterval = 15 * time.Minute + } + return &WeatherUpdater{ + farmRepo: farmRepo, + weatherFetcher: weatherFetcher, + eventPublisher: eventPublisher, + logger: logger, + fetchInterval: fetchInterval, + stopChan: make(chan struct{}), + } +} + +func (w *WeatherUpdater) Start(ctx context.Context) { + w.logger.Info("Starting Weather Updater worker", "interval", w.fetchInterval) + ticker := time.NewTicker(w.fetchInterval) + + w.wg.Add(1) + + go func() { + defer w.wg.Done() + defer ticker.Stop() + w.logger.Info("Weather Updater goroutine started") + + w.fetchAndUpdateAllFarms(ctx) + + for { + select { + case <-ticker.C: + w.logger.Info("Weather Updater tick: fetching weather data") + w.fetchAndUpdateAllFarms(ctx) + case <-w.stopChan: + w.logger.Info("Weather Updater received stop signal, stopping...") + return + case <-ctx.Done(): + w.logger.Info("Weather Updater context cancelled, stopping...", "reason", ctx.Err()) + return + } + } + }() +} + +func (w *WeatherUpdater) Stop() { + w.logger.Info("Attempting to stop Weather Updater worker...") + close(w.stopChan) + w.wg.Wait() + w.logger.Info("Weather Updater worker stopped") +} + +func (w *WeatherUpdater) fetchAndUpdateAllFarms(ctx context.Context) { + // Use a background context for the repository call if the main context might cancel prematurely + // repoCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout + // defer cancel() + + // TODO: Need a GetAllFarms method in the FarmRepository or a way to efficiently get all farm locations. + farms, err := w.farmRepo.GetByOwnerID(ctx, "") // !! REPLACE with a proper GetAll method !! + if err != nil { + w.logger.Error("Failed to get farms for weather update", "error", err) + return + } + if len(farms) == 0 { + w.logger.Info("No farms found to update weather for.") + return + } + + w.logger.Info("Found farms for weather update", "count", len(farms)) + + var fetchWg sync.WaitGroup + fetchCtx, cancelFetches := context.WithCancel(ctx) + defer cancelFetches() + + for _, farm := range farms { + if farm.Lat == 0 && farm.Lon == 0 { + w.logger.Warn("Skipping farm with zero coordinates", "farm_id", farm.UUID, "farm_name", farm.Name) + continue + } + + fetchWg.Add(1) + go func(f domain.Farm) { + defer fetchWg.Done() + select { + case <-fetchCtx.Done(): + return + default: + w.fetchAndPublishWeather(fetchCtx, f) + } + }(farm) + } + + fetchWg.Wait() + w.logger.Info("Finished weather fetch cycle for farms", "count", len(farms)) +} + +func (w *WeatherUpdater) fetchAndPublishWeather(ctx context.Context, farm domain.Farm) { + weatherData, err := w.weatherFetcher.GetCurrentWeatherByCoords(ctx, farm.Lat, farm.Lon) + if err != nil { + w.logger.Error("Failed to fetch weather data", "farm_id", farm.UUID, "lat", farm.Lat, "lon", farm.Lon, "error", err) + return + } + if weatherData == nil { + w.logger.Warn("Received nil weather data without error", "farm_id", farm.UUID) + return + } + + payloadMap := map[string]interface{}{ + "farm_id": farm.UUID, + "lat": farm.Lat, + "lon": farm.Lon, + "temp_celsius": weatherData.TempCelsius, + "humidity": weatherData.Humidity, + "description": weatherData.Description, + "icon": weatherData.Icon, + "wind_speed": weatherData.WindSpeed, + "rain_volume_1h": weatherData.RainVolume1h, + "observed_at": weatherData.ObservedAt, + "weather_last_updated": weatherData.WeatherLastUpdated, + } + + event := domain.Event{ + ID: uuid.NewString(), + Type: "weather.updated", + Source: "weather-updater-worker", + Timestamp: time.Now().UTC(), + AggregateID: farm.UUID, + Payload: payloadMap, + } + + pubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = w.eventPublisher.Publish(pubCtx, event) + if err != nil { + w.logger.Error("Failed to publish weather.updated event", "farm_id", farm.UUID, "event_id", event.ID, "error", err) + } else { + w.logger.Debug("Published weather.updated event", "farm_id", farm.UUID, "event_id", event.ID) + } +}